diff --git a/lib/client.js b/lib/client.js index feedebbd8..432443793 100644 --- a/lib/client.js +++ b/lib/client.js @@ -36,6 +36,7 @@ var Client = function (config) { this._connecting = false this._connected = false this._connectionError = false + this._queryable = true this.connection = c.connection || new Connection({ stream: c.stream, @@ -52,16 +53,31 @@ var Client = function (config) { util.inherits(Client, EventEmitter) -Client.prototype.connect = function (callback) { +Client.prototype._errorAllQueries = function (err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.handleError(err, this.connection) + }) + } + + if (this.activeQuery) { + enqueueError(this.activeQuery) + this.activeQuery = null + } + + this.queryQueue.forEach(enqueueError) + this.queryQueue.length = 0 +} + +Client.prototype._connect = function (callback) { var self = this var con = this.connection if (this._connecting || this._connected) { const err = new Error('Client has already been connected. You cannot reuse a client.') - if (callback) { + process.nextTick(() => { callback(err) - return undefined - } - return Promise.reject(err) + }) + return } this._connecting = true @@ -126,15 +142,25 @@ Client.prototype.connect = function (callback) { } const connectedErrorHandler = (err) => { - if (this.activeQuery) { - var activeQuery = self.activeQuery - this.activeQuery = null - return activeQuery.handleError(err, con) - } + this._queryable = false + this._errorAllQueries(err) this.emit('error', err) } + const connectedErrorMessageHandler = (msg) => { + const activeQuery = this.activeQuery + + if (!activeQuery) { + connectedErrorHandler(msg) + return + } + + this.activeQuery = null + activeQuery.handleError(msg, con) + } + con.on('error', connectingErrorHandler) + con.on('errorMessage', connectingErrorHandler) // hook up query handling events to connection // after the connection initially becomes ready for queries @@ -143,7 +169,9 @@ Client.prototype.connect = function (callback) { self._connected = true self._attachListeners(con) con.removeListener('error', connectingErrorHandler) + con.removeListener('errorMessage', connectingErrorHandler) con.on('error', connectedErrorHandler) + con.on('errorMessage', connectedErrorMessageHandler) // process possible callback argument to Client#connect if (callback) { @@ -166,43 +194,53 @@ Client.prototype.connect = function (callback) { }) con.once('end', () => { - if (this.activeQuery) { - var disconnectError = new Error('Connection terminated') - this.activeQuery.handleError(disconnectError, con) - this.activeQuery = null - } + const error = this._ending + ? new Error('Connection terminated') + : new Error('Connection terminated unexpectedly') + + this._errorAllQueries(error) + if (!this._ending) { // if the connection is ended without us calling .end() // on this client then we have an unexpected disconnection // treat this as an error unless we've already emitted an error // during connection. - const error = new Error('Connection terminated unexpectedly') if (this._connecting && !this._connectionError) { if (callback) { callback(error) } else { - this.emit('error', error) + connectedErrorHandler(error) } } else if (!this._connectionError) { - this.emit('error', error) + connectedErrorHandler(error) } } - this.emit('end') + + process.nextTick(() => { + this.emit('end') + }) }) con.on('notice', function (msg) { self.emit('notice', msg) }) +} - if (!callback) { - return new global.Promise((resolve, reject) => { - this.once('error', reject) - this.once('connect', () => { - this.removeListener('error', reject) +Client.prototype.connect = function (callback) { + if (callback) { + this._connect(callback) + return + } + + return new Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { resolve() - }) + } }) - } + }) } Client.prototype._attachListeners = function (con) { @@ -340,7 +378,15 @@ Client.prototype._pulseQueryQueue = function () { if (this.activeQuery) { this.readyForQuery = false this.hasExecuted = true - this.activeQuery.submit(this.connection) + + const queryError = this.activeQuery.submit(this.connection) + if (queryError) { + process.nextTick(() => { + this.activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } } else if (this.hasExecuted) { this.activeQuery = null this.emit('drain') @@ -379,6 +425,20 @@ Client.prototype.query = function (config, values, callback) { query._result._getTypeParser = this._types.getTypeParser.bind(this._types) } + if (!this._queryable) { + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) + }) + return result + } + + if (this._ending) { + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable'), this.connection) + }) + return result + } + this.queryQueue.push(query) this._pulseQueryQueue() return result @@ -386,18 +446,19 @@ Client.prototype.query = function (config, values, callback) { Client.prototype.end = function (cb) { this._ending = true + if (this.activeQuery) { // if we have an active query we need to force a disconnect // on the socket - otherwise a hung query could block end forever - this.connection.stream.destroy(new Error('Connection terminated by user')) - return cb ? cb() : Promise.resolve() + this.connection.stream.destroy() + } else { + this.connection.end() } + if (cb) { - this.connection.end() this.connection.once('end', cb) } else { - return new global.Promise((resolve, reject) => { - this.connection.end() + return new Promise((resolve) => { this.connection.once('end', resolve) }) } diff --git a/lib/connection.js b/lib/connection.js index 799ab4ed8..177739c32 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -117,10 +117,11 @@ Connection.prototype.attachListeners = function (stream) { var packet = self._reader.read() while (packet) { var msg = self.parseMessage(packet) + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name if (self._emitMessage) { self.emit('message', msg) } - self.emit(msg.name, msg) + self.emit(eventName, msg) packet = self._reader.read() } }) diff --git a/lib/native/client.js b/lib/native/client.js index bed548ad8..b18ff6ffa 100644 --- a/lib/native/client.js +++ b/lib/native/client.js @@ -32,8 +32,10 @@ var Client = module.exports = function (config) { }) this._queryQueue = [] - this._connected = false + this._ending = false this._connecting = false + this._connected = false + this._queryable = true // keep these on the object for legacy reasons // for the time being. TODO: deprecate all this jazz @@ -52,50 +54,48 @@ Client.Query = NativeQuery util.inherits(Client, EventEmitter) +Client.prototype._errorAllQueries = function (err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.native = this.native + query.handleError(err) + }) + } + + if (this._hasActiveQuery()) { + enqueueError(this._activeQuery) + this._activeQuery = null + } + + this._queryQueue.forEach(enqueueError) + this._queryQueue.length = 0 +} + // connect to the backend // pass an optional callback to be called once connected // or with an error if there was a connection error -// if no callback is passed and there is a connection error -// the client will emit an error event. -Client.prototype.connect = function (cb) { +Client.prototype._connect = function (cb) { var self = this - var onError = function (err) { - if (cb) return cb(err) - return self.emit('error', err) - } - - var result - if (!cb) { - var resolveOut, rejectOut - cb = (err) => err ? rejectOut(err) : resolveOut() - result = new global.Promise(function (resolve, reject) { - resolveOut = resolve - rejectOut = reject - }) - } - if (this._connecting) { process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.'))) - return result + return } this._connecting = true this.connectionParameters.getLibpqConnectionString(function (err, conString) { - if (err) return onError(err) + if (err) return cb(err) self.native.connect(conString, function (err) { - if (err) return onError(err) + if (err) return cb(err) // set internal states to connected self._connected = true // handle connection errors from the native layer self.native.on('error', function (err) { - // error will be handled by active query - if (self._activeQuery && self._activeQuery.state !== 'end') { - return - } + self._queryable = false + self._errorAllQueries(err) self.emit('error', err) }) @@ -110,12 +110,26 @@ Client.prototype.connect = function (cb) { self.emit('connect') self._pulseQueryQueue(true) - // possibly call the optional callback - if (cb) cb() + cb() }) }) +} - return result +Client.prototype.connect = function (callback) { + if (callback) { + this._connect(callback) + return + } + + return new Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { + resolve() + } + }) + }) } // send a query to the server @@ -129,26 +143,43 @@ Client.prototype.connect = function (cb) { // optional string rowMode = 'array' for an array of results // } Client.prototype.query = function (config, values, callback) { + var query + var result + if (typeof config.submit === 'function') { + result = query = config // accept query(new Query(...), (err, res) => { }) style if (typeof values === 'function') { config.callback = values } - this._queryQueue.push(config) - this._pulseQueryQueue() - return config + } else { + query = new NativeQuery(config, values, callback) + if (!query.callback) { + let resolveOut, rejectOut + result = new Promise((resolve, reject) => { + resolveOut = resolve + rejectOut = reject + }) + query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res) + } } - var query = new NativeQuery(config, values, callback) - var result - if (!query.callback) { - let resolveOut, rejectOut - result = new Promise((resolve, reject) => { - resolveOut = resolve - rejectOut = reject + if (!this._queryable) { + query.native = this.native + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable')) + }) + return result + } + + if (this._ending) { + query.native = this.native + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable')) }) - query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res) + return result } + this._queryQueue.push(query) this._pulseQueryQueue() return result @@ -157,6 +188,9 @@ Client.prototype.query = function (config, values, callback) { // disconnect from the backend server Client.prototype.end = function (cb) { var self = this + + this._ending = true + if (!this._connected) { this.once('connect', this.end.bind(this, cb)) } @@ -170,14 +204,12 @@ Client.prototype.end = function (cb) { }) } this.native.end(function () { - // send an error to the active query - if (self._hasActiveQuery()) { - var msg = 'Connection terminated' - self._queryQueue.length = 0 - self._activeQuery.handleError(new Error(msg)) - } - self.emit('end') - if (cb) cb() + self._errorAllQueries(new Error('Connection terminated')) + + process.nextTick(() => { + self.emit('end') + if (cb) cb() + }) }) return result } diff --git a/lib/query.js b/lib/query.js index fe82061e3..94c2bc3c1 100644 --- a/lib/query.js +++ b/lib/query.js @@ -146,22 +146,17 @@ Query.prototype.handleError = function (err, connection) { Query.prototype.submit = function (connection) { if (typeof this.text !== 'string' && typeof this.name !== 'string') { - const err = new Error('A query must have either text or a name. Supplying neither is unsupported.') - connection.emit('error', err) - connection.emit('readyForQuery') - return + return new Error('A query must have either text or a name. Supplying neither is unsupported.') } if (this.values && !Array.isArray(this.values)) { - const err = new Error('Query values must be an array') - connection.emit('error', err) - connection.emit('readyForQuery') - return + return new Error('Query values must be an array') } if (this.requiresPreparation()) { this.prepare(connection) } else { connection.query(this.text) } + return null } Query.prototype.hasBeenParsed = function (connection) { diff --git a/test/integration/client/error-handling-tests.js b/test/integration/client/error-handling-tests.js index 55372d141..97b0ce83f 100644 --- a/test/integration/client/error-handling-tests.js +++ b/test/integration/client/error-handling-tests.js @@ -50,6 +50,18 @@ suite.test('re-using connections results in promise rejection', (done) => { }) }) +suite.test('using a client after closing it results in error', (done) => { + const client = new Client() + client.connect(() => { + client.end(assert.calls(() => { + client.query('SELECT 1', assert.calls((err) => { + assert.equal(err.message, 'Client was closed and is not queryable') + done() + })) + })) + }) +}) + suite.test('query receives error on client shutdown', function (done) { var client = new Client() client.connect(assert.success(function () { @@ -139,6 +151,9 @@ suite.test('when connecting to an invalid host with callback', function (done) { var client = new Client({ user: 'very invalid username' }) + client.on('error', () => { + assert.fail('unexpected error event when connecting') + }) client.connect(function (error, client) { assert(error instanceof Error) done() @@ -149,6 +164,9 @@ suite.test('when connecting to invalid host with promise', function (done) { var client = new Client({ user: 'very invalid username' }) + client.on('error', () => { + assert.fail('unexpected error event when connecting') + }) client.connect().catch((e) => done()) }) diff --git a/test/integration/connection-pool/error-tests.js b/test/integration/connection-pool/error-tests.js index 17cdcc47f..cadffe3db 100644 --- a/test/integration/connection-pool/error-tests.js +++ b/test/integration/connection-pool/error-tests.js @@ -2,9 +2,6 @@ var helper = require('./test-helper') const pg = helper.pg -// make pool hold 2 clients -const pool = new pg.Pool({ max: 2 }) - const suite = new helper.Suite() suite.test('connecting to invalid port', (cb) => { const pool = new pg.Pool({ port: 13801 }) @@ -12,6 +9,8 @@ suite.test('connecting to invalid port', (cb) => { }) suite.test('errors emitted on pool', (cb) => { + // make pool hold 2 clients + const pool = new pg.Pool({ max: 2 }) // get first client pool.connect(assert.success(function (client, done) { client.id = 1 @@ -46,3 +45,57 @@ suite.test('errors emitted on pool', (cb) => { }) })) }) + +suite.test('connection-level errors cause queued queries to fail', (cb) => { + const pool = new pg.Pool() + pool.connect(assert.success((client, done) => { + client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => { + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.code, '57P01') + } + })) + + pool.once('error', assert.calls((err, brokenClient) => { + assert.equal(client, brokenClient) + })) + + client.query('SELECT 1', assert.calls((err) => { + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.message, 'Connection terminated unexpectedly') + } + + done() + pool.end() + cb() + })) + })) +}) + +suite.test('connection-level errors cause future queries to fail', (cb) => { + const pool = new pg.Pool() + pool.connect(assert.success((client, done) => { + client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => { + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.code, '57P01') + } + })) + + pool.once('error', assert.calls((err, brokenClient) => { + assert.equal(client, brokenClient) + + client.query('SELECT 1', assert.calls((err) => { + assert.equal(err.message, 'Client has encountered a connection error and is not queryable') + + done() + pool.end() + cb() + })) + })) + })) +})