Skip to content

Commit

Permalink
feat: add rollup support (#138)
Browse files Browse the repository at this point in the history
Co-authored-by: gleip <kuchinsn@lad24.ru>
  • Loading branch information
gleip and gleip authored Jan 14, 2025
1 parent 51d63a1 commit 423d1a8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
15 changes: 14 additions & 1 deletion src/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ export class Service<E extends Emitter = Emitter> extends Root {
* de-duplication within the configured Duplicate Window
*/
private readonly UNIQ_ID_HEADER = 'Nats-Msg-Id';
/**
* Nats-Rollup header indicating all prior messages should be purged
*/
private readonly ROLLUP_HEADER = 'Nats-Rollup';
/**
* Roll-up only same subject message in the stream
*/
private readonly ROLLUP_STRATEGY = 'sub';

constructor(private options: ServiceOptions<E>) {
super(options.brokerConnection, options.loggerOutputFormatter);
Expand All @@ -57,7 +65,7 @@ export class Service<E extends Emitter = Emitter> extends Root {
if (options.events) {
const events = Object.keys(options.events.list) as [keyof E];
this.emitter = events.reduce((result, eventName) => {
result[eventName] = ((params: unknown, uniqId?: string) => {
result[eventName] = ((params: unknown, uniqId?: string, rollupId?: string) => {
const subject: string[] = [options.name];

const eventOptions = options.events?.list[eventName];
Expand All @@ -79,6 +87,11 @@ export class Service<E extends Emitter = Emitter> extends Root {
settings = { headers: headers() };
settings.headers.append(this.UNIQ_ID_HEADER, uniqId);
}
if (rollupId) {
settings = settings ?? { headers: headers() };
settings.headers.append(this.ROLLUP_HEADER, this.ROLLUP_STRATEGY);
subject.push(rollupId);
}

this.broker.publish(subject.join('.'), this.buildMessage(params), settings);
}) as E[keyof E];
Expand Down
4 changes: 2 additions & 2 deletions src/StreamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class StreamManager extends Root {
): Promise<StreamSingleMsgFetcher | StreamBatchMsgFetcher> {
const consumerName = this.capitalizeFirstLetter(serviceNameFrom) + this.capitalizeFirstLetter(eventName);
const prefix = this.param.options.prefix;
const subject = `${this.param.serviceName}.${prefix}.${eventName}`;
const subject = `${this.param.serviceName}.${prefix}.${eventName}.*`;

if (!this.jsm) {
this.jsm = await this.param.broker.jetstreamManager();
Expand Down Expand Up @@ -154,7 +154,7 @@ export class StreamManager extends Root {
if (!isConsumerExist) {
await this.jsm.consumers.add(streamName, { ...options.config, filter_subject: subject });
} else {
await this.jsm.consumers.update(streamName, consumerName, options.config);
await this.jsm.consumers.update(streamName, consumerName, { ...options.config, filter_subject: subject });
}
}

Expand Down

0 comments on commit 423d1a8

Please sign in to comment.