Skip to content

Commit

Permalink
fix: rollup pull consumer (#142)
Browse files Browse the repository at this point in the history
* fix: rollup pull consumer

* debug: fix test

---------

Co-authored-by: gleip <kuchinsn@lad24.ru>
  • Loading branch information
gleip and gleip authored Jan 22, 2025
1 parent 69ddd98 commit 974dcb8
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 17 deletions.
2 changes: 1 addition & 1 deletion examples/MathService/service.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
"prefix": "stream",
"actions": [
{
"action": "*",
"action": ">",
"storage": "file",
"retentionPolicy": "limits",
"discardPolicy": "old",
Expand Down
3 changes: 3 additions & 0 deletions src/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ export class Client<E extends Emitter = Emitter> extends Root {
eventName: string,
) {
for await (const event of subscription) {
if (!event) {
continue;
}
let data: unknown;
try {
data = JSONCodec<unknown>().decode(event.data);
Expand Down
3 changes: 3 additions & 0 deletions src/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export class Service<E extends Emitter = Emitter> extends Root {
* Roll-up only same subject message in the stream
*/
private readonly ROLLUP_STRATEGY = 'sub';
private readonly BASE_EVENT_SUFFIX = 'base';

constructor(private options: ServiceOptions<E>) {
super(options.brokerConnection, options.loggerOutputFormatter);
Expand Down Expand Up @@ -91,6 +92,8 @@ export class Service<E extends Emitter = Emitter> extends Root {
settings = settings ?? { headers: headers() };
settings.headers.append(this.ROLLUP_HEADER, this.ROLLUP_STRATEGY);
subject.push(rollupId);
} else {
subject.push(this.BASE_EVENT_SUFFIX);
}

this.broker.publish(subject.join('.'), this.buildMessage(params), settings);
Expand Down
13 changes: 3 additions & 10 deletions src/StreamSingleMsgFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
import { Consumer, ConsumerMessages, JsMsg } from 'nats';
import { Consumer } from 'nats';

export class StreamSingleMsgFetcher {
private done = false;
private msg: JsMsg;

constructor(private consumer: Consumer) {}

[Symbol.asyncIterator]() {
const done = {
value: this.msg,
done: true,
};

return {
next: async () => {
const msg = await this.consumer.next();
if (msg) {
this.msg = msg;
return {
value: this.msg,
value: msg,
done: this.done,
};
}

return done;
return { done: false };
},
};
}
Expand Down
11 changes: 5 additions & 6 deletions src/__tests__/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,15 @@ describe('Testing Client class methods', () => {

test('Successful subscription and event processing for a streaming event', () => {
const payload = { data: { elapsed: 42 } };
const subscribe = new PassThrough({ objectMode: true });
jetstreamSubscribeMock.mockReturnValue(subscribe);
jetstreamNextMock.mockResolvedValue({ data: codec.encode(payload.data), sid: '1', ack: jest.fn(), nak: jest.fn() });

const result = mathClient.getListener('Test');

result.on('Elapsed', event => {
const handler = event => {
expect(event.data.elapsed).toBe(payload.data.elapsed);
});
result.off('Elapsed', handler);
}
result.on('Elapsed', handler);

subscribe.write({ data: codec.encode(payload.data), sid: '1', ack: jest.fn(), nak: jest.fn() });
});

test('Successful unsubscribe from the event', () => {
Expand Down

0 comments on commit 974dcb8

Please sign in to comment.