diff --git a/lib/core/connection/pool.js b/lib/core/connection/pool.js index 53ce1c79492..16d8aa6d545 100644 --- a/lib/core/connection/pool.js +++ b/lib/core/connection/pool.js @@ -25,12 +25,14 @@ const makeStateMachine = require('../utils').makeStateMachine; const DISCONNECTED = 'disconnected'; const CONNECTING = 'connecting'; const CONNECTED = 'connected'; +const DRAINING = 'draining'; const DESTROYING = 'destroying'; const DESTROYED = 'destroyed'; const stateTransition = makeStateMachine({ - [DISCONNECTED]: [CONNECTING, DESTROYING, DISCONNECTED], - [CONNECTING]: [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED], - [CONNECTED]: [CONNECTED, DISCONNECTED, DESTROYING], + [DISCONNECTED]: [CONNECTING, DRAINING, DISCONNECTED], + [CONNECTING]: [CONNECTING, CONNECTED, DRAINING, DISCONNECTED], + [CONNECTED]: [CONNECTED, DISCONNECTED, DRAINING], + [DRAINING]: [DRAINING, DESTROYING, DESTROYED], [DESTROYING]: [DESTROYING, DESTROYED], [DESTROYED]: [DESTROYED] }); @@ -239,7 +241,10 @@ function resetPoolState(pool) { function connectionFailureHandler(pool, event, err, conn) { if (conn) { - if (conn._connectionFailHandled) return; + if (conn._connectionFailHandled) { + return; + } + conn._connectionFailHandled = true; conn.destroy(); @@ -270,8 +275,10 @@ function connectionFailureHandler(pool, event, err, conn) { // No more socket available propegate the event if (pool.socketCount() === 0) { - if (pool.state !== DESTROYED && pool.state !== DESTROYING) { - stateTransition(pool, DISCONNECTED); + if (pool.state !== DESTROYED && pool.state !== DESTROYING && pool.state !== DRAINING) { + if (pool.options.reconnect) { + stateTransition(pool, DISCONNECTED); + } } // Do not emit error events, they are always close events @@ -426,7 +433,7 @@ function messageHandler(self) { updateSessionFromResponse(session, document); } - if (document.$clusterTime) { + if (self.topology && document.$clusterTime) { self.topology.clusterTime = document.$clusterTime; } } @@ -537,7 +544,7 @@ 
Pool.prototype.isDisconnected = function() { /** * Connect pool */ -Pool.prototype.connect = function() { +Pool.prototype.connect = function(callback) { if (this.state !== DISCONNECTED) { throw new MongoError('connection in unlawful state ' + this.state); } @@ -545,6 +552,12 @@ Pool.prototype.connect = function() { stateTransition(this, CONNECTING); createConnection(this, (err, conn) => { if (err) { + if (typeof callback === 'function') { + this.destroy(); + callback(err); + return; + } + if (this.state === CONNECTING) { this.emit('error', err); } @@ -554,7 +567,6 @@ Pool.prototype.connect = function() { } stateTransition(this, CONNECTED); - this.emit('connect', this, conn); // create min connections if (this.minSize) { @@ -562,6 +574,12 @@ Pool.prototype.connect = function() { createConnection(this); } } + + if (typeof callback === 'function') { + callback(null, conn); + } else { + this.emit('connect', this, conn); + } }); }; @@ -596,6 +614,8 @@ Pool.prototype.unref = function() { // Destroy the connections function destroy(self, connections, options, callback) { + stateTransition(self, DESTROYING); + eachAsync( connections, (conn, cb) => { @@ -626,14 +646,19 @@ function destroy(self, connections, options, callback) { */ Pool.prototype.destroy = function(force, callback) { var self = this; + if (typeof force === 'function') { + callback = force; + force = false; + } + // Do not try again if the pool is already dead if (this.state === DESTROYED || self.state === DESTROYING) { if (typeof callback === 'function') callback(null, null); return; } - // Set state to destroyed - stateTransition(this, DESTROYING); + // Set state to draining + stateTransition(this, DRAINING); // Are we force closing if (force) { @@ -660,6 +685,14 @@ Pool.prototype.destroy = function(force, callback) { // Wait for the operations to drain before we close the pool function checkStatus() { + if (self.state === DESTROYED || self.state === DESTROYING) { + if (typeof callback === 'function') { + 
callback(); + } + + return; + } + flushMonitoringOperations(self.queue); if (self.queue.length === 0) { @@ -676,7 +709,6 @@ Pool.prototype.destroy = function(force, callback) { } destroy(self, connections, { force: false }, callback); - // } else if (self.queue.length > 0 && !this.reconnectId) { } else { // Ensure we empty the queue _execute(self)(); @@ -695,6 +727,14 @@ Pool.prototype.destroy = function(force, callback) { * @param {function} [callback] */ Pool.prototype.reset = function(callback) { + if (this.s.state !== CONNECTED) { + if (typeof callback === 'function') { + callback(new MongoError('pool is not connected, reset aborted')); + } + + return; + } + const connections = this.availableConnections.concat(this.inUseConnections); eachAsync( connections, @@ -715,12 +755,12 @@ Pool.prototype.reset = function(callback) { resetPoolState(this); - // create an initial connection, and kick off execution again - createConnection(this); - - if (typeof callback === 'function') { - callback(null, null); - } + // create a new connection, this will ultimately trigger execution + createConnection(this, () => { + if (typeof callback === 'function') { + callback(null, null); + } + }); } ); }; @@ -788,17 +828,12 @@ Pool.prototype.write = function(command, options, cb) { // Pool was destroyed error out if (this.state === DESTROYED || this.state === DESTROYING) { - // Callback with an error - if (cb) { - try { - cb(new MongoError('pool destroyed')); - } catch (err) { - process.nextTick(function() { - throw err; - }); - } - } + cb(new MongoError('pool destroyed')); + return; + } + if (this.state === DRAINING) { + cb(new MongoError('pool is draining, new operations prohibited')); return; } @@ -846,10 +881,6 @@ Pool.prototype.write = function(command, options, cb) { // Optional per operation socketTimeout operation.socketTimeout = options.socketTimeout; operation.monitoring = options.monitoring; - // Custom socket Timeout - if (options.socketTimeout) { - operation.socketTimeout 
= options.socketTimeout; - } // Get the requestId operation.requestId = command.requestId; @@ -948,15 +979,6 @@ function createConnection(pool, callback) { pool.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`); } - if (pool.options.legacyCompatMode === false) { - // The unified topology uses the reported `error` from a pool to track what error - // reason is returned to the user during selection timeout. We only want to emit - // this if the pool is active because the listeners are removed on destruction. - if (pool.state !== DESTROYED && pool.state !== DESTROYING) { - pool.emit('error', err); - } - } - // check if reconnect is enabled, and attempt retry if so if (!pool.reconnectId && pool.options.reconnect) { if (pool.state === CONNECTING && pool.options.legacyCompatMode) { @@ -1047,6 +1069,12 @@ function _execute(self) { if (self.availableConnections.length === 0) { // Flush any monitoring operations flushMonitoringOperations(self.queue); + + // Try to create a new connection to execute stuck operation + if (totalConnections < self.options.size && self.queue.length > 0) { + createConnection(self); + } + break; } @@ -1111,10 +1139,7 @@ function _execute(self) { } // Re-execute the operation - setTimeout(function() { - _execute(self)(); - }, 10); - + setTimeout(() => _execute(self)(), 10); break; } } diff --git a/lib/core/sdam/monitoring.js b/lib/core/sdam/monitoring.js index 15f081c89d9..98b46636acd 100644 --- a/lib/core/sdam/monitoring.js +++ b/lib/core/sdam/monitoring.js @@ -4,8 +4,8 @@ const ServerDescription = require('./server_description').ServerDescription; const calculateDurationInMs = require('../utils').calculateDurationInMs; // pulled from `Server` implementation -const STATE_DISCONNECTED = 'disconnected'; -const STATE_DISCONNECTING = 'disconnecting'; +const STATE_CLOSED = 'closed'; +const STATE_CLOSING = 'closing'; /** * Published when server description changes, but does NOT include changes to the RTT. 
@@ -180,7 +180,7 @@ function monitorServer(server, options) { // emit an event indicating that our description has changed server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster)); - if (server.s.state === STATE_DISCONNECTED || server.s.state === STATE_DISCONNECTING) { + if (server.s.state === STATE_CLOSED || server.s.state === STATE_CLOSING) { return; } diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js index 19f94cd84bc..582f7e2ad63 100644 --- a/lib/core/sdam/server.js +++ b/lib/core/sdam/server.js @@ -148,16 +148,13 @@ class Server extends EventEmitter { { bson: this.s.bson } ); - // NOTE: this should only be the case if we are connecting to a single server - poolOptions.reconnect = true; + // NOTE: reconnect is explicitly false because of the server selection loop + poolOptions.reconnect = false; poolOptions.legacyCompatMode = false; this.s.pool = new Pool(this, poolOptions); // setup listeners - this.s.pool.on('connect', connectEventHandler(this)); - this.s.pool.on('close', errorEventHandler(this)); - this.s.pool.on('error', errorEventHandler(this)); this.s.pool.on('parseError', parseErrorEventHandler(this)); // it is unclear whether consumers should even know about these events @@ -170,13 +167,7 @@ class Server extends EventEmitter { stateTransition(this, STATE_CONNECTING); - // If auth settings have been provided, use them - if (options.auth) { - this.s.pool.connect.apply(this.s.pool, options.auth); - return; - } - - this.s.pool.connect(); + this.s.pool.connect(connectEventHandler(this)); } /** @@ -249,6 +240,11 @@ class Server extends EventEmitter { (callback = options), (options = {}), (options = options || {}); } + if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { + callback(new MongoError('server is closed')); + return; + } + const error = basicReadValidations(this, options); if (error) { return callback(error, null); @@ -298,6 +294,11 @@ class Server extends EventEmitter { * @param 
{function} callback */ query(ns, cmd, cursorState, options, callback) { + if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { + callback(new MongoError('server is closed')); + return; + } + wireProtocol.query(this, ns, cmd, cursorState, options, (err, result) => { if (err) { if (options.session && err instanceof MongoNetworkError) { @@ -322,6 +323,11 @@ class Server extends EventEmitter { * @param {function} callback */ getMore(ns, cursorState, batchSize, options, callback) { + if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { + callback(new MongoError('server is closed')); + return; + } + wireProtocol.getMore(this, ns, cursorState, batchSize, options, (err, result) => { if (err) { if (options.session && err instanceof MongoNetworkError) { @@ -345,6 +351,14 @@ class Server extends EventEmitter { * @param {function} callback */ killCursors(ns, cursorState, callback) { + if (this.s.state === STATE_CLOSING || this.s.state === STATE_CLOSED) { + if (typeof callback === 'function') { + callback(new MongoError('server is closed')); + } + + return; + } + wireProtocol.killCursors(this, ns, cursorState, (err, result) => { if (err && isSDAMUnrecoverableError(err, this)) { this.emit('error', err); @@ -447,6 +461,11 @@ function executeWriteOperation(args, options, callback) { const ns = args.ns; const ops = Array.isArray(args.ops) ? 
args.ops : [args.ops]; + if (server.s.state === STATE_CLOSING || server.s.state === STATE_CLOSED) { + callback(new MongoError('server is closed')); + return; + } + const error = basicWriteValidations(server, options); if (error) { callback(error, null); @@ -474,7 +493,19 @@ function executeWriteOperation(args, options, callback) { } function connectEventHandler(server) { - return function(pool, conn) { + return function(err, conn) { + if (server.s.state === STATE_CLOSING || server.s.state === STATE_CLOSED) { + return; + } + + if (err) { + server.emit('error', new MongoNetworkError(err)); + + stateTransition(server, STATE_CLOSED); + server.emit('close'); + return; + } + const ismaster = conn.ismaster; server.s.lastIsMasterMS = conn.lastIsMasterMS; if (conn.agreedCompressor) { @@ -506,16 +537,6 @@ function connectEventHandler(server) { }; } -function errorEventHandler(server) { - return function(err) { - if (err) { - server.emit('error', new MongoNetworkError(err)); - } - - server.emit('close'); - }; -} - function parseErrorEventHandler(server) { return function(err) { stateTransition(this, STATE_CLOSED); diff --git a/lib/core/sdam/topology.js b/lib/core/sdam/topology.js index 6bd570fbdea..ee1211ba476 100644 --- a/lib/core/sdam/topology.js +++ b/lib/core/sdam/topology.js @@ -26,6 +26,7 @@ const resolveClusterTime = require('../topologies/shared').resolveClusterTime; const SrvPoller = require('./srv_polling').SrvPoller; const getMMAPError = require('../topologies/shared').getMMAPError; const makeStateMachine = require('../utils').makeStateMachine; +const eachAsync = require('../utils').eachAsync; // Global state let globalTopologyCounter = 0; @@ -143,7 +144,7 @@ class Topology extends EventEmitter { ), serverSelectionTimeoutMS: options.serverSelectionTimeoutMS, heartbeatFrequencyMS: options.heartbeatFrequencyMS, - minHeartbeatIntervalMS: options.minHeartbeatIntervalMS, + minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS, // allow users to override the cursor 
factory Cursor: options.cursorFactory || CoreCursor, // the bson parser @@ -177,8 +178,9 @@ class Topology extends EventEmitter { clusterTime: null, // timer management - monitorTimers: [], - iterationTimers: [] + monitorTimers: new Set(), + iterationTimers: new Set(), + connectionTimers: new Set() }; // amend options for server instance creation @@ -271,7 +273,6 @@ class Topology extends EventEmitter { translateReadPreference(options); const readPreference = options.readPreference || ReadPreference.primary; - this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { if (err) { stateTransition(this, STATE_CLOSED); @@ -333,11 +334,9 @@ class Topology extends EventEmitter { } // clear all existing monitor timers - this.s.monitorTimers.map(timer => clearTimeout(timer)); - this.s.monitorTimers = []; - - this.s.iterationTimers.map(timer => clearTimeout(timer)); - this.s.iterationTimers = []; + drainTimerQueue(this.s.monitorTimers); + drainTimerQueue(this.s.iterationTimers); + drainTimerQueue(this.s.connectionTimers); if (this.s.sessionPool) { this.s.sessions.forEach(session => session.endSession()); @@ -360,31 +359,22 @@ class Topology extends EventEmitter { // defer state transition because we may need to send an `endSessions` command above stateTransition(this, STATE_CLOSING); - const servers = this.s.servers; - if (servers.size === 0) { - stateTransition(this, STATE_CLOSED); - if (typeof callback === 'function') { - callback(null, null); - } + eachAsync( + Array.from(this.s.servers.values()), + (server, cb) => destroyServer(server, this, options, cb), + () => { + this.s.servers.clear(); - return; - } + // emit an event for close + this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id)); + + stateTransition(this, STATE_CLOSED); + this.emit('close'); - // destroy all child servers - let destroyed = 0; - servers.forEach(server => - destroyServer(server, this, options, () => { - destroyed++; - if (destroyed === 
servers.size) { - // emit an event for close - this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id)); - - stateTransition(this, STATE_CLOSED); - if (typeof callback === 'function') { - callback(null, null); - } + if (typeof callback === 'function') { + callback(); } - }) + } ); } @@ -432,9 +422,7 @@ class Topology extends EventEmitter { return; } - // clear out any existing iteration timers - this.s.iterationTimers.map(timer => clearTimeout(timer)); - this.s.iterationTimers = []; + drainTimerQueue(this.s.iterationTimers); selectServers( this, @@ -442,7 +430,7 @@ class Topology extends EventEmitter { options.serverSelectionTimeoutMS, process.hrtime(), (err, servers) => { - if (err) return callback(err, null); + if (err) return callback(err); const selectedServer = randomSelection(servers); if (isSharded && transaction && transaction.isActive) { @@ -656,7 +644,7 @@ class Topology extends EventEmitter { this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { if (err) { - callback(err, null); + callback(err); return; } @@ -853,10 +841,11 @@ function selectServers(topology, selector, timeout, start, callback) { }, timeout - duration); const connectHandler = () => { - clearTimeout(failToConnectTimer); + clearAndRemoveTimerFrom(failToConnectTimer, topology.s.connectionTimers); selectServers(topology, selector, timeout, process.hrtime(), callback); }; + topology.s.connectionTimers.add(failToConnectTimer); topology.once('connect', connectHandler); return; } @@ -889,8 +878,7 @@ function selectServers(topology, selector, timeout, start, callback) { const retrySelection = () => { // clear all existing monitor timers - topology.s.monitorTimers.map(timer => clearTimeout(timer)); - topology.s.monitorTimers = []; + drainTimerQueue(topology.s.monitorTimers); // ensure all server monitors attempt monitoring soon topology.s.servers.forEach(server => { @@ -899,18 +887,9 @@ function selectServers(topology, selector, timeout, 
start, callback) { TOPOLOGY_DEFAULTS.minHeartbeatFrequencyMS ); - topology.s.monitorTimers.push(timer); + topology.s.monitorTimers.add(timer); }); - const descriptionChangedHandler = () => { - // successful iteration, clear the check timer - clearTimeout(iterationTimer); - topology.s.iterationTimers.splice(timerIndex, 1); - - // topology description has changed due to monitoring, reattempt server selection - selectServers(topology, selector, timeout, start, callback); - }; - const iterationTimer = setTimeout(() => { topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler); callback( @@ -921,8 +900,16 @@ function selectServers(topology, selector, timeout, start, callback) { ); }, timeout - duration); + const descriptionChangedHandler = () => { + // successful iteration, clear the check timer + clearAndRemoveTimerFrom(iterationTimer, topology.s.iterationTimers); + + // topology description has changed due to monitoring, reattempt server selection + selectServers(topology, selector, timeout, start, callback); + }; + // track this timer in case we need to clean it up outside this loop - const timerIndex = topology.s.iterationTimers.push(iterationTimer); + topology.s.iterationTimers.add(iterationTimer); topology.once('topologyDescriptionChanged', descriptionChangedHandler); }; @@ -930,7 +917,7 @@ function selectServers(topology, selector, timeout, start, callback) { retrySelection(); } -function createAndConnectServer(topology, serverDescription) { +function createAndConnectServer(topology, serverDescription, connectDelay) { topology.emit( 'serverOpening', new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address) @@ -942,11 +929,50 @@ function createAndConnectServer(topology, serverDescription) { server.once('connect', serverConnectEventHandler(server, topology)); server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology)); server.on('error', serverErrorEventHandler(server, topology)); - server.on('close', () 
=> topology.emit('close', server)); + + if (connectDelay) { + const connectTimer = setTimeout(() => { + clearAndRemoveTimerFrom(connectTimer, topology.s.connectionTimers); + server.connect(); + }, connectDelay); + + topology.s.connectionTimers.add(connectTimer); + return server; + } + server.connect(); return server; } +function resetServer(topology, serverDescription) { + if (!topology.s.servers.has(serverDescription.address)) { + return; + } + + // first remove the old server + const server = topology.s.servers.get(serverDescription.address); + destroyServer(server, topology); + + // add the new server, and attempt connection after a delay + const newServer = createAndConnectServer( + topology, + serverDescription, + topology.s.minHeartbeatFrequencyMS + ); + + topology.s.servers.set(serverDescription.address, newServer); +} + +function drainTimerQueue(queue) { + queue.forEach(clearTimeout); + queue.clear(); +} + +function clearAndRemoveTimerFrom(timer, timers) { + clearTimeout(timer); + return timers.delete(timer); +} + /** * Create `Server` instances for all initially known servers, connect them, and assign * them to the passed in `Topology`. @@ -963,6 +989,15 @@ function connectServers(topology, serverDescriptions) { } function updateServers(topology, incomingServerDescription) { + // if the server was reset internally because of an error, we need to replace the + // `Server` instance for it so we can attempt reconnect. 
+ // + // TODO: this logic can change once CMAP is put in place + if (incomingServerDescription && incomingServerDescription.error) { + resetServer(topology, incomingServerDescription); + return; + } + // update the internal server's description if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) { const server = topology.s.servers.get(incomingServerDescription.address); @@ -1001,10 +1036,16 @@ function serverConnectEventHandler(server, topology) { }; } -function serverErrorEventHandler(server /*, topology */) { +function serverErrorEventHandler(server, topology) { return function(err) { + if (topology.s.state === STATE_CLOSING || topology.s.state === STATE_CLOSED) { + return; + } + if (isSDAMUnrecoverableError(err, server)) { - resetServerState(server, err, { clearPool: true }); + // NOTE: this must be commented out until we switch to the new CMAP pool because + // we presently _always_ clear the pool on error. + resetServerState(server, err /*, { clearPool: true } */); return; } diff --git a/lib/core/sdam/topology_description.js b/lib/core/sdam/topology_description.js index 7fef2bc6432..c2a1bd6191c 100644 --- a/lib/core/sdam/topology_description.js +++ b/lib/core/sdam/topology_description.js @@ -191,7 +191,7 @@ class TopologyDescription { } if (topologyType === TopologyType.ReplicaSetNoPrimary) { - if ([ServerType.Mongos, ServerType.Unknown].indexOf(serverType) >= 0) { + if ([ServerType.Standalone, ServerType.Mongos].indexOf(serverType) >= 0) { serverDescriptions.delete(address); } diff --git a/lib/operations/close.js b/lib/operations/close.js index 57fdef6f960..1c59d111347 100644 --- a/lib/operations/close.js +++ b/lib/operations/close.js @@ -3,6 +3,7 @@ const Aspect = require('./operation').Aspect; const defineAspects = require('./operation').defineAspects; const OperationBase = require('./operation').OperationBase; +const NativeTopology = require('../topologies/native_topology'); class CloseOperation extends 
OperationBase { constructor(client, force) { @@ -16,8 +17,11 @@ class CloseOperation extends OperationBase { const force = this.force; const completeClose = err => { client.emit('close', client); - for (const item of client.s.dbCache) { - item[1].emit('close', client); + + if (!(client.topology instanceof NativeTopology)) { + for (const item of client.s.dbCache) { + item[1].emit('close', client); + } } client.removeAllListeners('close'); diff --git a/test/core/functional/pool_tests.js b/test/core/functional/pool_tests.js index cbc0df68f40..77658999183 100644 --- a/test/core/functional/pool_tests.js +++ b/test/core/functional/pool_tests.js @@ -167,7 +167,7 @@ describe('Pool tests', function() { var pool = new Pool(null, { host: this.configuration.host, port: this.configuration.port, - socketTimeout: 3000, + socketTimeout: 500, bson: new Bson(), reconnect: false }); @@ -1141,4 +1141,64 @@ describe('Pool tests', function() { pool.connect(); } }); + + it('should support callback mode for connect', { + metadata: { requires: { topology: 'single' } }, + test: function(done) { + const pool = new Pool(null, { + host: this.configuration.host, + port: this.configuration.port, + bson: new Bson() + }); + + pool.on('connect', () => done(new Error('connect was emitted'))); + pool.connect(err => { + expect(err).to.not.exist; + setTimeout(() => { + pool.destroy(true, done); + }, 100); // wait to ensure event is not emitted + }); + } + }); + + it('should support resetting', function(done) { + const pool = new Pool(null, { + host: this.configuration.host, + port: this.configuration.port, + bson: new Bson() + }); + + const isMasterQuery = new Query( + new Bson(), + 'system.$cmd', + { ismaster: true }, + { numberToSkip: 0, numberToReturn: 1 } + ); + + pool.once('connect', () => { + const connections = pool.allConnections().map(conn => conn.id); + expect(connections).to.have.length(1); + + pool.write(isMasterQuery, err => { + expect(err).to.not.exist; + + pool.reset(err => { + 
expect(err).to.not.exist; + + pool.write(isMasterQuery, err => { + expect(err).to.not.exist; + + // verify the previous connection was dropped, and a new connection was created + const newConnections = pool.allConnections().map(conn => conn.id); + expect(newConnections).to.have.length(1); + expect(newConnections[0]).to.not.equal(connections[0]); + + pool.destroy(done); + }); + }); + }); + }); + + pool.connect(); + }); }); diff --git a/test/core/functional/server_tests.js b/test/core/functional/server_tests.js index baa70775387..0c2a8e2aeb7 100644 --- a/test/core/functional/server_tests.js +++ b/test/core/functional/server_tests.js @@ -995,7 +995,7 @@ describe('Server tests', function() { const config = this.configuration; var client = config.newTopology(server.address().host, server.address().port, { - serverSelectionTimeoutMS: 10 + serverSelectionTimeoutMS: 500 }); client.on('error', error => { diff --git a/test/functional/spec/transactions/convenient-api/transaction-options.json b/test/functional/spec/transactions/convenient-api/transaction-options.json index 6b197a44c59..62389e3c616 100644 --- a/test/functional/spec/transactions/convenient-api/transaction-options.json +++ b/test/functional/spec/transactions/convenient-api/transaction-options.json @@ -5,12 +5,6 @@ "topology": [ "replicaset" ] - }, - { - "minServerVersion": "4.1.8", - "topology": [ - "sharded" - ] } ], "database_name": "withTransaction-tests", diff --git a/test/functional/spec/transactions/convenient-api/transaction-options.yml b/test/functional/spec/transactions/convenient-api/transaction-options.yml index bc243972bfd..519ef069f3c 100644 --- a/test/functional/spec/transactions/convenient-api/transaction-options.yml +++ b/test/functional/spec/transactions/convenient-api/transaction-options.yml @@ -2,9 +2,6 @@ runOn: - minServerVersion: "4.0" topology: ["replicaset"] - - - minServerVersion: "4.1.8" - topology: ["sharded"] database_name: &database_name "withTransaction-tests" collection_name: 
&collection_name "test" diff --git a/test/functional/spec/transactions/transaction-options.json b/test/functional/spec/transactions/transaction-options.json index 7af7bb64562..76310b3026f 100644 --- a/test/functional/spec/transactions/transaction-options.json +++ b/test/functional/spec/transactions/transaction-options.json @@ -5,12 +5,6 @@ "topology": [ "replicaset" ] - }, - { - "minServerVersion": "4.1.8", - "topology": [ - "sharded" - ] } ], "database_name": "transaction-tests", diff --git a/test/functional/spec/transactions/transaction-options.yml b/test/functional/spec/transactions/transaction-options.yml index 444ade0d6b8..8a04302490c 100644 --- a/test/functional/spec/transactions/transaction-options.yml +++ b/test/functional/spec/transactions/transaction-options.yml @@ -2,9 +2,6 @@ runOn: - minServerVersion: "4.0" topology: ["replicaset"] - - - minServerVersion: "4.1.8" - topology: ["sharded"] database_name: &database_name "transaction-tests" collection_name: &collection_name "test" diff --git a/test/mocha.opts b/test/mocha.opts index 90b5b4cf804..b814086a20a 100644 --- a/test/mocha.opts +++ b/test/mocha.opts @@ -1,3 +1,4 @@ + --recursive --timeout 60000 --file test/runner --ui test/runner/metadata_ui.js diff --git a/test/runner/config.js b/test/runner/config.js index f1d7c5ef604..2d31da57012 100644 --- a/test/runner/config.js +++ b/test/runner/config.js @@ -97,9 +97,6 @@ class NativeConfiguration { newTopology(host, port, options) { options = Object.assign({}, options); - host = host || this.options.host; - port = port || this.options.port; - const hosts = host == null ? 
[].concat(this.options.hosts) : [{ host, port }]; if (this.usingUnifiedTopology()) { return new core.Topology(hosts, options); diff --git a/test/runner/filters/unified_filter.js b/test/runner/filters/unified_filter.js index 534b8abd04b..0768099a9df 100644 --- a/test/runner/filters/unified_filter.js +++ b/test/runner/filters/unified_filter.js @@ -12,11 +12,13 @@ */ class UnifiedTopologyFilter { filter(test) { - if (!test.metadata) return true; - if (!test.metadata.requires) return true; - if (!test.metadata.requires.unifiedTopology) return true; + const unifiedTopology = + test.metadata && test.metadata.requires && test.metadata.requires.unifiedTopology; - return !!process.env.MONGODB_UNIFIED_TOPOLOGY; + return ( + typeof unifiedTopology !== 'boolean' || + unifiedTopology === process.env.MONGODB_UNIFIED_TOPOLOGY + ); } } diff --git a/test/tools/run_each_test.sh b/test/tools/run_each_test.sh new file mode 100755 index 00000000000..f7722b8f6c0 --- /dev/null +++ b/test/tools/run_each_test.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +if [ "$#" -ne 1 ]; then + echo "usage: run_each_test <test-path>" + exit +fi + +TEST_PATH=$1 +find $TEST_PATH -type f \( -iname "*_tests.js" ! -iname "*atlas*" ! -path "*node-next*" \) -exec npx mocha {} \;