Skip to content

Commit

Permalink
fix: grow subscriptions (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
gleip authored Nov 7, 2024
1 parent dd1945f commit d8b0ce2
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class Client<E extends Emitter = Emitter> extends Root {
private async startBatchWatch(fetcher: StreamFetcher, listener: EventEmitter, eventName: string) {
while (true) {
const batch: Partial<EmitterStreamEvent<any>>[] = [];
const events = fetcher.fetch();
const events = await fetcher.fetch();

for await (const event of events) {
let data: unknown;
Expand All @@ -97,7 +97,7 @@ export class Client<E extends Emitter = Emitter> extends Root {
batch.push(message);
}
if (batch.length > 0) listener.emit(eventName, batch);
events.stop();
await events.close();
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/StreamFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JetStreamClient, JsMsg, QueuedIterator } from 'nats';
import { ConsumerMessages, JetStreamClient } from 'nats';

interface BatcherOptions {
batchSize?: number;
Expand All @@ -13,11 +13,11 @@ export class StreamFetcher {
private options: BatcherOptions,
) {}

public fetch(noWait?: boolean, size?: number, expires?: number): QueuedIterator<JsMsg> {
return this.jsClient.fetch(this.streamName, this.consumerName, {
batch: size ?? this.options.batchSize,
public async fetch(size?: number, expires?: number): Promise<ConsumerMessages> {
const consumer = await this.jsClient.consumers.get(this.streamName, this.consumerName);
return await consumer.fetch({
max_messages: size ?? this.options.batchSize,
expires: expires ?? this.options.batchTimeout,
no_wait: noWait ?? this.options.noWait,
});
}
}
14 changes: 9 additions & 5 deletions src/__tests__/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ describe('Testing Client class methods', () => {
request: jest.fn(),
jetstream: () => ({
subscribe: jetstreamSubscribeMock,
fetch: jetstreamFetchMock,
consumers: {
get: jest.fn().mockResolvedValue({
fetch: jetstreamFetchMock,
}),
},
}),
jetstreamManager: jest.fn().mockResolvedValue({
streams: {
Expand Down Expand Up @@ -97,10 +101,10 @@ describe('Testing Client class methods', () => {
const payload = { data: { elapsed: 42 } };
const subscribe = new PassThrough({ objectMode: true });
const secondSubscribe = new PassThrough({ objectMode: true });
subscribe['stop'] = jest.fn();
secondSubscribe['stop'] = jest.fn();
jetstreamFetchMock.mockReturnValueOnce(subscribe);
jetstreamFetchMock.mockReturnValueOnce(secondSubscribe);
subscribe['close'] = jest.fn().mockResolvedValue('Ok');
secondSubscribe['close'] = jest.fn().mockResolvedValue('Ok');
jetstreamFetchMock.mockResolvedValueOnce(subscribe);
jetstreamFetchMock.mockResolvedValueOnce(secondSubscribe);

const result = mathClient.getListener('Test', { batch: true });

Expand Down

0 comments on commit d8b0ce2

Please sign in to comment.