diff --git a/clients/node/src/__tests__/moondream.test.ts b/clients/node/src/__tests__/moondream.test.ts index d52120c9..cad704bc 100644 --- a/clients/node/src/__tests__/moondream.test.ts +++ b/clients/node/src/__tests__/moondream.test.ts @@ -7,6 +7,7 @@ import { EventEmitter } from 'events'; // Extend EventEmitter to include http/https specific properties interface MockResponse extends EventEmitter { statusCode: number; + [Symbol.asyncIterator]?: () => AsyncGenerator; } interface MockRequest extends EventEmitter { @@ -114,11 +115,14 @@ describe('MoondreamClient', () => { ); }); - // TODO: Fix streaming unit tests. Integration tests verify this functionality works. - it.skip('should handle streaming responses', async () => { + it('should handle streaming responses', async () => { const mockReq = createMockRequest(); const mockRes = new EventEmitter() as MockResponse; mockRes.statusCode = 200; + mockRes[Symbol.asyncIterator] = async function* () { + yield Buffer.from('data: {"chunk":"test chunk"}\n'); + yield Buffer.from('data: {"completed":true}\n'); + }; (https.request as jest.Mock).mockImplementation((url, options, callback) => { callback(mockRes); @@ -133,19 +137,12 @@ describe('MoondreamClient', () => { const result = await client.caption(request); expect(result.caption).toBeDefined(); - // Emit chunks immediately - mockRes.emit('data', Buffer.from('data: {"chunk":"test chunk"}\n')); - mockRes.emit('data', Buffer.from('data: {"completed":true}\n')); - mockRes.emit('end'); - - // Get first chunk - const chunk = await (result.caption as AsyncGenerator).next(); - expect(chunk.value).toBe('test chunk'); - expect(chunk.done).toBe(false); + const chunks: string[] = []; + for await (const chunk of result.caption as AsyncGenerator) { + chunks.push(chunk); + } - // Get completion - const done = await (result.caption as AsyncGenerator).next(); - expect(done.done).toBe(true); + expect(chunks).toEqual(['test chunk']); }); it('should throw an error on API failure', async () => { @@ -199,11 +196,14 @@ describe('MoondreamClient', () => { ); }); - // TODO: Fix streaming unit tests. Integration tests verify this functionality works. - it.skip('should handle streaming query responses', async () => { + it('should handle streaming query responses', async () => { const mockReq = createMockRequest(); const mockRes = new EventEmitter() as MockResponse; mockRes.statusCode = 200; + mockRes[Symbol.asyncIterator] = async function* () { + yield Buffer.from('data: {"chunk":"test answer"}\n'); + yield Buffer.from('data: {"completed":true}\n'); + }; (https.request as jest.Mock).mockImplementation((url, options, callback) => { callback(mockRes); @@ -218,24 +218,11 @@ describe('MoondreamClient', () => { const result = await client.query(request); expect(result.answer).toBeDefined(); - // Create a promise that resolves when we get all chunks const chunks: string[] = []; - const streamDone = new Promise((resolve) => { - (async () => { - for await (const chunk of result.answer as AsyncGenerator) { - chunks.push(chunk); - } - resolve(); - })(); - }); - - // Emit chunks - mockRes.emit('data', Buffer.from('data: {"chunk":"test answer"}\n')); - mockRes.emit('data', Buffer.from('data: {"completed":true}\n')); - mockRes.emit('end'); + for await (const chunk of result.answer as AsyncGenerator) { + chunks.push(chunk); + } - // Wait for streaming to complete - await streamDone; expect(chunks).toEqual(['test answer']); }); }); diff --git a/clients/node/src/moondream.ts b/clients/node/src/moondream.ts index d998df0a..c8593596 100644 --- a/clients/node/src/moondream.ts +++ b/clients/node/src/moondream.ts @@ -130,68 +130,43 @@ export class vl { private async* streamResponse(response: any): AsyncGenerator { let buffer = ''; - // Create a promise that resolves when we get data or end - const getNextChunk = () => new Promise<{ value: string | undefined; done: boolean }>((resolve, reject) => { - const onData = (chunk: Buffer) => { - try { - buffer += chunk.toString(); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - - for (const line of lines) { - if (line.startsWith('data: ')) { + try { + for await (const chunk of response) { + buffer += chunk.toString(); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line.startsWith('data: ')) { + try { const data = JSON.parse(line.slice(6)); if ('chunk' in data) { - response.removeListener('data', onData); - response.removeListener('end', onEnd); - response.removeListener('error', onError); - resolve({ value: data.chunk, done: false }); - return; + yield data.chunk; } if (data.completed) { - response.removeListener('data', onData); - response.removeListener('end', onEnd); - response.removeListener('error', onError); - resolve({ value: undefined, done: true }); return; } + } catch (error) { + throw new Error(`Failed to parse JSON response from server: ${(error as Error).message}`); } } - } catch (error) { - response.removeListener('data', onData); - response.removeListener('end', onEnd); - response.removeListener('error', onError); - reject(new Error(`Failed to parse JSON response from server: ${(error as Error).message}`)); } - }; - - const onEnd = () => { - response.removeListener('data', onData); - response.removeListener('end', onEnd); - response.removeListener('error', onError); - resolve({ value: undefined, done: true }); - }; - - const onError = (error: Error) => { - response.removeListener('data', onData); - response.removeListener('end', onEnd); - response.removeListener('error', onError); - reject(error); - }; - - response.on('data', onData); - response.on('end', onEnd); - response.on('error', onError); - }); + } - try { - while (true) { - const result = await getNextChunk(); - if (result.done) { - return; - } - if (result.value !== undefined) { - yield result.value; + // Handle any remaining data in the buffer + if (buffer) { + const lines = buffer.split('\n'); + for (const line of lines) { + if (line.startsWith('data: ')) { + try { + const data = JSON.parse(line.slice(6)); + if ('chunk' in data) { + yield data.chunk; + } + } catch (error) { + throw new Error(`Failed to parse JSON response from server: ${(error as Error).message}`); + } + } } } } catch (error) {