Skip to content

Commit

Permalink
refactor(NODE-5915): topology close logic to be synchronous (#4021)
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James authored Mar 11, 2024
1 parent 937c9c8 commit 36fa752
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 249 deletions.
2 changes: 1 addition & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export async function connect(options: ConnectionOptions): Promise<Connection> {
await performInitialHandshake(connection, options);
return connection;
} catch (error) {
connection?.destroy({ force: false });
connection?.destroy();
throw error;
}
}
Expand Down
12 changes: 1 addition & 11 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,6 @@ export interface ConnectionOptions
mongoLogger?: MongoLogger | undefined;
}

/** @internal */
export interface DestroyOptions {
/** Force the destruction. */
force: boolean;
}

/** @public */
export type ConnectionEvents = {
commandStarted(event: CommandStartedEvent): void;
Expand Down Expand Up @@ -301,14 +295,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}, 1).unref(); // No need for this timer to hold the event loop open
}

public destroy(options: DestroyOptions, callback?: Callback): void {
public destroy(): void {
if (this.closed) {
if (typeof callback === 'function') process.nextTick(callback);
return;
}
if (typeof callback === 'function') {
this.once('close', () => process.nextTick(() => callback()));
}

// load balanced mode requires that these listeners remain on the connection
// after cleanup on timeouts, errors or close so we remove them before calling
Expand Down
44 changes: 14 additions & 30 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import {
type Callback,
eachAsync,
List,
makeCounter,
promiseWithResolvers,
Expand Down Expand Up @@ -493,25 +492,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
private interruptInUseConnections(minGeneration: number) {
for (const connection of this[kCheckedOut]) {
if (connection.generation <= minGeneration) {
this.checkIn(connection);
connection.onError(new PoolClearedOnNetworkError(this));
this.checkIn(connection);
}
}
}

/** Close the pool */
close(callback: Callback<void>): void;
close(options: CloseOptions, callback: Callback<void>): void;
close(_options?: CloseOptions | Callback<void>, _cb?: Callback<void>): void {
let options = _options as CloseOptions;
const callback = (_cb ?? _options) as Callback<void>;
if (typeof options === 'function') {
options = {};
}

options = Object.assign({ force: false }, options);
close(): void {
if (this.closed) {
return callback();
return;
}

// immediately cancel any in-flight connections
Expand All @@ -526,21 +516,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this.clearMinPoolSizeTimer();
this.processWaitQueue();

eachAsync<Connection>(
this[kConnections].toArray(),
(conn, cb) => {
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
callback(err);
}
);
for (const conn of this[kConnections]) {
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy();
}
this[kConnections].clear();
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
}

/**
Expand Down Expand Up @@ -592,7 +576,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionClosedEvent(this, connection, reason)
);
// destroy the connection
process.nextTick(() => connection.destroy({ force: false }));
connection.destroy();
}

private connectionIsStale(connection: Connection) {
Expand Down Expand Up @@ -648,7 +632,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// The pool might have closed since we started trying to create a connection
if (this[kPoolState] !== PoolState.ready) {
this[kPending]--;
connection.destroy({ force: true });
connection.destroy();
callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this));
return;
}
Expand Down
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ export type {
Connection,
ConnectionEvents,
ConnectionOptions,
DestroyOptions,
ProxyOptions
} from './cmap/connection';
export type {
Expand Down
21 changes: 7 additions & 14 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
try {
await promisify(callback => this.topology?.connect(options, callback))();
} catch (error) {
this.topology?.close({ force: true });
this.topology?.close();
throw error;
}
};
Expand Down Expand Up @@ -614,19 +614,12 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
const topology = this.topology;
this.topology = undefined;

await new Promise<void>((resolve, reject) => {
topology.close({ force }, error => {
if (error) return reject(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.closeCallback(this, force, error => {
if (error) return reject(error);
resolve();
});
}
resolve();
});
});
topology.close();

const { encrypter } = this[kOptions];
if (encrypter) {
await encrypter.close(this, force);
}
}

/**
Expand Down
14 changes: 7 additions & 7 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ function resetMonitorState(monitor: Monitor) {

monitor[kCancellationToken].emit('cancel');

monitor.connection?.destroy({ force: true });
monitor.connection?.destroy();
monitor.connection = null;
}

Expand Down Expand Up @@ -247,7 +247,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);

function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy({ force: true });
monitor.connection?.destroy();
monitor.connection = null;

monitor.emitAndLogHeartbeat(
Expand Down Expand Up @@ -366,13 +366,13 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
await performInitialHandshake(connection, monitor.connectOptions);
return connection;
} catch (error) {
connection.destroy({ force: false });
connection.destroy();
throw error;
}
})().then(
connection => {
if (isInCloseState(monitor)) {
connection.destroy({ force: true });
connection.destroy();
return;
}

Expand Down Expand Up @@ -479,7 +479,7 @@ export class RTTPinger {
this.closed = true;
clearTimeout(this[kMonitorId]);

this.connection?.destroy({ force: true });
this.connection?.destroy();
this.connection = undefined;
}
}
Expand All @@ -495,7 +495,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {

function measureAndReschedule(conn?: Connection) {
if (rttPinger.closed) {
conn?.destroy({ force: true });
conn?.destroy();
return;
}

Expand Down Expand Up @@ -529,7 +529,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then(
() => measureAndReschedule(),
() => {
rttPinger.connection?.destroy({ force: true });
rttPinger.connection?.destroy();
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
Expand Down
25 changes: 5 additions & 20 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Document } from '../bson';
import { type AutoEncrypter } from '../client-side-encryption/auto_encrypter';
import { type CommandOptions, Connection, type DestroyOptions } from '../cmap/connection';
import { type CommandOptions, Connection } from '../cmap/connection';
import {
ConnectionPool,
type ConnectionPoolEvents,
Expand Down Expand Up @@ -41,7 +41,6 @@ import type { GetMoreOptions } from '../operations/get_more';
import type { ClientSession } from '../sessions';
import { isTransactionCommand } from '../transactions';
import {
type Callback,
type EventEmitterWithState,
makeStateMachine,
maxWireVersion,
Expand Down Expand Up @@ -236,18 +235,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

/** Destroy the server connection */
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}
options = Object.assign({}, { force: false }, options);

destroy(): void {
if (this.s.state === STATE_CLOSED) {
if (typeof callback === 'function') {
callback();
}

return;
}

Expand All @@ -257,13 +246,9 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.monitor?.close();
}

this.pool.close(options, err => {
stateTransition(this, STATE_CLOSED);
this.emit('closed');
if (typeof callback === 'function') {
callback(err);
}
});
this.pool.close();
stateTransition(this, STATE_CLOSED);
this.emit('closed');
}

/**
Expand Down
Loading

0 comments on commit 36fa752

Please sign in to comment.