diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index ceed2a53d06..cebca858336 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -14,6 +14,7 @@ import type { MongoClient } from '../mongo_client'; import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types'; import { executeOperation, ExecutionResult } from '../operations/execute_operation'; import { GetMoreOperation } from '../operations/get_more'; +import { KillCursorsOperation } from '../operations/kill_cursors'; import { ReadConcern, ReadConcernLike } from '../read_concern'; import { ReadPreference, ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; @@ -118,7 +119,7 @@ export abstract class AbstractCursor< /** @internal */ [kId]?: Long; /** @internal */ - [kSession]?: ClientSession; + [kSession]: ClientSession; /** @internal */ [kServer]?: Server; /** @internal */ @@ -187,6 +188,8 @@ export abstract class AbstractCursor< if (options.session instanceof ClientSession) { this[kSession] = options.session; + } else { + this[kSession] = this[kClient].startSession({ owner: this, explicit: false }); } } @@ -217,11 +220,11 @@ export abstract class AbstractCursor< } /** @internal */ - get session(): ClientSession | undefined { + get session(): ClientSession { return this[kSession]; } - set session(clientSession: ClientSession | undefined) { + set session(clientSession: ClientSession) { this[kSession] = clientSession; } @@ -592,11 +595,12 @@ export abstract class AbstractCursor< const session = this[kSession]; if (session) { // We only want to end this session if we created it, and it hasn't ended yet - if (session.explicit === false && !session.hasEnded) { - session.endSession(); + if (session.explicit === false) { + if (!session.hasEnded) { + session.endSession(() => null); + } + this[kSession] = this.client.startSession({ owner: this, explicit: false }); } - - this[kSession] = undefined; } } @@ -644,22 +648,10 @@ export abstract class AbstractCursor< * a significant refactor. */ [kInit](callback: Callback): void { - if (this[kSession] == null) { - if (this[kClient].topology?.shouldCheckForSessionSupport()) { - return this[kClient].topology?.selectServer(ReadPreference.primaryPreferred, {}, err => { - if (err) return callback(err); - return this[kInit](callback); - }); - } else if (this[kClient].topology?.hasSessionSupport()) { - this[kSession] = this[kClient].topology?.startSession({ owner: this, explicit: false }); - } - } - this._initialize(this[kSession], (err, state) => { if (state) { const response = state.response; this[kServer] = state.server; - this[kSession] = state.session; if (response.cursor) { this[kId] = @@ -843,11 +835,11 @@ function cleanupCursor( } cursor[kKilled] = true; - server.killCursors( - cursorNs, - [cursorId], - { ...pluckBSONSerializeOptions(cursor[kOptions]), session }, - () => completeCleanup() + + return executeOperation( + cursor[kClient], + new KillCursorsOperation(cursorId, cursorNs, server, { session }), + completeCleanup ); } diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 068142e2cf5..28c4166ad36 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -13,17 +13,17 @@ import type { MONGO_CLIENT_EVENTS } from './constants'; import { Db, DbOptions } from './db'; import type { AutoEncrypter, AutoEncryptionOptions } from './deps'; import type { Encrypter } from './encrypter'; -import { MongoInvalidArgumentError, MongoNotConnectedError } from './error'; +import { MongoInvalidArgumentError } from './error'; import type { Logger, LoggerLevel } from './logger'; import { TypedEventEmitter } from './mongo_types'; import { connect } from './operations/connect'; import { PromiseProvider } from './promise_provider'; import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern'; -import type { ReadPreference, ReadPreferenceMode } from './read_preference'; +import { ReadPreference, ReadPreferenceMode } from './read_preference'; import type { TagSet } from './sdam/server_description'; import type { SrvPoller } from './sdam/srv_polling'; import type { Topology, TopologyEvents } from './sdam/topology'; -import type { ClientSession, ClientSessionOptions } from './sessions'; +import { ClientSession, ClientSessionOptions, ServerSessionPool } from './sessions'; import { Callback, ClientMetadata, @@ -267,10 +267,16 @@ export type WithSessionCallback = (session: ClientSession) => Promise; /** @internal */ export interface MongoClientPrivate { url: string; - sessions: Set; bsonOptions: BSONSerializeOptions; namespace: MongoDBNamespace; hasBeenClosed: boolean; + /** + * We keep a reference to the sessions that are acquired from the pool. + * - used to track and close all sessions in client.close() (which is non-standard behavior) + * - used to notify the leak checker in our tests if test author forgot to clean up explicit sessions + */ + readonly activeSessions: Set; + readonly sessionPool: ServerSessionPool; readonly options: MongoOptions; readonly readConcern?: ReadConcern; readonly writeConcern?: WriteConcern; @@ -352,10 +358,11 @@ export class MongoClient extends TypedEventEmitter { // The internal state this.s = { url, - sessions: new Set(), bsonOptions: resolveBSONOptions(this[kOptions]), namespace: ns('admin'), hasBeenClosed: false, + sessionPool: new ServerSessionPool(this), + activeSessions: new Set(), get options() { return client[kOptions]; @@ -470,23 +477,51 @@ export class MongoClient extends TypedEventEmitter { return maybePromise(callback, callback => { if (this.topology == null) { + // Do not connect just to end sessions return callback(); } - // clear out references to old topology - const topology = this.topology; - this.topology = undefined; - - topology.close({ force }, error => { - if (error) return callback(error); - const { encrypter } = this[kOptions]; - if (encrypter) { - return encrypter.close(this, force, error => { - callback(error); + const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession()); + this.s.activeSessions.clear(); + + Promise.all(activeSessionEnds) + .then(() => { + const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id); + if (endSessions.length === 0) return; + return this.db('admin') + .command( + { endSessions }, + { readPreference: ReadPreference.primaryPreferred, noResponse: true } + ) + .then(() => null) // outcome does not matter + .catch(() => null); // outcome does not matter + }) + .then(() => { + if (this.topology == null) { + return callback(); + } + // clear out references to old topology + const topology = this.topology; + this.topology = undefined; + + return new Promise((resolve, reject) => { + topology.close({ force }, error => { + if (error) return reject(error); + const { encrypter } = this[kOptions]; + if (encrypter) { + return encrypter.close(this, force, error => { + if (error) return reject(error); + resolve(); + }); + } + resolve(); + }); }); - } - callback(); - }); + }) + .then( + () => callback(), + error => callback(error) + ); }); } @@ -553,12 +588,17 @@ export class MongoClient extends TypedEventEmitter { startSession(): ClientSession; startSession(options: ClientSessionOptions): ClientSession; startSession(options?: ClientSessionOptions): ClientSession { - options = Object.assign({ explicit: true }, options); - if (!this.topology) { - throw new MongoNotConnectedError('MongoClient must be connected to start a session'); - } - - return this.topology.startSession(options, this.s.options); + const session = new ClientSession( + this, + this.s.sessionPool, + { explicit: true, ...options }, + this[kOptions] + ); + this.s.activeSessions.add(session); + session.once('ended', () => { + this.s.activeSessions.delete(session); + }); + return session; } /** diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 17674e54874..3579dae570c 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -114,16 +114,24 @@ export function executeOperation< if (topology.hasSessionSupport()) { if (session == null) { owner = Symbol(); - session = topology.startSession({ owner, explicit: false }); + session = client.startSession({ owner, explicit: false }); } else if (session.hasEnded) { return callback(new MongoExpiredSessionError('Use of expired sessions is not permitted')); } else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) { return callback(new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later')); } - } else if (session) { - // If the user passed an explicit session and we are still, after server selection, - // trying to run against a topology that doesn't support sessions we error out. - return callback(new MongoCompatibilityError('Current topology does not support sessions')); + } else { + // no session support + if (session && session.explicit) { + // If the user passed an explicit session and we are still, after server selection, + // trying to run against a topology that doesn't support sessions we error out. + return callback(new MongoCompatibilityError('Current topology does not support sessions')); + } else if (session && !session.explicit) { + // We do not have to worry about ending the session because the server session has not been acquired yet + delete operation.options.session; + operation.clearSession(); + session = undefined; + } } try { @@ -166,8 +174,8 @@ function executeWithServerSelection( let selector: ReadPreference | ServerSelector; - if (operation.hasAspect(Aspect.CURSOR_ITERATING)) { - // Get more operations must always select the same server, but run through + if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) { + // GetMore and KillCursor operations must always select the same server, but run through // server selection to potentially force monitor checks if the server is // in an unknown state. selector = sameServerSelector(operation.server?.description); diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 51fe9cf4b75..a69c5f58980 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -60,4 +60,4 @@ export class GetMoreOperation extends AbstractOperation { } } -defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]); +defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.MUST_SELECT_SAME_SERVER]); diff --git a/src/operations/kill_cursors.ts b/src/operations/kill_cursors.ts new file mode 100644 index 00000000000..3891a8efaf8 --- /dev/null +++ b/src/operations/kill_cursors.ts @@ -0,0 +1,27 @@ +import type { Long } from '../bson'; +import { MongoRuntimeError } from '../error'; +import type { Server } from '../sdam/server'; +import type { ClientSession } from '../sessions'; +import type { Callback, MongoDBNamespace } from '../utils'; +import { AbstractOperation, Aspect, defineAspects, OperationOptions } from './operation'; + +export class KillCursorsOperation extends AbstractOperation { + cursorId: Long; + constructor(cursorId: Long, ns: MongoDBNamespace, server: Server, options: OperationOptions) { + super(options); + this.ns = ns; + this.cursorId = cursorId; + this.server = server; + } + + execute(server: Server, session: ClientSession | undefined, callback: Callback): void { + if (server !== this.server) { + return callback( + new MongoRuntimeError('Killcursor must run on the same server operation began on') + ); + } + server.killCursors(this.ns, [this.cursorId], { session }, () => callback()); + } +} + +defineAspects(KillCursorsOperation, [Aspect.MUST_SELECT_SAME_SERVER]); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index e21a87585c9..0b5c5248717 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -11,7 +11,7 @@ export const Aspect = { EXPLAINABLE: Symbol('EXPLAINABLE'), SKIP_COLLATION: Symbol('SKIP_COLLATION'), CURSOR_CREATING: Symbol('CURSOR_CREATING'), - CURSOR_ITERATING: Symbol('CURSOR_ITERATING') + MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER') } as const; /** @public */ @@ -94,6 +94,10 @@ export abstract class AbstractOperation { return this[kSession]; } + clearSession() { + this[kSession] = undefined; + } + get canRetryRead(): boolean { return true; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index c489385c2d3..a3505bd7a93 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -1,5 +1,6 @@ import Denque = require('denque'); import { setTimeout } from 'timers'; +import { promisify } from 'util'; import type { BSONSerializeOptions, Document } from '../bson'; import { deserialize, serialize } from '../bson'; @@ -29,20 +30,14 @@ import { MongoServerSelectionError, MongoTopologyClosedError } from '../error'; -import type { MongoClient, MongoOptions, ServerApi } from '../mongo_client'; +import type { MongoClient, ServerApi } from '../mongo_client'; import { TypedEventEmitter } from '../mongo_types'; import { ReadPreference, ReadPreferenceLike } from '../read_preference'; -import { - ClientSession, - ClientSessionOptions, - ServerSessionId, - ServerSessionPool -} from '../sessions'; +import type { ClientSession } from '../sessions'; import type { Transaction } from '../transactions'; import { Callback, ClientMetadata, - eachAsync, emitWarning, EventEmitterWithState, HostAddress, @@ -120,10 +115,6 @@ export interface TopologyPrivate { minHeartbeatFrequencyMS: number; /** A map of server instances to normalized addresses */ servers: Map; - /** Server Session Pool */ - sessionPool: ServerSessionPool; - /** Active client sessions */ - sessions: Set; credentials?: MongoCredentials; clusterTime?: ClusterTime; /** timers created for the initial connect to a server */ @@ -316,10 +307,6 @@ export class Topology extends TypedEventEmitter { minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS, // a map of server instances to normalized addresses servers: new Map(), - // Server Session Pool - sessionPool: new ServerSessionPool(this), - // Active client sessions - sessions: new Set(), credentials: options?.credentials, clusterTime: undefined, @@ -443,13 +430,13 @@ export class Topology extends TypedEventEmitter { } } + const exitWithError = (error: Error) => + callback ? callback(error) : this.emit(Topology.ERROR, error); + const readPreference = options.readPreference ?? ReadPreference.primary; this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { if (err) { - this.close(); - - typeof callback === 'function' ? callback(err) : this.emit(Topology.ERROR, err); - return; + return this.close({ force: false }, () => exitWithError(err)); } // TODO: NODE-2471 @@ -457,15 +444,14 @@ export class Topology extends TypedEventEmitter { if (!skipPingOnConnect && server && this.s.credentials) { server.command(ns('admin.$cmd'), { ping: 1 }, {}, err => { if (err) { - typeof callback === 'function' ? callback(err) : this.emit(Topology.ERROR, err); - return; + return exitWithError(err); } stateTransition(this, STATE_CONNECTED); this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); - if (typeof callback === 'function') callback(undefined, this); + callback?.(undefined, this); }); return; @@ -475,7 +461,7 @@ export class Topology extends TypedEventEmitter { this.emit(Topology.OPEN, this); this.emit(Topology.CONNECT, this); - if (typeof callback === 'function') callback(undefined, this); + callback?.(undefined, this); }); } @@ -489,52 +475,38 @@ export class Topology extends TypedEventEmitter { if (typeof options === 'boolean') { options = { force: options }; } - options = options ?? {}; - if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { - if (typeof callback === 'function') { - callback(); - } - return; + if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { + return callback?.(); } - stateTransition(this, STATE_CLOSING); + const destroyedServers = Array.from(this.s.servers.values(), server => { + return promisify(destroyServer)(server, this, options); + }); - drainWaitQueue(this[kWaitQueue], new MongoTopologyClosedError()); - drainTimerQueue(this.s.connectionTimers); + Promise.all(destroyedServers) + .then(() => { + this.s.servers.clear(); - if (this.s.srvPoller) { - this.s.srvPoller.stop(); - this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords); - } + stateTransition(this, STATE_CLOSING); - this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); - - eachAsync( - Array.from(this.s.sessions.values()), - (session, cb) => session.endSession(cb), - () => { - this.s.sessionPool.endAllPooledSessions(() => { - eachAsync( - Array.from(this.s.servers.values()), - (server, cb) => destroyServer(server, this, options, cb), - err => { - this.s.servers.clear(); - - // emit an event for close - this.emit(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id)); - - stateTransition(this, STATE_CLOSED); - - if (typeof callback === 'function') { - callback(err); - } - } - ); - }); - } - ); + drainWaitQueue(this[kWaitQueue], new MongoTopologyClosedError()); + drainTimerQueue(this.s.connectionTimers); + + if (this.s.srvPoller) { + this.s.srvPoller.stop(); + this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords); + } + + this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); + + stateTransition(this, STATE_CLOSED); + + // emit an event for close + this.emit(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id)); + }) + .finally(() => callback?.()); } /** @@ -628,44 +600,6 @@ export class Topology extends TypedEventEmitter { return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null; } - /** Start a logical session */ - startSession(options: ClientSessionOptions, clientOptions?: MongoOptions): ClientSession { - const session = new ClientSession(this.client, this.s.sessionPool, options, clientOptions); - session.once('ended', () => { - this.s.sessions.delete(session); - }); - - this.s.sessions.add(session); - return session; - } - - /** Send endSessions command(s) with the given session ids */ - endSessions(sessions: ServerSessionId[], callback?: Callback): void { - if (!Array.isArray(sessions)) { - sessions = [sessions]; - } - - this.selectServer( - readPreferenceServerSelector(ReadPreference.primaryPreferred), - {}, - (err, server) => { - if (err || !server) { - if (typeof callback === 'function') callback(err); - return; - } - - server.command( - ns('admin.$cmd'), - { endSessions: sessions }, - { noResponse: true }, - (err, result) => { - if (typeof callback === 'function') callback(err, result); - } - ); - } - ); - } - /** * Update the internal TopologyDescription with a ServerDescription * diff --git a/src/sessions.ts b/src/sessions.ts index 36f26132319..1419afa9f36 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -27,7 +27,6 @@ import { PromiseProvider } from './promise_provider'; import { ReadConcernLevel } from './read_concern'; import { ReadPreference } from './read_preference'; import { _advanceClusterTime, ClusterTime, TopologyType } from './sdam/common'; -import type { Topology } from './sdam/topology'; import { isTransactionCommand, Transaction, TransactionOptions, TxnState } from './transactions'; import { calculateDurationInMs, @@ -269,9 +268,10 @@ export class ClientSession extends TypedEventEmitter { if (serverSession != null) { // release the server session back to the pool this.sessionPool.release(serverSession); - // Make sure a new serverSession never makes it on to the ClientSession + // Make sure a new serverSession never makes it onto this ClientSession Object.defineProperty(this, kServerSession, { - value: ServerSession.clone(serverSession) + value: ServerSession.clone(serverSession), + writable: false }); } @@ -878,39 +878,18 @@ export class ServerSession { * @internal */ export class ServerSessionPool { - topology: Topology; + client: MongoClient; sessions: ServerSession[]; - constructor(topology: Topology) { - if (topology == null) { - throw new MongoRuntimeError('ServerSessionPool requires a topology'); + constructor(client: MongoClient) { + if (client == null) { + throw new MongoRuntimeError('ServerSessionPool requires a MongoClient'); } - this.topology = topology; + this.client = client; this.sessions = []; } - /** Ends all sessions in the session pool */ - endAllPooledSessions(callback?: Callback): void { - if (this.sessions.length) { - this.topology.endSessions( - this.sessions.map((session: ServerSession) => session.id), - () => { - this.sessions = []; - if (typeof callback === 'function') { - callback(); - } - } - ); - - return; - } - - if (typeof callback === 'function') { - callback(); - } - } - /** * Acquire a Server Session from the pool. * Iterates through each session in the pool, removing any stale sessions @@ -918,16 +897,29 @@ export class ServerSessionPool { * pool and returned. If no non-stale session is found, a new ServerSession is created. */ acquire(): ServerSession { - const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes || 10; + const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10; - while (this.sessions.length) { - const session = this.sessions.shift(); - if (session && (this.topology.loadBalanced || !session.hasTimedOut(sessionTimeoutMinutes))) { - return session; + let session: ServerSession | null = null; + + // Try to obtain from session pool + while (this.sessions.length > 0) { + const potentialSession = this.sessions.shift(); + if ( + potentialSession != null && + (!!this.client.topology?.loadBalanced || + !potentialSession.hasTimedOut(sessionTimeoutMinutes)) + ) { + session = potentialSession; + break; } } - return new ServerSession(); + // If nothing valid came from the pool make a new one + if (session == null) { + session = new ServerSession(); + } + + return session; } /** @@ -938,9 +930,9 @@ export class ServerSessionPool { * @param session - The session to release to the pool */ release(session: ServerSession): void { - const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes; + const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10; - if (this.topology.loadBalanced && !sessionTimeoutMinutes) { + if (this.client.topology?.loadBalanced && !sessionTimeoutMinutes) { this.sessions.unshift(session); } diff --git a/test/integration/causal-consistency/causal_consistency.prose.test.js b/test/integration/causal-consistency/causal_consistency.prose.test.js index 34465465d1f..cddc6bb5a98 100644 --- a/test/integration/causal-consistency/causal_consistency.prose.test.js +++ b/test/integration/causal-consistency/causal_consistency.prose.test.js @@ -55,9 +55,7 @@ describe('Causal Consistency - prose tests', function () { */ { metadata: { - requires: { topology: ['replicaset', 'sharded'] }, - // Skipping session leak tests b/c these are explicit sessions - sessions: { skipLeakTests: true } + requires: { topology: ['replicaset', 'sharded'] } }, test: function () { @@ -93,9 +91,7 @@ describe('Causal Consistency - prose tests', function () { it('case: successful read with causal consistency', { metadata: { - requires: { topology: ['replicaset', 'sharded'] }, - // Skipping session leak tests b/c these are explicit sessions - sessions: { skipLeakTests: true } + requires: { topology: ['replicaset', 'sharded'] } }, test: function () { @@ -131,9 +127,7 @@ describe('Causal Consistency - prose tests', function () { () => { it('case: second operation is findOne', { metadata: { - requires: { topology: ['replicaset', 'sharded'] }, - // Skipping session leak tests b/c these are explicit sessions - sessions: { skipLeakTests: true } + requires: { topology: ['replicaset', 'sharded'] } }, test: function () { @@ -176,9 +170,7 @@ describe('Causal Consistency - prose tests', function () { () => { it('case: successful insert', { metadata: { - requires: { topology: ['replicaset', 'sharded'] }, - // Skipping session leak tests b/c these are explicit sessions - sessions: { skipLeakTests: true } + requires: { topology: ['replicaset', 'sharded'] } }, test: function () { @@ -216,9 +208,7 @@ describe('Causal Consistency - prose tests', function () { */ { metadata: { - requires: { topology: ['replicaset', 'sharded'] }, - // Skipping session leak tests b/c these are explicit sessions - sessions: { skipLeakTests: true } + requires: { topology: ['replicaset', 'sharded'] } }, test: function () { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 7e588843c17..2e6e96938f6 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -39,7 +39,7 @@ const pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; -describe('Change Streams', { sessions: { skipLeakTests: true } }, function () { +describe('Change Streams', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; @@ -760,6 +760,8 @@ describe('Change Streams', { sessions: { skipLeakTests: true } }, function () { const [data] = await willBeData; const parsedEvent = JSON.parse(data); expect(parsedEvent).to.have.nested.property('fullDocument.a', 1); + + outStream.destroy(); } }); @@ -873,23 +875,12 @@ describe('Change Streams', { sessions: { skipLeakTests: true } }, function () { }); }); - afterEach(function () { - return Promise.resolve() - .then(() => { - if (changeStream && !changeStream.closed) { - return changeStream.close(); - } - }) - .then(() => { - if (client) { - return client.close(); - } - }) - .then(() => { - coll = undefined; - changeStream = undefined; - client = undefined; - }); + afterEach(async function () { + await changeStream?.close(); + await client?.close(); + coll = undefined; + changeStream = undefined; + client = undefined; }); it('when invoked with promises', { @@ -1147,8 +1138,15 @@ describe('Change Streams', { sessions: { skipLeakTests: true } }, function () { describe('NODE-2626 - handle null changes without error', function () { let mockServer; - afterEach(() => mock.cleanup()); - beforeEach(() => mock.createServer().then(server => (mockServer = server))); + + beforeEach(async () => { + mockServer = await mock.createServer(); + }); + + afterEach(async () => { + await mock.cleanup(); + }); + it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function (done) { mockServer.setMessageHandler(req => { const doc = req.document; diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.js b/test/integration/connection-monitoring-and-pooling/connection.test.js index 81653e06cfc..90760f15582 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.js +++ b/test/integration/connection-monitoring-and-pooling/connection.test.js @@ -1,6 +1,6 @@ 'use strict'; -const { ServerHeartbeatStartedEvent, MongoClient } = require('../../../src'); +const { ServerHeartbeatStartedEvent, MongoClient, MongoServerError } = require('../../../src'); const { Connection } = require('../../../src/cmap/connection'); const { connect } = require('../../../src/cmap/connect'); const { expect } = require('chai'); @@ -162,24 +162,8 @@ describe('Connection', function () { let testClient; afterEach(async () => { - let savedError; - if (client) { - try { - await client.close(); - } catch (err) { - savedError = err; - } - } - if (testClient) { - try { - await testClient.close(); - } catch (err) { - savedError = err; - } - } - if (savedError) { - throw savedError; - } + if (client) await client.close(); + if (testClient) await testClient.close(); }); it('should correctly start monitoring for single server connection', { @@ -394,23 +378,16 @@ describe('Connection', function () { } }); - it('test connect bad auth', { - metadata: { requires: { topology: 'single' } }, - - test: function (done) { - var configuration = this.configuration; - client = configuration.newClient({ - auth: { - username: 'slithy', - password: 'toves' - } - }); - client.connect(function (err, client) { - expect(err).to.exist; - expect(client).to.not.exist; - done(); - }); - } + it('test connect bad auth', async function () { + client = this.configuration.newClient({ + auth: { + username: 'slithy', + password: 'toves' + } + }); + const error = await client.connect().catch(error => error); + expect(error).to.be.instanceOf(MongoServerError); + await client.close(); }); it('test connect bad url', { diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index 9e7b09058f1..8f3e9b43376 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -6,7 +6,7 @@ const { withClient, withMonitoredClient } = require('../shared'); -const { runLater } = require('../../tools/utils'); +const { runLater, getSymbolFrom } = require('../../tools/utils'); const fs = require('fs'); const os = require('os'); const path = require('path'); @@ -1977,11 +1977,7 @@ describe('Cursor', function () { }); it('should close dead tailable cursors', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] }, - sessions: { skipLeakTests: true }, os: '!win32' // NODE-2943: timeout on windows }, @@ -3795,89 +3791,58 @@ describe('Cursor', function () { } }); - it( - 'should return implicit session to pool when client-side cursor exhausts results on initial query', - { - metadata: { - requires: { - topology: ['single', 'replicaset', 'sharded'], - mongodb: '>=3.6.0' - } - }, - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 }); - - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); + it('should return implicit session to pool when client-side cursor exhausts results on initial query', async function () { + const configuration = this.configuration; + const client = configuration.newClient(); - const db = client.db(configuration.db); - const collection = db.collection('cursor_session_tests'); + await client.connect(); + const db = client.db(configuration.db); + const collection = db.collection('cursor_session_tests'); - collection.insertMany([{ a: 1, b: 2 }], err => { - expect(err).to.not.exist; - const cursor = collection.find({}); + await collection.insertMany([{ a: 1, b: 2 }]); + const cursor = collection.find({}); - cursor.next(function () { - test.equal(client.topology.s.sessions.size, 0); - done(); - }); - }); - }); - } - } - ); - - it( - 'should return implicit session to pool when client-side cursor exhausts results after a getMore', - { - metadata: { - requires: { - topology: ['single', 'replicaset', 'sharded'], - mongodb: '>=3.6.0' - } - }, - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 }); + await cursor.next(); // implicit close, cursor is exhausted + expect(client.s.activeSessions.size).to.equal(0); + await cursor.close(); + await client.close(); + }); - client.connect((err, client) => { - expect(err).to.not.exist; - this.defer(() => client.close()); + it('should return implicit session to pool when client-side cursor exhausts results after a getMore', function (done) { + const configuration = this.configuration; + const client = configuration.newClient({ w: 1 }, { maxPoolSize: 1 }); - const db = client.db(configuration.db); - const collection = db.collection('cursor_session_tests2'); + const db = client.db(configuration.db); + const collection = db.collection('cursor_session_tests2'); - const docs = [ - { a: 1, b: 2 }, - { a: 3, b: 4 }, - { a: 5, b: 6 }, - { a: 7, b: 8 }, - { a: 9, b: 10 } - ]; + const docs = [ + { a: 1, b: 2 }, + { a: 3, b: 4 }, + { a: 5, b: 6 }, + { a: 7, b: 8 }, + { a: 9, b: 10 } + ]; - collection.insertMany(docs, err => { - expect(err).to.not.exist; - const cursor = collection.find({}, { batchSize: 3 }); + collection.insertMany(docs, err => { + expect(err).to.not.exist; + const cursor = collection.find({}, { batchSize: 3 }); + cursor.next(function () { + expect(client.s.activeSessions.size).to.equal(1); + cursor.next(function () { + expect(client.s.activeSessions.size).to.equal(1); + cursor.next(function () { + expect(client.s.activeSessions.size).to.equal(1); cursor.next(function () { - test.equal(client.topology.s.sessions.size, 1); - cursor.next(function () { - test.equal(client.topology.s.sessions.size, 1); - cursor.next(function () { - test.equal(client.topology.s.sessions.size, 1); - cursor.next(function () { - test.equal(client.topology.s.sessions.size, 0); - done(); - }); - }); + expect(client.s.activeSessions.size).to.equal(0); + cursor.close(() => { + client.close(done); }); }); }); }); - } - } - ); + }); + }); + }); describe('#clone', function () { let client; @@ -3908,7 +3873,8 @@ describe('Cursor', function () { expect(doc).to.exist; const clonedCursor = cursor.clone(); expect(clonedCursor.cursorOptions.session).to.not.exist; - expect(clonedCursor.session).to.not.exist; + const kServerSession = getSymbolFrom(clonedCursor.session, 'serverSession'); + expect(clonedCursor.session).to.have.property(kServerSession, null); // session is brand new and has not been used }) .finally(() => { return cursor.close(); @@ -3928,7 +3894,8 @@ describe('Cursor', function () { expect(doc).to.exist; const clonedCursor = cursor.clone(); expect(clonedCursor.cursorOptions.session).to.not.exist; - expect(clonedCursor.session).to.not.exist; + const kServerSession = getSymbolFrom(clonedCursor.session, 'serverSession'); + expect(clonedCursor.session).to.have.property(kServerSession, null); // session is brand new and has not been used }) .finally(() => { return cursor.close(); diff --git a/test/integration/gridfs/gridfs_stream.test.js b/test/integration/gridfs/gridfs_stream.test.js index c57042fd564..9d6365e9516 100644 --- a/test/integration/gridfs/gridfs_stream.test.js +++ b/test/integration/gridfs/gridfs_stream.test.js @@ -599,117 +599,97 @@ describe('GridFS Stream', function () { * @example-class GridFSBucket * @example-method delete */ - it('Deleting a file using promises', { - metadata: { - requires: { topology: ['single'], sessions: { skipLeakTests: true } } - }, + it('Deleting a file using promises', function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function (err, client) { + const db = client.db(configuration.db); + const bucket = new GridFSBucket(db, { bucketName: 'gridfsdownload' }); + const CHUNKS_COLL = 'gridfsdownload.chunks'; + const FILES_COLL = 'gridfsdownload.files'; + const readStream = fs.createReadStream('./LICENSE.md'); - test(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - const db = client.db(configuration.db); - const bucket = new GridFSBucket(db, { bucketName: 'gridfsdownload' }); - const CHUNKS_COLL = 'gridfsdownload.chunks'; - const FILES_COLL = 'gridfsdownload.files'; - const readStream = fs.createReadStream('./LICENSE.md'); + const uploadStream = bucket.openUploadStream('test.dat'); + const id = uploadStream.id; - const uploadStream = bucket.openUploadStream('test.dat'); - const id = uploadStream.id; + uploadStream.once('finish', function () { + bucket.delete(id).then(function () { + const chunksQuery = db.collection(CHUNKS_COLL).find({ files_id: id }); + chunksQuery.toArray(function (error, docs) { + expect(error).to.not.exist; + expect(docs.length).to.equal(0); - uploadStream.once('finish', function () { - bucket.delete(id).then(function () { - const chunksQuery = db.collection(CHUNKS_COLL).find({ files_id: id }); - chunksQuery.toArray(function (error, docs) { + const filesQuery = db.collection(FILES_COLL).find({ _id: id }); + filesQuery.toArray(function (error, docs) { expect(error).to.not.exist; expect(docs.length).to.equal(0); - const filesQuery = db.collection(FILES_COLL).find({ _id: id }); - filesQuery.toArray(function (error, docs) { - expect(error).to.not.exist; - expect(docs.length).to.equal(0); - - client.close(done); - }); + client.close(done); }); }); }); - - readStream.pipe(uploadStream); }); - } - }); - - it('find()', { - metadata: { requires: { topology: ['single'], sessions: { skipLeakTests: true } } }, - - test(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - const db = client.db(configuration.db); - const bucket = new GridFSBucket(db, { bucketName: 'fs' }); - // We're only making sure this doesn't throw - bucket.find({ - batchSize: 1, - limit: 2, - maxTimeMS: 3, - noCursorTimeout: true, - skip: 4, - sort: { _id: 1 } - }); + readStream.pipe(uploadStream); + }); + }); - client.close(done); + it('find()', function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function (err, client) { + const db = client.db(configuration.db); + const bucket = new GridFSBucket(db, { bucketName: 'fs' }); + + // We're only making sure this doesn't throw + bucket.find({ + batchSize: 1, + limit: 2, + maxTimeMS: 3, + noCursorTimeout: true, + skip: 4, + sort: { _id: 1 } }); - } + + client.close(done); + }); }); - /** - * Drop an entire buckets files and chunks - * - * @example-class GridFSBucket - * @example-method drop - */ - it('drop example', { - metadata: { requires: { topology: ['single'], sessions: { skipLeakTests: true } } }, + it('drop example', function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function (err, client) { + const db = client.db(configuration.db); + const bucket = new GridFSBucket(db, { bucketName: 'gridfsdownload' }); + const CHUNKS_COLL = 'gridfsdownload.chunks'; + const FILES_COLL = 'gridfsdownload.files'; + const readStream = fs.createReadStream('./LICENSE.md'); - test(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - const db = client.db(configuration.db); - const bucket = new GridFSBucket(db, { bucketName: 'gridfsdownload' }); - const CHUNKS_COLL = 'gridfsdownload.chunks'; - const FILES_COLL = 'gridfsdownload.files'; - const readStream = fs.createReadStream('./LICENSE.md'); + const uploadStream = bucket.openUploadStream('test.dat'); + const id = uploadStream.id; - const uploadStream = bucket.openUploadStream('test.dat'); - const id = uploadStream.id; + uploadStream.once('finish', function () { + bucket.drop(function (err) { + expect(err).to.not.exist; - uploadStream.once('finish', function () { - bucket.drop(function (err) { - expect(err).to.not.exist; + const chunksQuery = db.collection(CHUNKS_COLL).find({ files_id: id }); + chunksQuery.toArray(function (error, docs) { + expect(error).to.not.exist; + expect(docs.length).to.equal(0); - const chunksQuery = db.collection(CHUNKS_COLL).find({ files_id: id }); - chunksQuery.toArray(function (error, docs) { + const filesQuery = db.collection(FILES_COLL).find({ _id: id }); + filesQuery.toArray(function (error, docs) { expect(error).to.not.exist; expect(docs.length).to.equal(0); - const filesQuery = db.collection(FILES_COLL).find({ _id: id }); - filesQuery.toArray(function (error, docs) { - expect(error).to.not.exist; - expect(docs.length).to.equal(0); - - client.close(done); - }); + client.close(done); }); }); }); - - readStream.pipe(uploadStream); }); - } + + readStream.pipe(uploadStream); + }); }); /** @@ -1089,8 +1069,7 @@ describe('GridFS Stream', function () { expect(err).to.not.exist; const names = indexes.map(i => i.name); expect(names).to.eql(['_id_', 'filename_1_uploadDate_1']); - client.close(); - done(); + client.close(done); }); }); }); diff --git a/test/integration/load-balancers/load_balancers.spec.test.js b/test/integration/load-balancers/load_balancers.spec.test.js index 1446a5e42c4..ae8abc298dd 100644 --- a/test/integration/load-balancers/load_balancers.spec.test.js +++ b/test/integration/load-balancers/load_balancers.spec.test.js @@ -61,6 +61,5 @@ const SKIP = [ ]; describe('Load Balancer Unified Tests', function () { - this.timeout(10000); runUnifiedSuite(loadSpecTests(path.join('load-balancers')), SKIP); }); diff --git a/test/integration/node-specific/db.test.js b/test/integration/node-specific/db.test.js index 66e51e7bc31..40c709b44d0 100644 --- a/test/integration/node-specific/db.test.js +++ b/test/integration/node-specific/db.test.js @@ -93,30 +93,6 @@ describe('Db', function () { } }); - it('shouldCorrectlyReconnectWhenError', { - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded'] } - }, - - test: function (done) { - var configuration = this.configuration; - var client = configuration.newClient(`mongodb://127.0.0.1:27088/test`, { - maxPoolSize: 4, - serverSelectionTimeoutMS: 10 - }); - - // Establish connection to db - client.connect(function (err) { - test.ok(err != null); - - client.connect(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - } - }); - it('should not cut collection name when it is the same as the database', { metadata: { requires: { topology: ['single', 'replicaset', 'sharded'] } diff --git a/test/integration/node-specific/examples/causal_consistency.test.js b/test/integration/node-specific/examples/causal_consistency.test.js index 4e369f29957..41e97323d87 100644 --- a/test/integration/node-specific/examples/causal_consistency.test.js +++ b/test/integration/node-specific/examples/causal_consistency.test.js @@ -28,21 +28,16 @@ describe('examples(causal-consistency):', function () { collection = undefined; }); - it('supports causal consistency', { - metadata: { - requires: { topology: ['single'], mongodb: '>=3.6.0' }, - sessions: { skipLeakTests: true } - }, + it('supports causal consistency', async function () { + const session = client.startSession({ causalConsistency: true }); - test: async function () { - const session = client.startSession({ causalConsistency: true }); + collection.insertOne({ darmok: 'jalad' }, { session }); + collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session }); - collection.insertOne({ darmok: 'jalad' }, { session }); - collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session }); + const results = await collection.find({}, { session }).toArray(); - const results = await collection.find({}, { session }).toArray(); + expect(results).to.exist; - expect(results).to.exist; - } + await session.endSession(); }); }); diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 4a6659cbde8..f459ae36152 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -3,6 +3,9 @@ import { once } from 'events'; import * as sinon from 'sinon'; import { + CommandFailedEvent, + CommandStartedEvent, + CommandSucceededEvent, MongoClient, MongoNotConnectedError, MongoServerSelectionError, @@ -531,7 +534,7 @@ describe('class MongoClient', function () { }; beforeEach(function () { - client = this.configuration.newClient(); + client = this.configuration.newClient({ monitorCommands: true }); db = client.db(); }); @@ -627,5 +630,24 @@ describe('class MongoClient', function () { expect(result2).to.not.be.instanceOf(MongoNotConnectedError); expect(result2).to.have.property('ok', 1); }); + + it('sends endSessions with noResponse set', async () => { + const session = client.startSession(); // make a session to be ended + await client.db('test').command({ ping: 1 }, { session }); + await session.endSession(); + + const startedEvents: CommandStartedEvent[] = []; + const endEvents: Array = []; + client.on('commandStarted', event => startedEvents.push(event)); + client.on('commandSucceeded', event => endEvents.push(event)); + client.on('commandFailed', event => endEvents.push(event)); + + await client.close(); + + expect(startedEvents).to.have.lengthOf(1); + expect(startedEvents[0]).to.have.property('commandName', 'endSessions'); + expect(endEvents).to.have.lengthOf(1); + expect(endEvents[0]).to.have.property('reply', undefined); // noReponse: true + }); }); }); diff --git a/test/integration/node-specific/operation_example.test.js b/test/integration/node-specific/operation_example.test.js index 9eb2fc5c49f..8709ae2de05 100644 --- a/test/integration/node-specific/operation_example.test.js +++ b/test/integration/node-specific/operation_example.test.js @@ -2,7 +2,6 @@ const { assert: test, setupDatabase } = require('../shared'); const { setTimeout } = require('timers'); const { format: f } = require('util'); -const { Topology } = require('../../../src/sdam/topology'); const { Code, ObjectId, ReturnDocument } = require('../../../src'); const chai = require('chai'); @@ -4034,41 +4033,6 @@ describe('Operation Examples', function () { } }); - /** - * Simple replicaset connection setup, requires a running replicaset on the correct ports - * - * @example-class Db - * @example-method open - */ - it('Should correctly connect with default replicasetNoOption', { - metadata: { requires: { topology: 'replicaset' } }, - - test: function (done) { - var configuration = this.configuration; - - // Replica configuration - var client = new Topology(configuration.options.hostAddresses, { - replicaSet: configuration.replicasetName - }); - - client.connect(function (err, client) { - expect(err).to.not.exist; - // LINE var MongoClient = require('mongodb').MongoClient, - // LINE test = require('assert'); - // LINE const client = new MongoClient('mongodb://localhost:27017/test'); - // LINE client.connect(function(err, client) { - // LINE var db = client.db('test); - // REPLACE configuration.writeConcernMax() WITH {w:1} - // REMOVE-LINE restartAndDone - // REMOVE-LINE done(); - // REMOVE-LINE var db = client.db(configuration.db); - // BEGIN - client.close(done); - }); - // END - } - }); - /************************************************************************** * * ADMIN TESTS @@ -4732,13 +4696,15 @@ describe('Operation Examples', function () { var cursor = collection.find({}); // Fetch the first object off the cursor cursor.next(function (err, item) { - test.equal(0, item.a); + expect(err).to.not.exist; + expect(item).to.have.property('a', 0); // Rewind the cursor, resetting it to point to the start of the query cursor.rewind(); // Grab the first object again cursor.next(function (err, item) { - test.equal(0, item.a); + expect(err).to.not.exist; + expect(item).to.have.property('a', 0); client.close(done); }); diff --git a/test/integration/sessions/sessions.test.ts b/test/integration/sessions/sessions.test.ts index 96cc6703fdc..482fda07935 100644 --- a/test/integration/sessions/sessions.test.ts +++ b/test/integration/sessions/sessions.test.ts @@ -1,16 +1,27 @@ import { expect } from 'chai'; - -import type { MongoClient } from '../../../src'; +import * as sinon from 'sinon'; + +import type { + Collection, + CommandStartedEvent, + CommandSucceededEvent, + MongoClient +} from '../../../src'; import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; -import { MongoServerError } from '../../../src/error'; +import { MongoCompatibilityError, MongoServerError } from '../../../src/error'; +import type { TestConfiguration } from '../../tools/runner/config'; import { setupDatabase, withMonitoredClient } from '../shared'; const ignoredCommands = [LEGACY_HELLO_COMMAND]; let hasInitialPingOccurred = false; -const test = { +const test: { + client: MongoClient; + commands: { started: CommandStartedEvent[]; succeeded: CommandSucceededEvent[] }; + setup: (config: TestConfiguration) => Promise; +} = { client: null, commands: { started: [], succeeded: [] }, - setup: function (config) { + async setup(config) { this.commands = { started: [], succeeded: [] }; this.client = config.newClient({ w: 1 }, { maxPoolSize: 1, monitorCommands: true }); @@ -35,41 +46,34 @@ const test = { } }); - return this.client.connect(); + await this.client.connect(); } }; -describe('Sessions Spec', function () { +describe('Sessions', function () { describe('Sessions - functional - old format', function () { before(function () { return setupDatabase(this.configuration); }); describe('endSessions', function () { - beforeEach(function () { - return test.setup(this.configuration); + beforeEach(async function () { + await test.setup(this.configuration); }); - it('should send endSessions for multiple sessions', { - metadata: { - requires: { topology: ['single'], mongodb: '>=3.6.0' }, - // Skipping session leak tests b/c these are explicit sessions - sessions: { skipLeakTests: true } - }, - test: function (done) { - const client = test.client; - const sessions = [client.startSession(), client.startSession()].map(s => s.id); - - client.close(err => { - expect(err).to.not.exist; - expect(test.commands.started).to.have.length(1); - expect(test.commands.started[0].commandName).to.equal('endSessions'); - expect(test.commands.started[0].command.endSessions).to.include.deep.members(sessions); - expect(client.s.sessions.size).to.equal(0); - - done(); - }); - } + it('should send endSessions for multiple sessions', function (done) { + const client = test.client; + const sessions = [client.startSession(), client.startSession()].map(s => s.id); + + client.close(err => { + expect(err).to.not.exist; + expect(test.commands.started).to.have.length(1); + expect(test.commands.started[0].commandName).to.equal('endSessions'); + expect(test.commands.started[0].command.endSessions).to.include.deep.members(sessions); + expect(client.s.activeSessions.size).to.equal(0); + + done(); + }); }); }); @@ -147,13 +151,13 @@ describe('Sessions Spec', function () { if (shouldReject) { expect.fail('this should have rejected'); } - expect(client.topology.s.sessionPool.sessions).to.have.length(1); + expect(client.s.sessionPool.sessions).to.have.length(1); }, () => { if (shouldResolve) { expect.fail('this should have resolved'); } - expect(client.topology.s.sessionPool.sessions).to.have.length(1); + expect(client.s.sessionPool.sessions).to.have.length(1); } ) .then(() => { @@ -175,7 +179,7 @@ describe('Sessions Spec', function () { await client.db('test').collection('foo').find({}, { session }).toArray(); }); - expect(client.topology.s.sessionPool.sessions).to.have.length(1); + expect(client.s.sessionPool.sessions).to.have.length(1); expect(sessionWasEnded).to.be.true; }); }); @@ -215,7 +219,9 @@ describe('Sessions Spec', function () { expect(err.message).to.equal( 'Cannot have explicit session with unacknowledged writes' ); - client.close(done); + session.endSession(() => { + client.close(done); + }); }); } ) @@ -400,4 +406,90 @@ describe('Sessions Spec', function () { expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1); }); }); + + describe('session support detection', () => { + let client: MongoClient; + let collection: Collection<{ a: number }>; + + beforeEach(async function () { + client = this.configuration.newClient({ monitorCommands: true }); + await client.connect(); + collection = client.db('test').collection('session.support.detection'); + await collection.drop().catch(() => null); + + // Never run a server selection for support since we're overriding it + sinon.stub(client.topology, 'shouldCheckForSessionSupport').callsFake(() => false); + }); + + afterEach(async function () { + await client.close(); + sinon.restore(); + }); + + context('when hasSessionSupport is false', () => { + beforeEach(() => sinon.stub(client.topology, 'hasSessionSupport').callsFake(() => false)); + + it('should not send session', async () => { + const events: CommandStartedEvent[] = []; + client.on('commandStarted', event => events.push(event)); + + await collection.insertMany([{ a: 1 }, { a: 1 }]); + const cursor = collection.find({ a: 1 }, { batchSize: 1, projection: { _id: 0 } }); + + const docs = [ + await cursor.next(), // find + await cursor.next() // getMore + ]; + + await cursor.close(); + + expect(docs).to.deep.equal([{ a: 1 }, { a: 1 }]); + expect(events.map(({ commandName }) => commandName)).to.deep.equal([ + 'insert', + 'find', + 'getMore', + 'killCursors' + ]); + for (const event of events) { + expect(event.command).to.not.have.property('lsid'); + } + }); + + it('should fail for an explicit session', async () => { + const session = client.startSession(); + + const error = await collection + .insertMany([{ a: 1 }, { a: 1 }], { session }) + .catch(error => error); + + expect(error).to.be.instanceOf(MongoCompatibilityError); + + await session.endSession(); + }); + }); + + context('when hasSessionSupport is true', () => { + beforeEach(() => sinon.stub(client.topology, 'hasSessionSupport').callsFake(() => true)); + + it('should send session', async () => { + const events: CommandStartedEvent[] = []; + client.on('commandStarted', event => events.push(event)); + + await collection.insertMany([{ a: 1 }, { a: 1 }]); + const cursor = collection.find({ a: 1 }, { batchSize: 1, projection: { _id: 0 } }); + + const docs = [await cursor.next(), await cursor.next()]; + + expect(docs).to.deep.equal([{ a: 1 }, { a: 1 }]); + expect(events.map(({ commandName }) => commandName)).to.deep.equal([ + 'insert', + 'find', + 'getMore' + ]); + for (const event of events) { + expect(event.command).to.have.property('lsid'); + } + }); + }); + }); }); diff --git a/test/integration/shared.js b/test/integration/shared.js index 4e6d741260a..9a0b446f082 100644 --- a/test/integration/shared.js +++ b/test/integration/shared.js @@ -142,15 +142,7 @@ function withClient(client, callback) { if (!client) { client = this.configuration.newClient(connectionString); } - return client - .connect() - .then(callback) - .then(err => { - cleanup(); - if (err) { - throw err; - } - }, cleanup); + return client.connect().then(callback).finally(cleanup); } if (this && this.configuration) { @@ -277,20 +269,28 @@ class APMEventCollector { } } -// simplified `withClient` helper that only uses callbacks +/** + * @param {(client: MongoClient, done: Mocha.Done) => void} callback + */ function withClientV2(callback) { - return function testFunction(done) { + return async function testFunction() { const client = this.configuration.newClient({ monitorCommands: true }); - client.connect(err => { - if (err) return done(err); - this.defer(() => client.close()); - try { - callback.call(this, client, done); - } catch (err) { - done(err); - } - }); + try { + await client.connect(); + await new Promise((resolve, reject) => { + try { + callback.call(this, client, error => { + if (error) return reject(error); + resolve(); + }); + } catch (err) { + return reject(err); + } + }); + } finally { + await client.close(); + } }; } diff --git a/test/integration/shared.test.js b/test/integration/shared.test.js deleted file mode 100644 index f41267f93ef..00000000000 --- a/test/integration/shared.test.js +++ /dev/null @@ -1,100 +0,0 @@ -'use strict'; -const { withMonitoredClient } = require('./shared'); -const { expect } = require('chai'); - -describe('shared test utilities', function () { - context('withMonitoredClient', function () { - it('should throw if arrow function', function () { - expect(() => { - withMonitoredClient(['find'], () => {}); - }).to.throw(); - }); - - it('should not throw if function', function () { - expect(() => { - function example() {} - withMonitoredClient(['find'], example); - }).to.not.throw(); - }); - - it('should call done and close connection with callback', function (done) { - var e = []; - const fakeDone = () => { - expect(e.length).to.equal(1); - done(); - }; - const encapsulatedTest = withMonitoredClient(['find'], function (client, events, innerDone) { - e = events; - client - .db('integration_test') - .collection('test') - .find({}) - .toArray(() => { - return innerDone(); - }); - }).bind(this); - encapsulatedTest().then(fakeDone); - }); - - it('should propagate passed error to done', function (done) { - var e = []; - const fakeDone = err => { - expect(err).to.be.instanceOf(Error); - expect(e.length).to.equal(1); - done(); - }; - const encapsulatedTest = withMonitoredClient(['find'], function (client, events, innerDone) { - e = events; - client - .db('integration_test') - .collection('test') - .find({}) - .toArray(() => { - return innerDone(new Error('hello world')); - }); - }).bind(this); - encapsulatedTest().catch(fakeDone); - }); - - it('should call done and close connection with promise', function (done) { - var e = []; - const fakeDone = () => { - expect(e.length).to.equal(1); - done(); - }; - const encapsulatedTest = withMonitoredClient(['find'], function (client, events, innerDone) { - e = events; - client - .db('integration_test') - .collection('test') - .find({}) - .toArray() - .then(() => { - return innerDone(); - }); - }).bind(this); - encapsulatedTest().then(fakeDone); - }); - - it('should propagate passed error to done from promise', function (done) { - var e = []; - const fakeDone = err => { - expect(err).to.be.instanceOf(Error); - expect(e.length).to.equal(1); - done(); - }; - const encapsulatedTest = withMonitoredClient(['find'], function (client, events, innerDone) { - e = events; - client - .db('integration_test') - .collection('test') - .find({}) - .toArray() - .then(() => { - return innerDone(new Error('hello world')); - }); - }).bind(this); - encapsulatedTest().catch(fakeDone); - }); - }); -}); diff --git a/test/integration/transactions/transactions.test.ts b/test/integration/transactions/transactions.test.ts index 55d191b8696..11e472b6a9e 100644 --- a/test/integration/transactions/transactions.test.ts +++ b/test/integration/transactions/transactions.test.ts @@ -12,13 +12,12 @@ describe('Transactions', function () { beforeEach(async function () { client = this.configuration.newClient(); - const topology = (await client.connect()).topology; - sessionPool = topology.s.sessionPool; - session = new ClientSession(topology, sessionPool, {}); + await client.connect(); + sessionPool = client.s.sessionPool; + session = new ClientSession(client, sessionPool, {}); }); afterEach(async () => { - sessionPool.endAllPooledSessions(); await client.close(); }); diff --git a/test/tools/common.js b/test/tools/common.js index c6e7bbe382e..36f93e808d0 100644 --- a/test/tools/common.js +++ b/test/tools/common.js @@ -126,29 +126,7 @@ function genClusterTime(time) { }; } -function sessionCleanupHandler(session, sessionPool, done) { - return err => { - if (session == null) { - sessionPool.endAllPooledSessions(); - done(); - return; - } - - if (session.hasEnded) { - sessionPool.endAllPooledSessions(); - done(err); - return; - } - - session.endSession(() => { - sessionPool.endAllPooledSessions(); - done(err); - }); - }; -} - module.exports = { ReplSetFixture: ReplSetFixture, - genClusterTime: genClusterTime, - sessionCleanupHandler + genClusterTime: genClusterTime }; diff --git a/test/tools/runner/hooks/configuration.js b/test/tools/runner/hooks/configuration.js index 107068ae7c5..38b5c6d0881 100644 --- a/test/tools/runner/hooks/configuration.js +++ b/test/tools/runner/hooks/configuration.js @@ -127,6 +127,7 @@ const testConfigBeforeHook = async function () { version: this.configuration.buildInfo.version, node: process.version, os: process.platform, + pid: process.pid, serverless: process.env.SERVERLESS === '1', auth: process.env.AUTH === 'auth', tls: process.env.SSL === 'ssl', diff --git a/test/tools/unified-spec-runner/runner.ts b/test/tools/unified-spec-runner/runner.ts index c73913c258f..e0030106e51 100644 --- a/test/tools/unified-spec-runner/runner.ts +++ b/test/tools/unified-spec-runner/runner.ts @@ -240,11 +240,8 @@ export function runUnifiedSuite(specTests: uni.UnifiedSuite[], testsToSkip?: str for (const unifiedSuite of specTests) { context(String(unifiedSuite.description), function () { for (const test of unifiedSuite.tests) { - it(String(test.description), { - metadata: { sessions: { skipLeakTests: true } }, - test: async function () { - await runUnifiedTest(this, unifiedSuite, test, testsToSkip); - } + it(String(test.description), async function () { + await runUnifiedTest(this, unifiedSuite, test, testsToSkip); }); } }); diff --git a/test/unit/collection.test.ts b/test/unit/collection.test.ts index 44527631312..c92fbbbfa36 100644 --- a/test/unit/collection.test.ts +++ b/test/unit/collection.test.ts @@ -6,8 +6,12 @@ import { cleanup, createServer, HELLO } from '../tools/mongodb-mock'; describe('Collection', function () { let server = null; - beforeEach(() => createServer().then(_server => (server = _server))); - afterEach(() => cleanup()); + beforeEach(async () => { + server = await createServer(); + }); + afterEach(async () => { + await cleanup(); + }); context('#createIndex', () => { it('should error when createIndex fails', function (done) { diff --git a/test/unit/cursor/abstract_cursor.test.ts b/test/unit/cursor/abstract_cursor.test.ts new file mode 100644 index 00000000000..af32ef4c523 --- /dev/null +++ b/test/unit/cursor/abstract_cursor.test.ts @@ -0,0 +1,40 @@ +import { expect } from 'chai'; + +import { Callback, ExecutionResult, MongoClient, Server } from '../../../src'; +import { AbstractCursor, AbstractCursorOptions } from '../../../src/cursor/abstract_cursor'; +import { ClientSession } from '../../../src/sessions'; +import { ns } from '../../../src/utils'; + +/** Minimal do nothing cursor to focus on testing the base cusor behavior */ +class ConcreteCursor extends AbstractCursor { + constructor(client: MongoClient, options: AbstractCursorOptions = {}) { + super(client, ns('test.test'), options); + } + clone(): ConcreteCursor { + return new ConcreteCursor(new MongoClient('mongodb://iLoveJavascript')); + } + _initialize(session: ClientSession, callback: Callback): void { + return callback(undefined, { server: {} as Server, session, response: { ok: 1 } }); + } +} + +describe('class AbstractCursor', () => { + let client: MongoClient; + beforeEach(async function () { + client = new MongoClient('mongodb://iLoveJavascript'); + }); + + context('#constructor', () => { + it('creates a session if none passed in', () => { + const cursor = new ConcreteCursor(client); + expect(cursor).to.have.property('session').that.is.instanceOf(ClientSession); + }); + + it('uses the passed in session', async () => { + const session = client.startSession(); + const cursor = new ConcreteCursor(client, { session }); + expect(cursor).to.have.property('session', session); + await session.endSession(); + }); + }); +}); diff --git a/test/unit/cursor/aggregation_cursor.test.js b/test/unit/cursor/aggregation_cursor.test.js deleted file mode 100644 index 226b7963c60..00000000000 --- a/test/unit/cursor/aggregation_cursor.test.js +++ /dev/null @@ -1,147 +0,0 @@ -'use strict'; - -const { expect } = require('chai'); -const mock = require('../../tools/mongodb-mock/index'); -const { Long } = require('bson'); -const { MongoDBNamespace, isHello } = require('../../../src/utils'); -const { AggregationCursor } = require('../../../src/cursor/aggregation_cursor'); -const { MongoClient } = require('../../../src/mongo_client'); -const { default: ConnectionString } = require('mongodb-connection-string-url'); - -const test = {}; -describe('Aggregation Cursor', function () { - describe('#next', function () { - afterEach(function () { - mock.cleanup(); - }); - beforeEach(async function () { - test.server = await mock.createServer(); - }); - - context('when there is a data bearing server', function () { - beforeEach(function () { - test.server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } else if (doc.aggregate) { - request.reply({ - cursor: { - id: Long.fromNumber(1), - ns: 'test.test', - firstBatch: [{ _id: 1, name: 'test' }] - }, - ok: 1 - }); - } - }); - }); - - it('sets the session on the cursor', function (done) { - const client = new MongoClient( - new ConnectionString(`mongodb://${test.server.hostAddress()}`).toString() - ); - const cursor = new AggregationCursor( - client, - MongoDBNamespace.fromString('test.test'), - [], - {} - ); - client.connect(function () { - cursor.next(function () { - expect(cursor.session).to.exist; - client.close(done); - }); - }); - }); - }); - - context('when there is no data bearing server', function () { - beforeEach(function () { - test.server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply({ errmsg: 'network error' }); - } else if (doc.aggregate) { - request.reply({ - cursor: { - id: Long.fromNumber(1), - ns: 'test.test', - firstBatch: [{ _id: 1, name: 'test' }] - }, - ok: 1 - }); - } - }); - }); - - it('does not set the session on the cursor', function (done) { - const client = new MongoClient( - new ConnectionString(`mongodb://${test.server.hostAddress()}`).toString(), - { - serverSelectionTimeoutMS: 1000 - } - ); - const cursor = new AggregationCursor( - client, - MongoDBNamespace.fromString('test.test'), - [], - {} - ); - client.connect(function () { - cursor.next(function () { - expect(cursor.session).to.not.exist; - client.close(done); - }); - }); - }); - }); - - context('when a data bearing server becomes available', function () { - beforeEach(function () { - // Set the count of times hello has been called. - let helloCalls = 0; - test.server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - // After the first hello call errors indicating no data bearing server is - // available, any subsequent hello call should succeed after server selection. - // This gives us a data bearing server available for the next call. - request.reply(helloCalls > 0 ? mock.HELLO : { errmsg: 'network error' }); - helloCalls++; - } else if (doc.aggregate) { - request.reply({ - cursor: { - id: Long.fromNumber(1), - ns: 'test.test', - firstBatch: [{ _id: 1, name: 'test' }] - }, - ok: 1 - }); - } - }); - }); - - it('sets the session on the cursor', function (done) { - const client = new MongoClient( - new ConnectionString(`mongodb://${test.server.hostAddress()}`).toString(), - { - serverSelectionTimeoutMS: 1000 - } - ); - const cursor = new AggregationCursor( - client, - MongoDBNamespace.fromString('test.test'), - [], - {} - ); - client.connect(function () { - cursor.next(function () { - expect(cursor.session).to.exist; - client.close(done); - }); - }); - }); - }); - }); -}); diff --git a/test/unit/cursor/find_cursor.test.js b/test/unit/cursor/find_cursor.test.js deleted file mode 100644 index bd8515424f8..00000000000 --- a/test/unit/cursor/find_cursor.test.js +++ /dev/null @@ -1,193 +0,0 @@ -'use strict'; - -const { expect } = require('chai'); -const mock = require('../../tools/mongodb-mock/index'); -const { Long } = require('bson'); -const { MongoDBNamespace, isHello } = require('../../../src/utils'); -const { FindCursor } = require('../../../src/cursor/find_cursor'); -const { MongoClient, MongoServerError } = require('../../../src'); -const { default: ConnectionString } = require('mongodb-connection-string-url'); - -const test = {}; -describe('Find Cursor', function () { - describe('#next', function () { - afterEach(function () { - mock.cleanup(); - }); - beforeEach(async function () { - test.server = await mock.createServer(); - }); - - context('when there is a data bearing server', function () { - beforeEach(function () { - test.server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply(mock.HELLO); - } else if (doc.find) { - request.reply({ - cursor: { - id: Long.fromNumber(1), - ns: 'test.test', - firstBatch: [{ _id: 1, name: 'test' }] - }, - ok: 1 - }); - } - }); - }); - - it('sets the session on the cursor', function (done) { - const client = new MongoClient( - new ConnectionString(`mongodb://${test.server.hostAddress()}`).toString(), - { - serverSelectionTimeoutMS: 1000 - } - ); - const cursor = new FindCursor(client, MongoDBNamespace.fromString('test.test'), {}, {}); - client.connect(function () { - cursor.next(function () { - expect(cursor.session).to.exist; - client.close(done); - }); - }); - }); - }); - - context('when there is no data bearing server', function () { - beforeEach(function () { - test.server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply({ errmsg: 'network error' }); - } else if (doc.find) { - request.reply({ - cursor: { - id: Long.fromNumber(1), - ns: 'test.test', - firstBatch: [{ _id: 1, name: 'test' }] - }, - ok: 1 - }); - } - }); - }); - - it('does not set the session on the cursor', function (done) { - const client = new MongoClient( - new ConnectionString(`mongodb://${test.server.hostAddress()}`).toString(), - { - serverSelectionTimeoutMS: 1000 - } - ); - const cursor = new FindCursor(client, MongoDBNamespace.fromString('test.test'), {}, {}); - client.connect(function () { - cursor.next(function () { - expect(cursor.session).to.not.exist; - client.close(done); - }); - }); - }); - }); - - context('when a data bearing server becomes available', function () { - beforeEach(function () { - // Set the count of times hello has been called. - let helloCalls = 0; - test.server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - // After the first hello call errors indicating no data bearing server is - // available, any subsequent hello call should succeed after server selection. - // This gives us a data bearing server available for the next call. - request.reply(helloCalls > 0 ? mock.HELLO : { errmsg: 'network error' }); - helloCalls++; - } else if (doc.find) { - request.reply({ - cursor: { - id: Long.fromNumber(1), - ns: 'test.test', - firstBatch: [{ _id: 1, name: 'test' }] - }, - ok: 1 - }); - } - }); - }); - - it('sets the session on the cursor', function (done) { - const client = new MongoClient( - new ConnectionString(`mongodb://${test.server.hostAddress()}`).toString(), - { - serverSelectionTimeoutMS: 1000 - } - ); - const cursor = new FindCursor(client, MongoDBNamespace.fromString('test.test'), {}, {}); - client.connect(function () { - cursor.next(function () { - expect(cursor.session).to.exist; - client.close(done); - }); - }); - }); - }); - }); - - describe('Response', function () { - afterEach(() => mock.cleanup()); - beforeEach(() => { - return mock.createServer().then(mockServer => { - test.server = mockServer; - }); - }); - - it('should throw when document is error', function (done) { - const errdoc = { - errmsg: 'Cursor not found (namespace: "liveearth.entityEvents", id: 2018648316188432590).' - }; - - const client = new MongoClient( - new ConnectionString(`mongodb://${test.server.hostAddress()}`).toString(), - { serverSelectionTimeoutMS: 1000 } - ); - - test.server.setMessageHandler(request => { - const doc = request.document; - if (isHello(doc)) { - request.reply( - Object.assign({}, mock.HELLO, { - maxWireVersion: 6 - }) - ); - } else if (doc.find) { - request.reply({ - cursor: { - id: Long.fromNumber(1), - ns: 'test.test', - firstBatch: [] - }, - ok: 1 - }); - } else if (doc.getMore) { - request.reply(errdoc); - } else if (doc.killCursors) { - request.reply({ ok: 1 }); - } - }); - - client.on('error', done); - client.connect(() => { - const cursor = new FindCursor(client, MongoDBNamespace.fromString('test.test'), {}, {}); - - // Execute next - cursor.next(function (err) { - expect(err).to.exist; - expect(err).to.be.instanceof(MongoServerError); - expect(err.message).to.equal(errdoc.errmsg); - - client.close(done); - }); - }); - }); - }); -}); diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index 6127c0f33cd..483d4a3ca44 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -133,9 +133,9 @@ describe('GetMoreOperation', function () { const server = new Server(new Topology([], {} as any), new ServerDescription(''), {} as any); const operation = new GetMoreOperation(ns, cursorId, server, options); - context('when the aspect is cursor iterating', function () { + context('when the aspect is must select same server', function () { it('returns true', function () { - expect(operation.hasAspect(Aspect.CURSOR_ITERATING)).to.be.true; + expect(operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)).to.be.true; }); }); diff --git a/test/unit/sessions.test.js b/test/unit/sessions.test.js index 0ac8bf8bee4..f60dc9a1944 100644 --- a/test/unit/sessions.test.js +++ b/test/unit/sessions.test.js @@ -2,8 +2,7 @@ const mock = require('../tools/mongodb-mock/index'); const { expect } = require('chai'); -const { genClusterTime, sessionCleanupHandler } = require('../tools/common'); -const { Topology } = require('../../src/sdam/topology'); +const { genClusterTime } = require('../tools/common'); const { ServerSessionPool, ServerSession, @@ -15,32 +14,22 @@ const { getSymbolFrom } = require('../tools/utils'); const { Long } = require('../../src/bson'); const { MongoRuntimeError } = require('../../src/error'); const sinon = require('sinon'); - -let test = { - topology: null -}; +const { MongoClient } = require('../../src'); describe('Sessions - unit', function () { - const topology = {}; - const serverSessionPool = new ServerSessionPool(topology); + let client; + let serverSessionPool; + let session; + beforeEach(async function () { + client = new MongoClient('mongodb://iLoveJavascript'); + serverSessionPool = client.s.sessionPool; + session = client.startSession(); + }); describe('class ClientSession', function () { - let session; - let sessionPool; - - afterEach(done => { - if (sessionPool) { - sessionCleanupHandler(session, sessionPool, done)(); - } else { - done(); - } - }); - describe('startTransaction()', () => { it('should throw an error if the session is snapshot enabled', function () { - const topology = new Topology('localhost:27017', {}); - sessionPool = topology.s.sessionPool; - session = new ClientSession(topology, sessionPool, { snapshot: true }); + session = new ClientSession(client, serverSessionPool, { snapshot: true }); expect(session.snapshotEnabled).to.equal(true); expect(() => session.startTransaction()).to.throw( 'Transactions are not allowed with snapshot sessions' @@ -49,12 +38,6 @@ describe('Sessions - unit', function () { }); describe('advanceClusterTime()', () => { - beforeEach(() => { - const topology = new Topology('localhost:27017', {}); - sessionPool = topology.s.sessionPool; - session = new ClientSession(topology, sessionPool, {}); - }); - it('should throw an error if the input cluster time is not an object', function () { const invalidInputs = [undefined, null, 3, 'a']; for (const input of invalidInputs) { @@ -175,47 +158,46 @@ describe('Sessions - unit', function () { }); it('should throw an error if snapshot and causalConsistency options are both set to true', function () { - const topology = new Topology('localhost:27017', {}); - sessionPool = topology.s.sessionPool; expect( () => - new ClientSession(topology, sessionPool, { causalConsistency: true, snapshot: true }) + new ClientSession(client, serverSessionPool, { + causalConsistency: true, + snapshot: true + }) ).to.throw('Properties "causalConsistency" and "snapshot" are mutually exclusive'); }); it('should default to `null` for `clusterTime`', function () { - const topology = new Topology('localhost:27017', {}); - sessionPool = topology.s.sessionPool; - session = new ClientSession(topology, sessionPool); + const session = new ClientSession(client, serverSessionPool); expect(session.clusterTime).to.not.exist; }); it('should set the internal clusterTime to `initialClusterTime` if provided', function () { const clusterTime = genClusterTime(Date.now()); - const topology = new Topology('localhost:27017'); - sessionPool = topology.s.sessionPool; - session = new ClientSession(topology, sessionPool, { initialClusterTime: clusterTime }); + const session = new ClientSession(client, serverSessionPool, { + initialClusterTime: clusterTime + }); expect(session.clusterTime).to.eql(clusterTime); }); it('should acquire a serverSession in the constructor if the session is explicit', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + const session = new ClientSession(client, serverSessionPool, { explicit: true }); const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); expect(session).to.have.property(serverSessionSymbol).that.is.an.instanceOf(ServerSession); }); it('should leave serverSession null if the session is implicit', () => { // implicit via false (this should not be allowed...) - let session = new ClientSession(topology, serverSessionPool, { explicit: false }); + let session = new ClientSession(client, serverSessionPool, { explicit: false }); const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); expect(session).to.have.property(serverSessionSymbol, null); // implicit via omission - session = new ClientSession(topology, serverSessionPool, {}); + session = new ClientSession(client, serverSessionPool, {}); expect(session).to.have.property(serverSessionSymbol, null); }); it('should start the txnNumberIncrement at zero', () => { - const session = new ClientSession(topology, serverSessionPool); + const session = new ClientSession(client, serverSessionPool); const txnNumberIncrementSymbol = getSymbolFrom(session, 'txnNumberIncrement'); expect(session).to.have.property(txnNumberIncrementSymbol, 0); }); @@ -232,20 +214,20 @@ describe('Sessions - unit', function () { describe('from an explicit session', () => { it('should always have a non-null serverSession after construction', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + const session = new ClientSession(client, serverSessionPool, { explicit: true }); expect(session).to.have.a.property(serverSessionSymbol).be.an.instanceOf(ServerSession); expect(session.serverSession).be.an.instanceOf(ServerSession); }); it('should always have non-null serverSession even if it is ended before getter called', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + const session = new ClientSession(client, serverSessionPool, { explicit: true }); session.hasEnded = true; expect(session).to.have.a.property(serverSessionSymbol).be.an.instanceOf(ServerSession); expect(session.serverSession).be.an.instanceOf(ServerSession); }); it('should throw if the serverSession at the symbol property goes missing', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: true }); + const session = new ClientSession(client, serverSessionPool, { explicit: true }); // We really want to make sure a ClientSession is not separated from its serverSession session[serverSessionSymbol] = null; expect(session).to.have.a.property(serverSessionSymbol).be.null; @@ -255,14 +237,14 @@ describe('Sessions - unit', function () { describe('from an implicit session', () => { it('should throw if the session ended before serverSession was acquired', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + const session = new ClientSession(client, serverSessionPool, { explicit: false }); // make an implicit session expect(session).to.have.property(serverSessionSymbol, null); session.hasEnded = true; expect(() => session.serverSession).to.throw(MongoRuntimeError); }); it('should acquire a serverSession if clientSession.hasEnded is false and serverSession is not set', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + const session = new ClientSession(client, serverSessionPool, { explicit: false }); // make an implicit session expect(session).to.have.property(serverSessionSymbol, null); session.hasEnded = false; const acquireSpy = sinon.spy(serverSessionPool, 'acquire'); @@ -272,7 +254,7 @@ describe('Sessions - unit', function () { }); it('should return the existing serverSession and not acquire a new one if one is already set', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + const session = new ClientSession(client, serverSessionPool, { explicit: false }); // make an implicit session expect(session).to.have.property(serverSessionSymbol, null); const acquireSpy = sinon.spy(serverSessionPool, 'acquire'); const firstServerSessionGetResult = session.serverSession; @@ -295,7 +277,7 @@ describe('Sessions - unit', function () { }); it('should return the existing serverSession and not acquire a new one if one is already set and session is ended', () => { - const session = new ClientSession(topology, serverSessionPool, { explicit: false }); // make an implicit session + const session = new ClientSession(client, serverSessionPool, { explicit: false }); // make an implicit session expect(session).to.have.property(serverSessionSymbol, null); const acquireSpy = sinon.spy(serverSessionPool, 'acquire'); const firstServerSessionGetResult = session.serverSession; @@ -323,7 +305,7 @@ describe('Sessions - unit', function () { describe('incrementTransactionNumber()', () => { it('should not allocate serverSession', () => { - const session = new ClientSession(topology, serverSessionPool); + const session = new ClientSession(client, serverSessionPool); const txnNumberIncrementSymbol = getSymbolFrom(session, 'txnNumberIncrement'); session.incrementTransactionNumber(); @@ -334,7 +316,7 @@ describe('Sessions - unit', function () { }); it('should save increments to txnNumberIncrement symbol', () => { - const session = new ClientSession(topology, serverSessionPool); + const session = new ClientSession(client, serverSessionPool); const txnNumberIncrementSymbol = getSymbolFrom(session, 'txnNumberIncrement'); session.incrementTransactionNumber(); @@ -347,7 +329,7 @@ describe('Sessions - unit', function () { describe('applySession()', () => { it('should allocate serverSession', () => { - const session = new ClientSession(topology, serverSessionPool); + const session = new ClientSession(client, serverSessionPool); const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); const command = { magic: 1 }; @@ -359,7 +341,7 @@ describe('Sessions - unit', function () { }); it('should apply saved txnNumberIncrements', () => { - const session = new ClientSession(topology, serverSessionPool); + const session = new ClientSession(client, serverSessionPool); const serverSessionSymbol = getSymbolFrom(session, 'serverSession'); session.incrementTransactionNumber(); @@ -382,43 +364,32 @@ describe('Sessions - unit', function () { }); describe('class ServerSessionPool', function () { - afterEach(() => { - test.topology.close(); - return mock.cleanup(); + let client; + let server; + beforeEach(async () => { + server = await mock.createServer(); + server.setMessageHandler(request => { + var doc = request.document; + if (isHello(doc)) { + request.reply(Object.assign({}, mock.HELLO, { logicalSessionTimeoutMinutes: 10 })); + } + }); + + client = new MongoClient(`mongodb://${server.hostAddress()}`); + await client.connect(); }); - beforeEach(() => { - return mock - .createServer() - .then(server => { - test.server = server; - test.server.setMessageHandler(request => { - var doc = request.document; - if (isHello(doc)) { - request.reply(Object.assign({}, mock.HELLO, { logicalSessionTimeoutMinutes: 10 })); - } - }); - }) - .then(() => { - test.topology = new Topology(test.server.hostAddress()); - - return new Promise((resolve, reject) => { - test.topology.once('error', reject); - test.topology.once('connect', resolve); - test.topology.connect(); - }); - }); + afterEach(async () => { + if (client) await client.close(); + await mock.cleanup(); }); it('should throw errors with invalid parameters', function () { - expect(() => { - new ServerSessionPool(); - }).to.throw(/ServerSessionPool requires a topology/); + expect(() => new ServerSessionPool()).to.throw(MongoRuntimeError); }); it('should create a new session if the pool is empty', function (done) { - const pool = new ServerSessionPool(test.topology); - done = sessionCleanupHandler(null, pool, done); + const pool = new ServerSessionPool(client); expect(pool.sessions).to.have.length(0); const session = pool.acquire(); @@ -431,8 +402,7 @@ describe('Sessions - unit', function () { it('should reuse sessions which have not timed out yet on acquire', function (done) { const oldSession = new ServerSession(); - const pool = new ServerSessionPool(test.topology); - done = sessionCleanupHandler(null, pool, done); + const pool = new ServerSessionPool(client); pool.sessions.push(oldSession); const session = pool.acquire(); @@ -447,8 +417,7 @@ describe('Sessions - unit', function () { const oldSession = new ServerSession(); oldSession.lastUse = now() - 30 * 60 * 1000; // add 30min - const pool = new ServerSessionPool(test.topology); - done = sessionCleanupHandler(null, pool, done); + const pool = new ServerSessionPool(client); pool.sessions.push(oldSession); const session = pool.acquire(); @@ -466,8 +435,7 @@ describe('Sessions - unit', function () { return session; }); - const pool = new ServerSessionPool(test.topology); - done = sessionCleanupHandler(null, pool, done); + const pool = new ServerSessionPool(client); pool.sessions = pool.sessions.concat(oldSessions); pool.release(newSession); @@ -480,8 +448,7 @@ describe('Sessions - unit', function () { const session = new ServerSession(); session.lastUse = now() - 9.5 * 60 * 1000; // add 9.5min - const pool = new ServerSessionPool(test.topology); - done = sessionCleanupHandler(null, pool, done); + const pool = new ServerSessionPool(client); pool.release(session); expect(pool.sessions).to.have.length(0); @@ -489,8 +456,7 @@ describe('Sessions - unit', function () { }); it('should maintain a LIFO queue of sessions', function (done) { - const pool = new ServerSessionPool(test.topology); - done = sessionCleanupHandler(null, pool, done); + const pool = new ServerSessionPool(client); const sessionA = new ServerSession(); const sessionB = new ServerSession();