Skip to content

Commit

Permalink
await heartbeats part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-khare-mongoDB committed Jan 22, 2025
1 parent 777d890 commit a91e9af
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 185 deletions.
161 changes: 0 additions & 161 deletions socket-connection-rtt-monitoring.cjs

This file was deleted.

71 changes: 48 additions & 23 deletions test/integration/node-specific/client_close.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { type TestConfiguration } from '../../tools/runner/config';
import { runScriptAndGetProcessInfo } from './resource_tracking_script_builder';

describe.only('MongoClient.close() Integration', () => {
describe.skip('MongoClient.close() Integration', () => {
// note: these tests are set-up in accordance of the resource ownership tree

let config: TestConfiguration;
Expand Down Expand Up @@ -120,13 +120,19 @@ describe.only('MongoClient.close() Integration', () => {
'monitor interval timer is cleaned up by client.close()',
metadata,
async function () {
const run = async function ({ MongoClient, uri, expect, getTimerCount, mongodb }) {
const run = async function ({
MongoClient,
uri,
expect,
getTimerCount,
promiseWithResolvers
}) {
const heartbeatFrequencyMS = 2000;
const client = new MongoClient(uri, { heartbeatFrequencyMS });
const { heartbeatPromise, resolve } = mongodb.promiseWithResolvers();
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatSucceeded', () => resolve());
await client.connect();
await heartbeatPromise;
await promise;

function monitorTimersExist(servers) {
for (const [, server] of servers) {
Expand All @@ -153,12 +159,18 @@ describe.only('MongoClient.close() Integration', () => {
'the new monitor interval timer is cleaned up by client.close()',
metadata,
async () => {
const run = async function ({ MongoClient, expect, getTimerCount }) {
const run = async function ({
MongoClient,
expect,
getTimerCount,
promiseWithResolvers
}) {
const heartbeatFrequencyMS = 2000;
const client = new MongoClient('mongodb://fakeUri', { heartbeatFrequencyMS });
const heartbeatPromise = client.once('serverHeartbeatFailed', v => v);
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatFailed', () => resolve());
client.connect();
await heartbeatPromise;
await promise;

function getMonitorTimer(servers) {
for (const [, server] of servers) {
Expand Down Expand Up @@ -213,16 +225,22 @@ describe.only('MongoClient.close() Integration', () => {
'the rtt pinger timer is cleaned up by client.close()',
metadata,
async function () {
const run = async function ({ MongoClient, uri, expect, getTimerCount, mongodb }) {
const run = async function ({
MongoClient,
uri,
expect,
getTimerCount,
promiseWithResolvers
}) {
const heartbeatFrequencyMS = 2000;
const client = new MongoClient(uri, {
serverMonitoringMode: 'stream',
heartbeatFrequencyMS
});
await client.connect();
const { heartbeatPromise, resolve } = mongodb.promiseWithResolvers();
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatSucceeded', () => resolve());
await heartbeatPromise;
await promise;

function getRttTimer(servers) {
for (const [, server] of servers) {
Expand All @@ -248,8 +266,14 @@ describe.only('MongoClient.close() Integration', () => {
describe('Node.js resource: Socket', () => {
describe('when rtt monitoring is turned on', () => {
it('no sockets remain after client.close()', metadata, async () => {
const run = async ({ MongoClient, uri, expect, getSockets, mongodb }) => {
const heartbeatFrequencyMS = 100;
const run = async ({
MongoClient,
uri,
expect,
getSockets,
promiseWithResolvers
}) => {
const heartbeatFrequencyMS = 500;
const client = new MongoClient(uri, {
serverMonitoringMode: 'stream',
heartbeatFrequencyMS
Expand All @@ -258,27 +282,28 @@ describe.only('MongoClient.close() Integration', () => {

const socketsAddressesBeforeHeartbeat = getSockets().map(r => r.address);

const activeSocketsAfterHeartbeat = () =>
getSockets()
.filter(r => !socketsAddressesBeforeHeartbeat.includes(r.address))
.map(r => r.remoteEndpoint?.host + ':' + r.remoteEndpoint?.port);

// set of servers whose hearbeats have occurred
// set of servers whose heartbeats have occurred
const heartbeatOccurredSet = new Set();

const servers = client.topology.s.servers;

while (heartbeatOccurredSet.size < servers.size) {
const { heartbeatPromise, resolve } = mongodb.promiseWithResolvers();
client.once('serverHeartbeatSucceeded', (ev) => {
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatSucceeded', ev => {
heartbeatOccurredSet.add(ev.connectionId);
resolve();
});
await heartbeatPromise;
await promise;
}

const activeSocketsAfterHeartbeat = () =>
getSockets()
.filter(r => !socketsAddressesBeforeHeartbeat.includes(r.address))
.map(r => r.remoteEndpoint?.host + ':' + r.remoteEndpoint?.port);
// all servers should have had a heartbeat event and had a new socket created for rtt pinger
for (const [server,] of servers) {
expect(activeSocketsAfterHeartbeat()).to.deep.contain(server[0]);
const activeSocketsBeforeClose = activeSocketsAfterHeartbeat();
for (const [server] of servers) {
expect(activeSocketsBeforeClose).to.deep.contain(server);
}

// close the client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type ProcessResourceTestFunction = (options: {
timers?: typeof timers;
getSocketReport?: () => { host: string; port: string };
getSocketEndpointReport?: () => any;
promiseWithResolvers?: () => any;
}) => Promise<void>;

const HEAP_RESOURCE_SCRIPT_PATH = path.resolve(
Expand Down
23 changes: 22 additions & 1 deletion test/tools/fixtures/process_resource_script.in.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ const getSocketEndpoints = () =>
.libuv.filter(r => r.type === 'tcp')
.map(r => r.remoteEndpoint);

function promiseWithResolvers() {
let resolve;
let reject;
const promise = new Promise((promiseResolve, promiseReject) => {
resolve = promiseResolve;
reject = promiseReject;
});
return { promise, resolve, reject };
}

// A log function for debugging
function log(message) {
// remove outer parentheses for easier parsing
Expand All @@ -110,7 +120,18 @@ async function main() {
process.on('beforeExit', () => {
log({ beforeExitHappened: true });
});
await run({ MongoClient, uri, log, expect, mongodb, sleep, getTimerCount, getSockets, getSocketEndpoints });
await run({
MongoClient,
uri,
log,
expect,
mongodb,
sleep,
getTimerCount,
getSockets,
getSocketEndpoints,
promiseWithResolvers
});
log({ newResources: getNewResources() });
}

Expand Down

0 comments on commit a91e9af

Please sign in to comment.