Skip to content

Commit

Permalink
feat: Uniq key for event (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
gleip authored Nov 8, 2024
1 parent 61145d8 commit ff69a67
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
6 changes: 3 additions & 3 deletions examples/MathService/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ export interface FibonacciNumberEvent {
}

export type EmitterMath = {
Elapsed: (params: ElapsedEvent) => void;
Notify: (params: NotifyEvent) => void;
FibonacciNumber: (params: FibonacciNumberEvent) => void;
Elapsed: (params: ElapsedEvent, uniqId?: string) => void;
Notify: (params: NotifyEvent, uniqId?: string) => void;
FibonacciNumber: (params: FibonacciNumberEvent, uniqId?: string) => void;
};

export type EmitterMathExternal = {
Expand Down
18 changes: 15 additions & 3 deletions src/Service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Root } from './Root';
import { JSONCodec, Subscription, DebugEvents, Events } from 'nats';
import { JSONCodec, Subscription, DebugEvents, Events, headers, MsgHdrs } from 'nats';
import {
Message,
Emitter,
Expand Down Expand Up @@ -43,6 +43,12 @@ export class Service<E extends Emitter = Emitter> extends Root {
private httpMethods = new Map<string, Method>();
private rootSpans = new Map<string, Span>();

/**
* Unique identifier NATS header for a message that will be used by the server apply
* de-duplication within the configured Duplicate Window
*/
private readonly UNIQ_ID_HEADER = 'Nats-Msg-Id';

constructor(private options: ServiceOptions<E>) {
super(options.brokerConnection, options.loggerOutputFormatter);

Expand All @@ -51,7 +57,7 @@ export class Service<E extends Emitter = Emitter> extends Root {
if (options.events) {
const events = Object.keys(options.events.list) as [keyof E];
this.emitter = events.reduce((result, eventName) => {
result[eventName] = ((params: unknown) => {
result[eventName] = ((params: unknown, uniqId?: string) => {
const subject: string[] = [options.name];

const eventOptions = options.events?.list[eventName];
Expand All @@ -68,7 +74,13 @@ export class Service<E extends Emitter = Emitter> extends Root {

subject.push(String(eventName));

this.broker.publish(subject.join('.'), this.buildMessage(params));
let settings: { headers: MsgHdrs } | undefined;
if (uniqId) {
settings = { headers: headers() };
settings.headers.append(this.UNIQ_ID_HEADER, uniqId);
}

this.broker.publish(subject.join('.'), this.buildMessage(params), settings);
}) as E[keyof E];
return result;
}, this.emitter);
Expand Down
15 changes: 15 additions & 0 deletions src/__tests__/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ describe('Testing Service class methods', () => {
subscribe: jetstreamSubscribeMock,
}),
status: () => brokerEvents,
publish: jest.fn(),
};

const codec = JSONCodec();
Expand Down Expand Up @@ -234,6 +235,20 @@ describe('Testing Service class methods', () => {
await setTimeout(1);
expect(result).toMatchObject(streamResponse);
});

test('Event sending', async () => {
const mathService = new Service<EmitterMath>({
name,
brokerConnection: broker as any,
methods: [],
events,
});

await mathService.start();

mathService.emitter.Notify({ method: 'Test' }, 'uniq_key');
expect(broker.publish).toBeCalledTimes(1);
});
});

describe('Other', () => {
Expand Down

0 comments on commit ff69a67

Please sign in to comment.