Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jan 30, 2025
1 parent 8458c6f commit e3341b0
Show file tree
Hide file tree
Showing 24 changed files with 202 additions and 127 deletions.
1 change: 1 addition & 0 deletions .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ export MONGODB_URI=${MONGODB_URI}
export LOAD_BALANCER=${LOAD_BALANCER}
export TEST_CSFLE=${TEST_CSFLE}
export COMPRESSOR=${COMPRESSOR}
export NODE_OPTIONS="${NODE_OPTIONS} --trace-uncaught"
npm run "${TEST_NPM_SCRIPT}"
2 changes: 2 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ export class ChangeStream<
this.isClosed = false;
this.mode = false;

this.on('error', () => null);

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
Expand Down
8 changes: 2 additions & 6 deletions src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,8 @@ export class StateMachine {
let socket: tls.TLSSocket;

function destroySockets() {
for (const sock of [socket, netSocket]) {
if (sock) {
sock.removeAllListeners();
sock.destroy();
}
}
socket?.destroy();
netSocket?.destroy();
}

function onerror(cause: Error) {
Expand Down
1 change: 1 addition & 0 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ export async function makeSocket(

addAbortSignalToStream(closeSignal, socket);

socket.unref();
socket.setKeepAlive(true, 300000);
socket.setTimeout(connectTimeoutMS);
socket.setNoDelay(noDelay);
Expand Down
10 changes: 9 additions & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
);
}

unref() {
this.socket.unref();
}

ref() {
this.socket.ref();
}

public markAvailable(): void {
this.lastUseTime = now();
}
Expand Down Expand Up @@ -353,7 +361,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
return;
}

this.socket.destroy();
if (!this.socket.destroyed) this.socket.destroy();
this.error = error;

this.dataEvents?.throw(error).then(undefined, squashError);
Expand Down
3 changes: 3 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (!this.checkedOut.has(connection)) {
return;
}

connection.unref();
const poolClosed = this.closed;
const stale = this.connectionIsStale(connection);
const willDestroy = !!(poolClosed || stale || connection.closed);
Expand Down Expand Up @@ -788,6 +790,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);

this.waitQueue.shift();
connection.ref();
waitQueueMember.resolve(connection);
}
}
Expand Down
17 changes: 9 additions & 8 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -691,14 +691,15 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements

/* @internal */
private async _close(force = false): Promise<void> {
this.closeController.abort();
// There's no way to set hasBeenClosed back to false
Object.defineProperty(this.s, 'hasBeenClosed', {
value: true,
enumerable: true,
configurable: false,
writable: false
});
try {
this.closeController.abort();
// There's no way to set hasBeenClosed back to false
Object.defineProperty(this.s, 'hasBeenClosed', {
value: true,
enumerable: true,
configurable: false,
writable: false
});

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();
Expand Down
22 changes: 8 additions & 14 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,13 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}

// connecting does an implicit `hello`
(async () => {
const makeMonitoringConnection = async () => {
const socket = await makeSocket(monitor.connectOptions, monitor.closeSignal);
const connection = makeConnection(monitor.connectOptions, socket);
// The start time is after socket creation but before the handshake
start = now();
try {
await performInitialHandshake(connection, monitor.connectOptions, monitor.closeSignal);
return connection;
} catch (error) {
connection.destroy();
throw error;
}
})().then(
connection => {
if (isInCloseState(monitor)) {
connection.destroy();
return;
Expand All @@ -417,15 +410,16 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
)
);

callback(undefined, connection.hello);
},
error => {
return connection.hello;
} catch (error) {
connection.destroy();
monitor.connection = null;
awaited = false;
onHeartbeatFailed(error);
throw error;
}
);
};

makeMonitoringConnection().then(callback.bind(undefined, undefined), onHeartbeatFailed);
}

