From 423d1a86b4fb37b97a9cb9af8e4871b1e62caa76 Mon Sep 17 00:00:00 2001 From: Sergey Gleip <3on.gleip@gmail.com> Date: Tue, 14 Jan 2025 14:18:35 +0300 Subject: [PATCH] feat: add rollup support (#138) Co-authored-by: gleip --- src/Service.ts | 15 ++++++++++++++- src/StreamManager.ts | 4 ++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/Service.ts b/src/Service.ts index 1546211..4a4c582 100644 --- a/src/Service.ts +++ b/src/Service.ts @@ -48,6 +48,14 @@ export class Service extends Root { * de-duplication within the configured Duplicate Window */ private readonly UNIQ_ID_HEADER = 'Nats-Msg-Id'; + /** + * Nats-Rollup header indicating all prior messages should be purged + */ + private readonly ROLLUP_HEADER = 'Nats-Rollup'; + /** + * Roll-up only same subject message in the stream + */ + private readonly ROLLUP_STRATEGY = 'sub'; constructor(private options: ServiceOptions) { super(options.brokerConnection, options.loggerOutputFormatter); @@ -57,7 +65,7 @@ export class Service extends Root { if (options.events) { const events = Object.keys(options.events.list) as [keyof E]; this.emitter = events.reduce((result, eventName) => { - result[eventName] = ((params: unknown, uniqId?: string) => { + result[eventName] = ((params: unknown, uniqId?: string, rollupId?: string) => { const subject: string[] = [options.name]; const eventOptions = options.events?.list[eventName]; @@ -79,6 +87,11 @@ export class Service extends Root { settings = { headers: headers() }; settings.headers.append(this.UNIQ_ID_HEADER, uniqId); } + if (rollupId) { + settings = settings ?? { headers: headers() }; + settings.headers.append(this.ROLLUP_HEADER, this.ROLLUP_STRATEGY); + subject.push(rollupId); + } this.broker.publish(subject.join('.'), this.buildMessage(params), settings); }) as E[keyof E]; diff --git a/src/StreamManager.ts b/src/StreamManager.ts index 2044ea6..6300667 100644 --- a/src/StreamManager.ts +++ b/src/StreamManager.ts @@ -96,7 +96,7 @@ export class StreamManager extends Root { ): Promise { const consumerName = this.capitalizeFirstLetter(serviceNameFrom) + this.capitalizeFirstLetter(eventName); const prefix = this.param.options.prefix; - const subject = `${this.param.serviceName}.${prefix}.${eventName}`; + const subject = `${this.param.serviceName}.${prefix}.${eventName}.*`; if (!this.jsm) { this.jsm = await this.param.broker.jetstreamManager(); @@ -154,7 +154,7 @@ export class StreamManager extends Root { if (!isConsumerExist) { await this.jsm.consumers.add(streamName, { ...options.config, filter_subject: subject }); } else { - await this.jsm.consumers.update(streamName, consumerName, options.config); + await this.jsm.consumers.update(streamName, consumerName, { ...options.config, filter_subject: subject }); } }