Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve Node.js client streaming implementation #182

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 19 additions & 32 deletions clients/node/src/__tests__/moondream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { EventEmitter } from 'events';
// Extend EventEmitter to include http/https specific properties
interface MockResponse extends EventEmitter {
statusCode: number;
[Symbol.asyncIterator]?: () => AsyncGenerator<Buffer, void, unknown>;
}

interface MockRequest extends EventEmitter {
Expand Down Expand Up @@ -114,11 +115,14 @@ describe('MoondreamClient', () => {
);
});

// TODO: Fix streaming unit tests. Integration tests verify this functionality works.
it.skip('should handle streaming responses', async () => {
it('should handle streaming responses', async () => {
const mockReq = createMockRequest();
const mockRes = new EventEmitter() as MockResponse;
mockRes.statusCode = 200;
mockRes[Symbol.asyncIterator] = async function* () {
yield Buffer.from('data: {"chunk":"test chunk"}\n');
yield Buffer.from('data: {"completed":true}\n');
};

(https.request as jest.Mock).mockImplementation((url, options, callback) => {
callback(mockRes);
Expand All @@ -133,19 +137,12 @@ describe('MoondreamClient', () => {
const result = await client.caption(request);
expect(result.caption).toBeDefined();

// Emit chunks immediately
mockRes.emit('data', Buffer.from('data: {"chunk":"test chunk"}\n'));
mockRes.emit('data', Buffer.from('data: {"completed":true}\n'));
mockRes.emit('end');

// Get first chunk
const chunk = await (result.caption as AsyncGenerator<string>).next();
expect(chunk.value).toBe('test chunk');
expect(chunk.done).toBe(false);
const chunks: string[] = [];
for await (const chunk of result.caption as AsyncGenerator<string>) {
chunks.push(chunk);
}

// Get completion
const done = await (result.caption as AsyncGenerator<string>).next();
expect(done.done).toBe(true);
expect(chunks).toEqual(['test chunk']);
});

it('should throw an error on API failure', async () => {
Expand Down Expand Up @@ -199,11 +196,14 @@ describe('MoondreamClient', () => {
);
});

// TODO: Fix streaming unit tests. Integration tests verify this functionality works.
it.skip('should handle streaming query responses', async () => {
it('should handle streaming query responses', async () => {
const mockReq = createMockRequest();
const mockRes = new EventEmitter() as MockResponse;
mockRes.statusCode = 200;
mockRes[Symbol.asyncIterator] = async function* () {
yield Buffer.from('data: {"chunk":"test answer"}\n');
yield Buffer.from('data: {"completed":true}\n');
};

(https.request as jest.Mock).mockImplementation((url, options, callback) => {
callback(mockRes);
Expand All @@ -218,24 +218,11 @@ describe('MoondreamClient', () => {
const result = await client.query(request);
expect(result.answer).toBeDefined();

// Create a promise that resolves when we get all chunks
const chunks: string[] = [];
const streamDone = new Promise<void>((resolve) => {
(async () => {
for await (const chunk of result.answer as AsyncGenerator<string>) {
chunks.push(chunk);
}
resolve();
})();
});

// Emit chunks
mockRes.emit('data', Buffer.from('data: {"chunk":"test answer"}\n'));
mockRes.emit('data', Buffer.from('data: {"completed":true}\n'));
mockRes.emit('end');
for await (const chunk of result.answer as AsyncGenerator<string>) {
chunks.push(chunk);
}

// Wait for streaming to complete
await streamDone;
expect(chunks).toEqual(['test answer']);
});
});
Expand Down
79 changes: 27 additions & 52 deletions clients/node/src/moondream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,68 +130,43 @@ export class vl {
private async* streamResponse(response: any): AsyncGenerator<string, void, unknown> {
let buffer = '';

// Create a promise that resolves when we get data or end
const getNextChunk = () => new Promise<{ value: string | undefined; done: boolean }>((resolve, reject) => {
const onData = (chunk: Buffer) => {
try {
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop() || '';

for (const line of lines) {
if (line.startsWith('data: ')) {
try {
for await (const chunk of response) {
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop() || '';

for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if ('chunk' in data) {
response.removeListener('data', onData);
response.removeListener('end', onEnd);
response.removeListener('error', onError);
resolve({ value: data.chunk, done: false });
return;
yield data.chunk;
}
if (data.completed) {
response.removeListener('data', onData);
response.removeListener('end', onEnd);
response.removeListener('error', onError);
resolve({ value: undefined, done: true });
return;
}
} catch (error) {
throw new Error(`Failed to parse JSON response from server: ${(error as Error).message}`);
}
}
} catch (error) {
response.removeListener('data', onData);
response.removeListener('end', onEnd);
response.removeListener('error', onError);
reject(new Error(`Failed to parse JSON response from server: ${(error as Error).message}`));
}
};

const onEnd = () => {
response.removeListener('data', onData);
response.removeListener('end', onEnd);
response.removeListener('error', onError);
resolve({ value: undefined, done: true });
};

const onError = (error: Error) => {
response.removeListener('data', onData);
response.removeListener('end', onEnd);
response.removeListener('error', onError);
reject(error);
};

response.on('data', onData);
response.on('end', onEnd);
response.on('error', onError);
});
}

try {
while (true) {
const result = await getNextChunk();
if (result.done) {
return;
}
if (result.value !== undefined) {
yield result.value;
// Handle any remaining data in the buffer
if (buffer) {
const lines = buffer.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if ('chunk' in data) {
yield data.chunk;
}
} catch (error) {
throw new Error(`Failed to parse JSON response from server: ${(error as Error).message}`);
}
}
}
}
} catch (error) {
Expand Down
Loading