function monitorServer(monitor: Monitor) {
Expand Down
33 changes: 32 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-console */
import * as crypto from 'crypto';
import type { SrvRecord } from 'dns';
import { type EventEmitter } from 'events';
Expand Down Expand Up @@ -1571,7 +1572,37 @@ export function addAbortSignalToStream(

const abortListener = addAbortListener(signal, function () {
stream.off('close', abortListener[kDispose]).off('error', abortListener[kDispose]);
stream.destroy(this.reason);
const removeAll = stream.removeAllListeners.bind(stream);

//@ts-expect-error: overwrite
stream.removeAllListeners = function (...args) {
const removingError = args.length === 0 || args.includes('error');
if (removingError) {
console.log('removing all', args, new Error('why are u removing my error listener'));
}
return removeAll(...args);
};

stream.on('removeListener', (name, listener) => {
if (name === 'error') {
console.log(
'who doth remove my error listener',
new Error('error listener gone missing!!'),
listener
);
}
});

const error = new Error(
//@ts-expect-error: we added these
`sad: ${stream.___socketId}: error listeners: ${stream.listenerCount('error')} + ${stream.___stack}`,
{ cause: this.reason }
);

//@ts-expect-error: adding this for debug
error.stream = stream;

stream.destroy(error);
});
// not nearly as complex as node's eos() but... do we need all that?? sobbing emoji.
stream.once('close', abortListener[kDispose]).once('error', abortListener[kDispose]);
Expand Down
16 changes: 14 additions & 2 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
MongoChangeStreamError,
type MongoClient,
MongoServerError,
promiseWithResolvers,
ReadPreference,
type ResumeToken
} from '../../mongodb';
Expand Down Expand Up @@ -62,6 +63,7 @@ describe('Change Streams', function () {
await csDb.createCollection('test').catch(() => null);
collection = csDb.collection('test');
changeStream = collection.watch();
changeStream.once('error', error => this.error(error));
});

afterEach(async () => {
Expand Down Expand Up @@ -695,10 +697,18 @@ describe('Change Streams', function () {
async test() {
await initIteratorMode(changeStream);

const { promise, resolve, reject } = promiseWithResolvers<void>();

const outStream = new PassThrough({ objectMode: true });

// @ts-expect-error: transform requires a Document return type
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
const csStream = changeStream
// @ts-expect-error: transform requires a Document return type
.stream({ transform: JSON.stringify });

csStream.once('error', reject).pipe(outStream).once('error', reject);

outStream.on('close', resolve);
csStream.on('close', resolve);

const willBeData = once(outStream, 'data');

Expand All @@ -709,6 +719,8 @@ describe('Change Streams', function () {
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);

outStream.destroy();
csStream.destroy();
await promise;
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const metadata: MongoDBMetadataUI = {
}
};

const closeSignal = new AbortController().signal;

context('Azure KMS Mock Server Tests', function () {
context('Case 1: Success', metadata, function () {
// Do not set an ``X-MongoDB-HTTP-TestParams`` header.
Expand All @@ -44,7 +46,7 @@ context('Azure KMS Mock Server Tests', function () {
// 5. The token will have a resource of ``"https://vault.azure.net"``

it('returns a properly formatted access token', async () => {
const credentials = await fetchAzureKMSToken(new KMSRequestOptions());
const credentials = await fetchAzureKMSToken(new KMSRequestOptions(), closeSignal);
expect(credentials).to.have.property('accessToken', 'magic-cookie');
});
});
Expand All @@ -59,7 +61,10 @@ context('Azure KMS Mock Server Tests', function () {
// The test case should ensure that this error condition is handled gracefully.

it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('empty-json')).catch(e => e);
const error = await fetchAzureKMSToken(
new KMSRequestOptions('empty-json'),
closeSignal
).catch(e => e);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -74,7 +79,9 @@ context('Azure KMS Mock Server Tests', function () {
// The test case should ensure that this error condition is handled gracefully.

it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('bad-json')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('bad-json'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -89,7 +96,9 @@ context('Azure KMS Mock Server Tests', function () {
// 2. The response body is unspecified.
// The test case should ensure that this error condition is handled gracefully.
it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('404')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('404'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -104,7 +113,9 @@ context('Azure KMS Mock Server Tests', function () {
// 2. The response body is unspecified.
// The test case should ensure that this error condition is handled gracefully.
it('returns an error', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('500')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('500'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand All @@ -117,7 +128,9 @@ context('Azure KMS Mock Server Tests', function () {
// The HTTP response from the ``fake_azure`` server will take at least 1000 seconds
// to complete. The request should fail with a timeout.
it('returns an error after the request times out', async () => {
const error = await fetchAzureKMSToken(new KMSRequestOptions('slow')).catch(e => e);
const error = await fetchAzureKMSToken(new KMSRequestOptions('slow'), closeSignal).catch(
e => e
);

expect(error).to.be.instanceof(MongoCryptAzureKMSRequestError);
});
Expand Down
6 changes: 4 additions & 2 deletions test/integration/client-side-encryption/driver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -829,12 +829,14 @@ describe('CSOT', function () {
});

describe('State machine', function () {
const stateMachine = new StateMachine({} as any);
const signal = new AbortController().signal;
const stateMachine = new StateMachine({} as any, undefined, signal);

const timeoutContext = () => ({
timeoutContext: new CSOTTimeoutContext({
timeoutMS: 1000,
serverSelectionTimeoutMS: 30000
serverSelectionTimeoutMS: 30000,
closeSignal: signal
})
});

Expand Down
Loading

0 comments on commit e3341b0

Please sign in to comment.