Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8d487d1
fix(sdam): don't remove unknown servers in topology updates
mbroadst Oct 25, 2019
2f8f8fd
refactor(pool): support a callback in `connect`
mbroadst Oct 25, 2019
5498b6f
fix(sdam): don't emit `close` every time a child server closes
mbroadst Oct 25, 2019
db0d63a
fix(sdam): don't lose servers when they fail monitoring
mbroadst Oct 28, 2019
0183427
refactor(sdam): `null` => `undefined`
mbroadst Oct 29, 2019
d2b5549
refactor(sdam): track server connections in a different timer list
mbroadst Oct 30, 2019
c152aa1
refactor(pool): support creating connections in `destroying` state
mbroadst Oct 30, 2019
eca376c
fix(close): the unified topology emits a close event on close now
mbroadst Oct 30, 2019
85a4db5
fix(server): ensure state is transitioned to closed on connect fail
mbroadst Oct 30, 2019
93497a5
fix(server): don't emit error in connect if closing/closed
mbroadst Oct 30, 2019
7951949
fix(sdam): ignore server errors when closing/closed
mbroadst Oct 30, 2019
fd26385
fix(sdam): `minHeartbeatIntervalMS` => `minHeartbeatFrequencyMS`
mbroadst Oct 30, 2019
dda0a7f
refactor(sdam): revert `null` => `undefined` changes until 4.x
mbroadst Oct 30, 2019
1714ace
test: allow specifying that tests require legacy topologies
mbroadst Oct 30, 2019
8dc06a0
refactor(pool): don't explicitly create new connection on reset
mbroadst Oct 30, 2019
051fce7
test: `newTopology` should use all hosts from a connection string
mbroadst Oct 30, 2019
ef9b45e
test: add script for running each test, useful for finding leaks
mbroadst Oct 31, 2019
4b46845
test: ensure mocha is always being run recursively
mbroadst Nov 1, 2019
8c9bf39
refactor(pool): introduce `draining` state to account for late ops
mbroadst Nov 1, 2019
00b8833
fix(pool): only transition to `DISCONNECTED` if reconnect enabled
mbroadst Nov 1, 2019
87bd12f
test: increase server selection timeout for flakey test
mbroadst Nov 2, 2019
15b4c9c
fix(monitoring): incorrect states used to determine rescheduling
mbroadst Nov 3, 2019
db6f3fe
fix(pool): don't reset a pool if we'not already connected
mbroadst Nov 3, 2019
7d5fab2
refactor(server): do not permit commands when server is closed
mbroadst Nov 3, 2019
060b90f
test: skip broken transaction tests until SERVER-40685 is fixed
mbroadst Nov 3, 2019
c552786
test: refactor logic for unified topology filter
mbroadst Nov 4, 2019
f57b9e2
refactor(topology): use sets for timer management
mbroadst Nov 4, 2019
55b4e24
test: add tests around pool reset behavior
mbroadst Nov 4, 2019
c38668d
refactor(pool): wait for `createConnection` on pool reset
mbroadst Nov 4, 2019
30dfc02
refactor(topology): don't explicit reset pool on SDAM unrecoverable
mbroadst Nov 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 70 additions & 45 deletions lib/core/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ const makeStateMachine = require('../utils').makeStateMachine;
const DISCONNECTED = 'disconnected';
const CONNECTING = 'connecting';
const CONNECTED = 'connected';
const DRAINING = 'draining';
const DESTROYING = 'destroying';
const DESTROYED = 'destroyed';
const stateTransition = makeStateMachine({
[DISCONNECTED]: [CONNECTING, DESTROYING, DISCONNECTED],
[CONNECTING]: [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
[CONNECTED]: [CONNECTED, DISCONNECTED, DESTROYING],
[DISCONNECTED]: [CONNECTING, DRAINING, DISCONNECTED],
[CONNECTING]: [CONNECTING, CONNECTED, DRAINING, DISCONNECTED],
[CONNECTED]: [CONNECTED, DISCONNECTED, DRAINING],
[DRAINING]: [DRAINING, DESTROYING, DESTROYED],
[DESTROYING]: [DESTROYING, DESTROYED],
[DESTROYED]: [DESTROYED]
});
Expand Down Expand Up @@ -239,7 +241,10 @@ function resetPoolState(pool) {

function connectionFailureHandler(pool, event, err, conn) {
if (conn) {
if (conn._connectionFailHandled) return;
if (conn._connectionFailHandled) {
return;
}

conn._connectionFailHandled = true;
conn.destroy();

Expand Down Expand Up @@ -270,8 +275,10 @@ function connectionFailureHandler(pool, event, err, conn) {

// No more socket available propegate the event
if (pool.socketCount() === 0) {
if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
stateTransition(pool, DISCONNECTED);
if (pool.state !== DESTROYED && pool.state !== DESTROYING && pool.state !== DRAINING) {
if (pool.options.reconnect) {
stateTransition(pool, DISCONNECTED);
}
}

// Do not emit error events, they are always close events
Expand Down Expand Up @@ -426,7 +433,7 @@ function messageHandler(self) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
if (self.topology && document.$clusterTime) {
self.topology.clusterTime = document.$clusterTime;
}
}
Expand Down Expand Up @@ -537,14 +544,20 @@ Pool.prototype.isDisconnected = function() {
/**
* Connect pool
*/
Pool.prototype.connect = function() {
Pool.prototype.connect = function(callback) {
if (this.state !== DISCONNECTED) {
throw new MongoError('connection in unlawful state ' + this.state);
}

stateTransition(this, CONNECTING);
createConnection(this, (err, conn) => {
if (err) {
if (typeof callback === 'function') {
this.destroy();
callback(err);
return;
}

if (this.state === CONNECTING) {
this.emit('error', err);
}
Expand All @@ -554,14 +567,19 @@ Pool.prototype.connect = function() {
}

stateTransition(this, CONNECTED);
this.emit('connect', this, conn);

// create min connections
if (this.minSize) {
for (let i = 0; i < this.minSize; i++) {
createConnection(this);
}
}

if (typeof callback === 'function') {
callback(null, conn);
} else {
this.emit('connect', this, conn);
}
});
};

Expand Down Expand Up @@ -596,6 +614,8 @@ Pool.prototype.unref = function() {

// Destroy the connections
function destroy(self, connections, options, callback) {
stateTransition(self, DESTROYING);

eachAsync(
connections,
(conn, cb) => {
Expand Down Expand Up @@ -626,14 +646,19 @@ function destroy(self, connections, options, callback) {
*/
Pool.prototype.destroy = function(force, callback) {
var self = this;
if (typeof force === 'function') {
callback = force;
force = false;
}

// Do not try again if the pool is already dead
if (this.state === DESTROYED || self.state === DESTROYING) {
if (typeof callback === 'function') callback(null, null);
return;
}

// Set state to destroyed
stateTransition(this, DESTROYING);
// Set state to draining
stateTransition(this, DRAINING);

// Are we force closing
if (force) {
Expand All @@ -660,6 +685,14 @@ Pool.prototype.destroy = function(force, callback) {

// Wait for the operations to drain before we close the pool
function checkStatus() {
if (self.state === DESTROYED || self.state === DESTROYING) {
if (typeof callback === 'function') {
callback();
}

return;
}

flushMonitoringOperations(self.queue);

if (self.queue.length === 0) {
Expand All @@ -676,7 +709,6 @@ Pool.prototype.destroy = function(force, callback) {
}

destroy(self, connections, { force: false }, callback);
// } else if (self.queue.length > 0 && !this.reconnectId) {
} else {
// Ensure we empty the queue
_execute(self)();
Expand All @@ -695,6 +727,14 @@ Pool.prototype.destroy = function(force, callback) {
* @param {function} [callback]
*/
Pool.prototype.reset = function(callback) {
if (this.s.state !== CONNECTED) {
if (typeof callback === 'function') {
callback(new MongoError('pool is not connected, reset aborted'));
}

return;
}

const connections = this.availableConnections.concat(this.inUseConnections);
eachAsync(
connections,
Expand All @@ -715,12 +755,12 @@ Pool.prototype.reset = function(callback) {

resetPoolState(this);

// create an initial connection, and kick off execution again
createConnection(this);

if (typeof callback === 'function') {
callback(null, null);
}
// create a new connection, this will ultimately trigger execution
createConnection(this, () => {
if (typeof callback === 'function') {
callback(null, null);
}
});
}
);
};
Expand Down Expand Up @@ -788,17 +828,12 @@ Pool.prototype.write = function(command, options, cb) {

// Pool was destroyed error out
if (this.state === DESTROYED || this.state === DESTROYING) {
// Callback with an error
if (cb) {
try {
cb(new MongoError('pool destroyed'));
} catch (err) {
process.nextTick(function() {
throw err;
});
}
}
cb(new MongoError('pool destroyed'));
return;
}

if (this.state === DRAINING) {
cb(new MongoError('pool is draining, new operations prohibited'));
return;
}

Expand Down Expand Up @@ -846,10 +881,6 @@ Pool.prototype.write = function(command, options, cb) {
// Optional per operation socketTimeout
operation.socketTimeout = options.socketTimeout;
operation.monitoring = options.monitoring;
// Custom socket Timeout
if (options.socketTimeout) {
operation.socketTimeout = options.socketTimeout;
}

// Get the requestId
operation.requestId = command.requestId;
Expand Down Expand Up @@ -948,15 +979,6 @@ function createConnection(pool, callback) {
pool.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
}

if (pool.options.legacyCompatMode === false) {
// The unified topology uses the reported `error` from a pool to track what error
// reason is returned to the user during selection timeout. We only want to emit
// this if the pool is active because the listeners are removed on destruction.
if (pool.state !== DESTROYED && pool.state !== DESTROYING) {
pool.emit('error', err);
}
}

// check if reconnect is enabled, and attempt retry if so
if (!pool.reconnectId && pool.options.reconnect) {
if (pool.state === CONNECTING && pool.options.legacyCompatMode) {
Expand Down Expand Up @@ -1047,6 +1069,12 @@ function _execute(self) {
if (self.availableConnections.length === 0) {
// Flush any monitoring operations
flushMonitoringOperations(self.queue);

// Try to create a new connection to execute stuck operation
if (totalConnections < self.options.size && self.queue.length > 0) {
createConnection(self);
}

break;
}

Expand Down Expand Up @@ -1111,10 +1139,7 @@ function _execute(self) {
}

// Re-execute the operation
setTimeout(function() {
_execute(self)();
}, 10);

setTimeout(() => _execute(self)(), 10);
break;
}
}
Expand Down
6 changes: 3 additions & 3 deletions lib/core/sdam/monitoring.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ const ServerDescription = require('./server_description').ServerDescription;
const calculateDurationInMs = require('../utils').calculateDurationInMs;

// pulled from `Server` implementation
const STATE_DISCONNECTED = 'disconnected';
const STATE_DISCONNECTING = 'disconnecting';
const STATE_CLOSED = 'closed';
const STATE_CLOSING = 'closing';

/**
* Published when server description changes, but does NOT include changes to the RTT.
Expand Down Expand Up @@ -180,7 +180,7 @@ function monitorServer(server, options) {

// emit an event indicating that our description has changed
server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster));
if (server.s.state === STATE_DISCONNECTED || server.s.state === STATE_DISCONNECTING) {
if (server.s.state === STATE_CLOSED || server.s.state === STATE_CLOSING) {
return;
}

Expand Down
Loading