diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index 4c7c917998..c484a80f49 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -16,7 +16,7 @@ import { MongoRuntimeError, needsRetryableWriteLabel } from '../error'; -import { type Callback, HostAddress, ns } from '../utils'; +import { HostAddress, ns, promiseWithResolvers } from '../utils'; import { AuthContext, type AuthProvider } from './auth/auth_provider'; import { GSSAPI } from './auth/gssapi'; import { MongoCR } from './auth/mongocr'; @@ -55,27 +55,26 @@ export const AUTH_PROVIDERS = new Map([ /** @public */ export type Stream = Socket | TLSSocket; -export function connect(options: ConnectionOptions, callback: Callback): void { - makeConnection({ ...options, existingSocket: undefined }, (err, socket) => { - if (err || !socket) { - return callback(err); - } - - let ConnectionType = options.connectionType ?? Connection; - if (options.autoEncrypter) { - ConnectionType = CryptoConnection; - } +export async function connect(options: ConnectionOptions): Promise { + let connection: Connection | null = null; + try { + const socket = await makeSocket(options); + connection = makeConnection(options, socket); + await performInitialHandshake(connection, options); + return connection; + } catch (error) { + connection?.destroy({ force: false }); + throw error; + } +} - const connection = new ConnectionType(socket, options); +export function makeConnection(options: ConnectionOptions, socket: Stream): Connection { + let ConnectionType = options.connectionType ?? Connection; + if (options.autoEncrypter) { + ConnectionType = CryptoConnection; + } - performInitialHandshake(connection, options).then( - () => callback(undefined, connection), - error => { - connection.destroy({ force: false }); - callback(error); - } - ); - }); + return new ConnectionType(socket, options); } function checkSupportedServer(hello: Document, options: ConnectionOptions) { @@ -103,7 +102,7 @@ function checkSupportedServer(hello: Document, options: ConnectionOptions) { return new MongoCompatibilityError(message); } -async function performInitialHandshake( +export async function performInitialHandshake( conn: Connection, options: ConnectionOptions ): Promise { @@ -329,11 +328,7 @@ function parseSslOptions(options: MakeConnectionOptions): TLSConnectionOpts { return result; } -const SOCKET_ERROR_EVENT_LIST = ['error', 'close', 'timeout', 'parseError'] as const; -type ErrorHandlerEventName = (typeof SOCKET_ERROR_EVENT_LIST)[number] | 'cancel'; -const SOCKET_ERROR_EVENTS = new Set(SOCKET_ERROR_EVENT_LIST); - -function makeConnection(options: MakeConnectionOptions, _callback: Callback) { +export async function makeSocket(options: MakeConnectionOptions): Promise { const useTLS = options.tls ?? false; const noDelay = options.noDelay ?? true; const connectTimeoutMS = options.connectTimeoutMS ?? 30000; @@ -341,23 +336,13 @@ function makeConnection(options: MakeConnectionOptions, _callback: Callback = function (err, ret) { - if (err && socket) { - socket.destroy(); - } - - _callback(err, ret); - }; if (options.proxyHost != null) { // Currently, only Socks5 is supported. - return makeSocks5Connection( - { - ...options, - connectTimeoutMS // Should always be present for Socks5 - }, - callback - ); + return makeSocks5Connection({ + ...options, + connectTimeoutMS // Should always be present for Socks5 + }); } if (useTLS) { @@ -379,47 +364,41 @@ function makeConnection(options: MakeConnectionOptions, _callback: Callback void; - function errorHandler(eventName: ErrorHandlerEventName) { - return (err: Error) => { - SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event)); - if (cancellationHandler && options.cancellationToken) { - options.cancellationToken.removeListener('cancel', cancellationHandler); - } - - socket.removeListener(connectEvent, connectHandler); - callback(connectionFailureError(eventName, err)); - }; - } + let cancellationHandler: ((err: Error) => void) | null = null; - function connectHandler() { - SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event)); - if (cancellationHandler && options.cancellationToken) { - options.cancellationToken.removeListener('cancel', cancellationHandler); + const { promise: connectedSocket, resolve, reject } = promiseWithResolvers(); + if (existingSocket) { + resolve(socket); + } else { + const connectEvent = useTLS ? 'secureConnect' : 'connect'; + socket + .once(connectEvent, () => resolve(socket)) + .once('error', error => reject(connectionFailureError('error', error))) + .once('timeout', () => reject(connectionFailureError('timeout'))) + .once('close', () => reject(connectionFailureError('close'))); + + if (options.cancellationToken != null) { + cancellationHandler = () => reject(connectionFailureError('cancel')); + options.cancellationToken.once('cancel', cancellationHandler); } + } - if ('authorizationError' in socket) { - if (socket.authorizationError && rejectUnauthorized) { - // TODO(NODE-5192): wrap this with a MongoError subclass - return callback(socket.authorizationError); - } + try { + socket = await connectedSocket; + return socket; + } catch (error) { + socket.destroy(); + if ('authorizationError' in socket && socket.authorizationError != null && rejectUnauthorized) { + // TODO(NODE-5192): wrap this with a MongoError subclass + throw socket.authorizationError; } - + throw error; + } finally { socket.setTimeout(0); - callback(undefined, socket); - } - - SOCKET_ERROR_EVENTS.forEach(event => socket.once(event, errorHandler(event))); - if (options.cancellationToken) { - cancellationHandler = errorHandler('cancel'); - options.cancellationToken.once('cancel', cancellationHandler); - } - - if (existingSocket) { - process.nextTick(connectHandler); - } else { - socket.once(connectEvent, connectHandler); + socket.removeAllListeners(); + if (cancellationHandler != null) { + options.cancellationToken?.removeListener('cancel', cancellationHandler); + } } } @@ -435,78 +414,68 @@ function loadSocks() { return socks; } -function makeSocks5Connection(options: MakeConnectionOptions, callback: Callback) { +async function makeSocks5Connection(options: MakeConnectionOptions): Promise { const hostAddress = HostAddress.fromHostPort( options.proxyHost ?? '', // proxyHost is guaranteed to set here options.proxyPort ?? 1080 ); // First, connect to the proxy server itself: - makeConnection( - { - ...options, - hostAddress, - tls: false, - proxyHost: undefined - }, - (err, rawSocket) => { - if (err || !rawSocket) { - return callback(err); - } + const rawSocket = await makeSocket({ + ...options, + hostAddress, + tls: false, + proxyHost: undefined + }); - const destination = parseConnectOptions(options) as net.TcpNetConnectOpts; - if (typeof destination.host !== 'string' || typeof destination.port !== 'number') { - return callback( - new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts') - ); - } + const destination = parseConnectOptions(options) as net.TcpNetConnectOpts; + if (typeof destination.host !== 'string' || typeof destination.port !== 'number') { + throw new MongoInvalidArgumentError('Can only make Socks5 connections to TCP hosts'); + } - try { - socks ??= loadSocks(); - } catch (error) { - return callback(error); + socks ??= loadSocks(); + + try { + // Then, establish the Socks5 proxy connection: + const { socket } = await socks.SocksClient.createConnection({ + existing_socket: rawSocket, + timeout: options.connectTimeoutMS, + command: 'connect', + destination: { + host: destination.host, + port: destination.port + }, + proxy: { + // host and port are ignored because we pass existing_socket + host: 'iLoveJavaScript', + port: 0, + type: 5, + userId: options.proxyUsername || undefined, + password: options.proxyPassword || undefined } + }); - // Then, establish the Socks5 proxy connection: - socks.SocksClient.createConnection({ - existing_socket: rawSocket, - timeout: options.connectTimeoutMS, - command: 'connect', - destination: { - host: destination.host, - port: destination.port - }, - proxy: { - // host and port are ignored because we pass existing_socket - host: 'iLoveJavaScript', - port: 0, - type: 5, - userId: options.proxyUsername || undefined, - password: options.proxyPassword || undefined - } - }).then( - ({ socket }) => { - // Finally, now treat the resulting duplex stream as the - // socket over which we send and receive wire protocol messages: - makeConnection( - { - ...options, - existingSocket: socket, - proxyHost: undefined - }, - callback - ); - }, - error => callback(connectionFailureError('error', error)) - ); - } - ); + // Finally, now treat the resulting duplex stream as the + // socket over which we send and receive wire protocol messages: + return await makeSocket({ + ...options, + existingSocket: socket, + proxyHost: undefined + }); + } catch (error) { + throw connectionFailureError('error', error); + } } -function connectionFailureError(type: ErrorHandlerEventName, err: Error) { +function connectionFailureError(type: 'error', cause: Error): MongoNetworkError; +function connectionFailureError(type: 'close' | 'timeout' | 'cancel'): MongoNetworkError; +function connectionFailureError( + type: 'error' | 'close' | 'timeout' | 'cancel', + cause?: Error +): MongoNetworkError { switch (type) { case 'error': - return new MongoNetworkError(MongoError.buildErrorMessage(err), { cause: err }); + return new MongoNetworkError(MongoError.buildErrorMessage(cause), { cause }); case 'timeout': return new MongoNetworkTimeoutError('connection timed out'); case 'close': diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 5c7de8dd54..6071794f60 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -710,8 +710,48 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionCreatedEvent(this, { id: connectOptions.id }) ); - connect(connectOptions, (err, connection) => { - if (err || !connection) { + connect(connectOptions).then( + connection => { + // The pool might have closed since we started trying to create a connection + if (this[kPoolState] !== PoolState.ready) { + this[kPending]--; + connection.destroy({ force: true }); + callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this)); + return; + } + + // forward all events from the connection to the pool + for (const event of [...APM_EVENTS, Connection.CLUSTER_TIME_RECEIVED]) { + connection.on(event, (e: any) => this.emit(event, e)); + } + + if (this.loadBalanced) { + connection.on(Connection.PINNED, pinType => this[kMetrics].markPinned(pinType)); + connection.on(Connection.UNPINNED, pinType => this[kMetrics].markUnpinned(pinType)); + + const serviceId = connection.serviceId; + if (serviceId) { + let generation; + const sid = serviceId.toHexString(); + if ((generation = this.serviceGenerations.get(sid))) { + connection.generation = generation; + } else { + this.serviceGenerations.set(sid, 0); + connection.generation = 0; + } + } + } + + connection.markAvailable(); + this.emitAndLog( + ConnectionPool.CONNECTION_READY, + new ConnectionReadyEvent(this, connection) + ); + + this[kPending]--; + callback(undefined, connection); + }, + error => { this[kPending]--; this.emitAndLog( ConnectionPool.CONNECTION_CLOSED, @@ -720,53 +760,15 @@ export class ConnectionPool extends TypedEventEmitter { { id: connectOptions.id, serviceId: undefined }, 'error', // TODO(NODE-5192): Remove this cast - err as MongoError + error as MongoError ) ); - if (err instanceof MongoNetworkError || err instanceof MongoServerError) { - err.connectionGeneration = connectOptions.generation; - } - callback(err ?? new MongoRuntimeError('Connection creation failed without error')); - return; - } - - // The pool might have closed since we started trying to create a connection - if (this[kPoolState] !== PoolState.ready) { - this[kPending]--; - connection.destroy({ force: true }); - callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this)); - return; - } - - // forward all events from the connection to the pool - for (const event of [...APM_EVENTS, Connection.CLUSTER_TIME_RECEIVED]) { - connection.on(event, (e: any) => this.emit(event, e)); - } - - if (this.loadBalanced) { - connection.on(Connection.PINNED, pinType => this[kMetrics].markPinned(pinType)); - connection.on(Connection.UNPINNED, pinType => this[kMetrics].markUnpinned(pinType)); - - const serviceId = connection.serviceId; - if (serviceId) { - let generation; - const sid = serviceId.toHexString(); - if ((generation = this.serviceGenerations.get(sid))) { - connection.generation = generation; - } else { - this.serviceGenerations.set(sid, 0); - connection.generation = 0; - } + if (error instanceof MongoNetworkError || error instanceof MongoServerError) { + error.connectionGeneration = connectOptions.generation; } + callback(error ?? new MongoRuntimeError('Connection creation failed without error')); } - - connection.markAvailable(); - this.emitAndLog(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection)); - - this[kPending]--; - callback(undefined, connection); - return; - }); + ); } private ensureMinPoolSize() { diff --git a/src/error.ts b/src/error.ts index 311b5bfac5..b488d0d5d7 100644 --- a/src/error.ts +++ b/src/error.ts @@ -109,8 +109,8 @@ export interface ErrorDescription extends Document { errInfo?: Document; } -function isAggregateError(e: Error): e is Error & { errors: Error[] } { - return 'errors' in e && Array.isArray(e.errors); +function isAggregateError(e: unknown): e is Error & { errors: Error[] } { + return e != null && typeof e === 'object' && 'errors' in e && Array.isArray(e.errors); } /** @@ -150,7 +150,7 @@ export class MongoError extends Error { } /** @internal */ - static buildErrorMessage(e: Error | string): string { + static buildErrorMessage(e: unknown): string { if (typeof e === 'string') { return e; } @@ -160,7 +160,9 @@ export class MongoError extends Error { : e.errors.map(({ message }) => message).join(', '); } - return e.message; + return e != null && typeof e === 'object' && 'message' in e && typeof e.message === 'string' + ? e.message + : 'empty error message'; } override get name(): string { diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index b3a10a74fb..a85403c746 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -1,7 +1,7 @@ import { clearTimeout, setTimeout } from 'timers'; import { type Document, Long } from '../bson'; -import { connect } from '../cmap/connect'; +import { connect, makeConnection, makeSocket, performInitialHandshake } from '../cmap/connect'; import type { Connection, ConnectionOptions } from '../cmap/connection'; import { getFAASEnv } from '../cmap/handshake/client_metadata'; import { LEGACY_HELLO_COMMAND } from '../constants'; @@ -235,7 +235,7 @@ function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion } function checkServer(monitor: Monitor, callback: Callback) { - let start = now(); + let start: number; let awaited: boolean; const topologyVersion = monitor[kServer].description.topologyVersion; const isAwaitable = useStreamingProtocol(monitor, topologyVersion); @@ -287,15 +287,17 @@ function checkServer(monitor: Monitor, callback: Callback) { new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable) ); - // If we are using the streaming protocol then we immediately issue another 'started' - // event, otherwise the "check" is complete and return to the main monitor loop. if (isAwaitable) { + // If we are using the streaming protocol then we immediately issue another 'started' + // event, otherwise the "check" is complete and return to the main monitor loop monitor.emitAndLogHeartbeat( Server.SERVER_HEARTBEAT_STARTED, monitor[kServer].topology.s.id, undefined, new ServerHeartbeatStartedEvent(monitor.address, true) ); + // We have not actually sent an outgoing handshake, but when we get the next response we + // want the duration to reflect the time since we last heard from the server start = now(); } else { monitor.rttPinger?.close(); @@ -335,6 +337,9 @@ function checkServer(monitor: Monitor, callback: Callback) { ); } + // Record new start time before sending handshake + start = now(); + if (isAwaitable) { awaited = true; return connection.exhaustCommand(ns('admin.$cmd'), cmd, options, (error, hello) => { @@ -352,37 +357,46 @@ function checkServer(monitor: Monitor, callback: Callback) { } // connecting does an implicit `hello` - connect(monitor.connectOptions, (err, conn) => { - if (err) { - monitor.connection = null; - - awaited = false; - onHeartbeatFailed(err); - return; + (async () => { + const socket = await makeSocket(monitor.connectOptions); + const connection = makeConnection(monitor.connectOptions, socket); + // The start time is after socket creation but before the handshake + start = now(); + try { + await performInitialHandshake(connection, monitor.connectOptions); + return connection; + } catch (error) { + connection.destroy({ force: false }); + throw error; } - - if (conn) { + })().then( + connection => { if (isInCloseState(monitor)) { - conn.destroy({ force: true }); + connection.destroy({ force: true }); return; } - monitor.connection = conn; + monitor.connection = connection; monitor.emitAndLogHeartbeat( Server.SERVER_HEARTBEAT_SUCCEEDED, monitor[kServer].topology.s.id, - conn.hello?.connectionId, + connection.hello?.connectionId, new ServerHeartbeatSucceededEvent( monitor.address, calculateDurationInMs(start), - conn.hello, - useStreamingProtocol(monitor, conn.hello?.topologyVersion) + connection.hello, + useStreamingProtocol(monitor, connection.hello?.topologyVersion) ) ); - callback(undefined, conn.hello); + callback(undefined, connection.hello); + }, + error => { + monitor.connection = null; + awaited = false; + onHeartbeatFailed(error); } - }); + ); } function monitorServer(monitor: Monitor) { @@ -498,16 +512,15 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { const connection = rttPinger.connection; if (connection == null) { - connect(options, (err, conn) => { - if (err) { + connect(options).then( + connection => { + measureAndReschedule(connection); + }, + () => { rttPinger.connection = undefined; rttPinger[kRoundTripTime] = 0; - return; } - - measureAndReschedule(conn); - }); - + ); return; } diff --git a/src/utils.ts b/src/utils.ts index 677c681396..719367cad2 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -497,9 +497,9 @@ export function now(): number { } /** @internal */ -export function calculateDurationInMs(started: number): number { +export function calculateDurationInMs(started: number | undefined): number { if (typeof started !== 'number') { - throw new MongoInvalidArgumentError('Numeric value required to calculate duration'); + return -1; } const elapsed = now() - started; diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index 3c46cfe38e..e702795b40 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -1,11 +1,10 @@ -import { promisify } from 'node:util'; - import { expect } from 'chai'; import { connect, Connection, type ConnectionOptions, + HostAddress, LEGACY_HELLO_COMMAND, makeClientMetadata, MongoClient, @@ -17,6 +16,16 @@ import { import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration'; import { assert as test, setupDatabase } from '../shared'; +const commonConnectOptions = { + id: 1, + generation: 1, + monitorCommands: false, + tls: false, + loadBalanced: false, + // Will be overridden by configuration options + hostAddress: HostAddress.fromString('127.0.0.1:1') +}; + describe('Connection', function () { beforeEach( skipBrokenAuthTestBeforeEachHook({ @@ -35,7 +44,8 @@ describe('Connection', function () { it('should execute a command against a server', { metadata: { requires: { apiVersion: false, topology: '!load-balanced' } }, test: async function () { - const connectOptions: Partial = { + const connectOptions: ConnectionOptions = { + ...commonConnectOptions, connectionType: Connection, ...this.configuration.options, metadata: makeClientMetadata({ driverInfo: {} }) @@ -43,7 +53,7 @@ describe('Connection', function () { let conn; try { - conn = await promisify(connect)(connectOptions as any as ConnectionOptions); + conn = await connect(connectOptions); const hello = await conn?.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }); expect(hello).to.have.property('ok', 1); } finally { @@ -55,7 +65,8 @@ describe('Connection', function () { it('should emit command monitoring events', { metadata: { requires: { apiVersion: false, topology: '!load-balanced' } }, test: async function () { - const connectOptions: Partial = { + const connectOptions: ConnectionOptions = { + ...commonConnectOptions, connectionType: Connection, ...this.configuration.options, monitorCommands: true, @@ -64,7 +75,7 @@ describe('Connection', function () { let conn; try { - conn = await promisify(connect)(connectOptions as any as ConnectionOptions); + conn = await connect(connectOptions); const events: any[] = []; conn.on('commandStarted', event => events.push(event)); diff --git a/test/tools/runner/hooks/configuration.js b/test/tools/runner/hooks/configuration.js index 9d4798faa1..e947a6f069 100644 --- a/test/tools/runner/hooks/configuration.js +++ b/test/tools/runner/hooks/configuration.js @@ -94,7 +94,11 @@ const testSkipBeforeEachHook = async function () { } }; -// TODO: NODE-3891 - fix tests that are broken with auth enabled and remove this hook +/** + * TODO: NODE-3891 - fix tests that are broken with auth enabled and remove this hook + * @param {{ skippedTests: string[] }} skippedTests - define list of tests to skip + * @returns + */ const skipBrokenAuthTestBeforeEachHook = function ({ skippedTests } = { skippedTests: [] }) { return function () { if (process.env.AUTH === 'auth' && skippedTests.includes(this.currentTest.title)) { diff --git a/test/unit/cmap/connect.test.ts b/test/unit/cmap/connect.test.ts index 39ae57d0de..7f69f54d17 100644 --- a/test/unit/cmap/connect.test.ts +++ b/test/unit/cmap/connect.test.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import { promisify } from 'util'; import { CancellationToken, @@ -51,7 +50,7 @@ describe('Connect Tests', function () { afterEach(() => mock.cleanup()); - it('should auth against a non-arbiter', function (done) { + it('should auth against a non-arbiter', async function () { const whatHappened = {}; test.server.setMessageHandler(request => { @@ -71,19 +70,13 @@ describe('Connect Tests', function () { } }); - connect(test.connectOptions, err => { - try { - expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); - expect(whatHappened).to.have.property('saslStart', true); - } catch (_err) { - err = _err; - } + await connect(test.connectOptions); - done(err); - }); + expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); + expect(whatHappened).to.have.property('saslStart', true); }); - it('should not auth against an arbiter', function (done) { + it('should not auth against an arbiter', async function () { const whatHappened = {}; test.server.setMessageHandler(request => { const doc = request.document; @@ -102,16 +95,10 @@ describe('Connect Tests', function () { } }); - connect(test.connectOptions, err => { - try { - expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); - expect(whatHappened).to.not.have.property('saslStart'); - } catch (_err) { - err = _err; - } + await connect(test.connectOptions); - done(err); - }); + expect(whatHappened).to.have.property(LEGACY_HELLO_COMMAND, true); + expect(whatHappened).to.not.have.property('saslStart'); }); }); @@ -133,10 +120,7 @@ describe('Connect Tests', function () { socketTimeoutMS: 15000 }; - connection = await promisify(callback => - //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence - connect(connectOptions, callback) - )(); + connection = await connect(connectOptions); }); afterEach(async () => { @@ -166,19 +150,13 @@ describe('Connect Tests', function () { }); }); - const error = await promisify(callback => - connect( - { - ...connectOptions, - // Ensure these timeouts do not fire first - socketTimeoutMS: 5000, - connectTimeoutMS: 5000, - cancellationToken - }, - //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence - callback - ) - )().catch(error => error); + const error = await connect({ + ...connectOptions, + // Ensure these timeouts do not fire first + socketTimeoutMS: 5000, + connectTimeoutMS: 5000, + cancellationToken + }).catch(error => error); expect(error, error.stack).to.match(/connection establishment was cancelled/); }); @@ -189,21 +167,20 @@ describe('Connect Tests', function () { // set no response handler for mock server, effectively black hole requests server.setMessageHandler(() => null); - const error = await promisify(callback => - //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence - connect({ ...connectOptions, connectTimeoutMS: 5 }, callback) - )().catch(error => error); + const error = await connect({ ...connectOptions, connectTimeoutMS: 5 }).catch( + error => error + ); expect(error).to.match(/timed out/); }); }); }); - it('should emit `MongoNetworkError` for network errors', function (done) { - connect({ hostAddress: new HostAddress('non-existent:27018') }, err => { - expect(err).to.be.instanceOf(MongoNetworkError); - done(); - }); + it('should emit `MongoNetworkError` for network errors', async function () { + const error = await connect({ + hostAddress: new HostAddress('non-existent:27018') + }).catch(e => e); + expect(error).to.be.instanceOf(MongoNetworkError); }); context('prepareHandshakeDocument', () => { diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 04194d47b3..ec902fffd3 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -1,19 +1,10 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { promisify } from 'util'; - -import { - connect as driverConnectCb, - Connection, - isHello, - MongoNetworkTimeoutError, - ns -} from '../../mongodb'; + +import { connect, Connection, isHello, MongoNetworkTimeoutError, ns } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; import { getSymbolFrom } from '../../tools/utils'; -const connect = promisify(driverConnectCb); - const connectionOptionsDefaults = { id: 0, generation: 0, diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index caf2cff17c..c5eb826ddf 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -1,8 +1,11 @@ +import * as net from 'node:net'; + import { expect } from 'chai'; import { coerce } from 'semver'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; +import { MongoClient } from '../../mongodb'; import { isHello, LEGACY_HELLO_COMMAND, @@ -579,4 +582,37 @@ describe('monitoring', function () { }); }); }); + + describe('Heartbeat duration', function () { + let client: MongoClient; + let serverHeartbeatFailed; + let sockets; + + beforeEach(async function () { + sockets = []; + // Artificially make creating a connection take 200ms + sinon.stub(net, 'createConnection').callsFake(function () { + const socket = new net.Socket(); + sockets.push(socket); + setTimeout(() => socket.emit('connect'), 80); + socket.on('data', () => socket.destroy(new Error('I am not real!'))); + return socket; + }); + + client = new MongoClient(`mongodb://localhost:1`, { serverSelectionTimeoutMS: 200 }); + client.on('serverHeartbeatFailed', ev => (serverHeartbeatFailed = ev)); + }); + + afterEach(function () { + sinon.restore(); + for (const socket of sockets ?? []) socket.destroy(); + sockets = undefined; + }); + + it('includes only the time to perform handshake', async function () { + const maybeError = await client.connect().catch(e => e); + expect(maybeError).to.be.instanceOf(Error); + expect(serverHeartbeatFailed).to.have.property('duration').that.is.lessThan(20); // way less than 80ms + }); + }); });