From 33da6f768dbb8f37cb450402644fac315a30364e Mon Sep 17 00:00:00 2001 From: Thomas Praxl Date: Fri, 27 Sep 2024 15:43:10 +0200 Subject: [PATCH 1/9] fix(lib): circular dependency index.js <-> lib pool.js and pool_connection.js required index.js, and index.js required pool.js and pool_connection.js. Circular dependency can cause all sorts of problems. This fix aims to provide a step towards fixing a problem with @opentelemetry/instrumentation-mysql2, which looses several exports when using its patched require. For instance, `format` is lost and can cause an instrumented application to crash. --- lib/pool.js | 9 ++++----- lib/pool_connection.js | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/pool.js b/lib/pool.js index dc638d743e..7f73db8b00 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,8 +1,7 @@ 'use strict'; const process = require('process'); -const mysql = require('../index.js'); - +const SqlString = require('sqlstring'); const EventEmitter = require('events').EventEmitter; const PoolConnection = require('./pool_connection.js'); const Queue = require('denque'); @@ -214,7 +213,7 @@ class Pool extends EventEmitter { } format(sql, values) { - return mysql.format( + return SqlString.format( sql, values, this.config.connectionConfig.stringifyObjects, @@ -223,7 +222,7 @@ class Pool extends EventEmitter { } escape(value) { - return mysql.escape( + return SqlString.escape( value, this.config.connectionConfig.stringifyObjects, this.config.connectionConfig.timezone @@ -231,7 +230,7 @@ class Pool extends EventEmitter { } escapeId(value) { - return mysql.escapeId(value, false); + return SqlString.escapeId(value, false); } } diff --git a/lib/pool_connection.js b/lib/pool_connection.js index 78aac6d6b8..5760f9a468 100644 --- a/lib/pool_connection.js +++ b/lib/pool_connection.js @@ -1,6 +1,6 @@ 'use strict'; -const Connection = require('../index.js').Connection; +const Connection = require('./connection.js'); class PoolConnection extends Connection { constructor(pool, options) { From 40053ad8097289a736f59fdb3457dea1c14f5e98 Mon Sep 17 00:00:00 2001 From: Thomas Praxl Date: Fri, 27 Sep 2024 18:01:07 +0200 Subject: [PATCH 2/9] fix: circular dependency index.js <-> promise.js index.js and promise.js require each other. Circular dependency can cause all sorts of problems. This commit aims to fix a problem with @opentelemetry/instrumentation-mysql2, which looses several exports when using its patched require. For instance, `format` is lost and can cause an instrumented application to crash. --- common.js | 19 +++++++++++++++++++ index.js | 20 ++++++-------------- package.json | 1 + promise.js | 34 +++++++++++++++++++--------------- 4 files changed, 45 insertions(+), 29 deletions(-) create mode 100644 common.js diff --git a/common.js b/common.js new file mode 100644 index 0000000000..66cd0b2bbb --- /dev/null +++ b/common.js @@ -0,0 +1,19 @@ +'use strict'; + +const Connection = require('./lib/connection.js'); +const ConnectionConfig = require('./lib/connection_config.js'); +const Pool = require('./lib/pool'); +const PoolConfig = require('./lib/pool_config'); +const PoolCluster = require('./lib/pool_cluster'); + +exports.Connection = Connection; +exports.createConnection = function(opts) { + return new Connection({ config: new ConnectionConfig(opts) }); +}; +exports.createPool = function(config) { + return new Pool({ config: new PoolConfig(config) }); +}; +exports.createPoolCluster = function(config) { + const PoolCluster = require('./lib/pool_cluster.js'); + return new PoolCluster(config); +}; diff --git a/index.js b/index.js index f33455e3f4..928eec4d42 100644 --- a/index.js +++ b/index.js @@ -2,32 +2,24 @@ const SqlString = require('sqlstring'); -const Connection = require('./lib/connection.js'); const ConnectionConfig = require('./lib/connection_config.js'); const parserCache = require('./lib/parsers/parser_cache'); -exports.createConnection = function(opts) { - return new Connection({ config: new ConnectionConfig(opts) }); -}; +const common = require('./common'); +exports.createConnection = common.createConnection exports.connect = exports.createConnection; -exports.Connection = Connection; +exports.Connection = common.Connection; exports.ConnectionConfig = ConnectionConfig; const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); -exports.createPool = function(config) { - const PoolConfig = require('./lib/pool_config.js'); - return new Pool({ config: new PoolConfig(config) }); -}; +exports.createPool = common.createPool -exports.createPoolCluster = function(config) { - const PoolCluster = require('./lib/pool_cluster.js'); - return new PoolCluster(config); -}; +exports.createPoolCluster = common.createPoolCluster; -exports.createQuery = Connection.createQuery; +exports.createQuery = common.Connection.createQuery; exports.Pool = Pool; diff --git a/package.json b/package.json index b05dd0b628..07ab95d93f 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "typings/mysql", "index.js", "index.d.ts", + "common.js", "promise.js", "promise.d.ts" ], diff --git a/promise.js b/promise.js index 5d80e21e72..04eaff6433 100644 --- a/promise.js +++ b/promise.js @@ -1,8 +1,12 @@ 'use strict'; -const core = require('./index.js'); +const SqlString = require('sqlstring'); +const common = require('./common.js'); const EventEmitter = require('events').EventEmitter; const parserCache = require('./lib/parsers/parser_cache.js'); +const PoolConnection = require('./lib/pool_connection.js'); +const Pool = require('./lib/pool.js'); +const PoolCluster = require('./lib/pool_cluster.js'); function makeDoneCb(resolve, reject, localErr) { return function (err, rows, fields) { @@ -249,7 +253,7 @@ class PromiseConnection extends EventEmitter { } function createConnection(opts) { - const coreConnection = core.createConnection(opts); + const coreConnection = common.createConnection(opts); const createConnectionErr = new Error(); const thePromise = opts.Promise || Promise; if (!thePromise) { @@ -286,12 +290,12 @@ function createConnection(opts) { const func = functionsToWrap[i]; if ( - typeof core.Connection.prototype[func] === 'function' && + typeof common.Connection.prototype[func] === 'function' && PromiseConnection.prototype[func] === undefined ) { PromiseConnection.prototype[func] = (function factory(funcName) { return function () { - return core.Connection.prototype[funcName].apply( + return common.Connection.prototype[funcName].apply( this.connection, arguments ); @@ -319,7 +323,7 @@ class PromisePoolConnection extends PromiseConnection { } destroy() { - return core.PoolConnection.prototype.destroy.apply( + return PoolConnection.prototype.destroy.apply( this.connection, arguments ); @@ -408,7 +412,7 @@ class PromisePool extends EventEmitter { } function createPool(opts) { - const corePool = core.createPool(opts); + const corePool = common.createPool(opts); const thePromise = opts.Promise || Promise; if (!thePromise) { throw new Error( @@ -426,12 +430,12 @@ function createPool(opts) { const func = functionsToWrap[i]; if ( - typeof core.Pool.prototype[func] === 'function' && + typeof Pool.prototype[func] === 'function' && PromisePool.prototype[func] === undefined ) { PromisePool.prototype[func] = (function factory(funcName) { return function () { - return core.Pool.prototype[funcName].apply(this.pool, arguments); + return Pool.prototype[funcName].apply(this.pool, arguments); }; })(func); } @@ -527,12 +531,12 @@ class PromisePoolCluster extends EventEmitter { const func = functionsToWrap[i]; if ( - typeof core.PoolCluster.prototype[func] === 'function' && + typeof PoolCluster.prototype[func] === 'function' && PromisePoolCluster.prototype[func] === undefined ) { PromisePoolCluster.prototype[func] = (function factory(funcName) { return function () { - return core.PoolCluster.prototype[funcName].apply(this.poolCluster, arguments); + return PoolCluster.prototype[funcName].apply(this.poolCluster, arguments); }; })(func); } @@ -542,7 +546,7 @@ class PromisePoolCluster extends EventEmitter { ]); function createPoolCluster(opts) { - const corePoolCluster = core.createPoolCluster(opts); + const corePoolCluster = common.createPoolCluster(opts); const thePromise = (opts && opts.Promise) || Promise; if (!thePromise) { throw new Error( @@ -557,10 +561,10 @@ function createPoolCluster(opts) { exports.createConnection = createConnection; exports.createPool = createPool; exports.createPoolCluster = createPoolCluster; -exports.escape = core.escape; -exports.escapeId = core.escapeId; -exports.format = core.format; -exports.raw = core.raw; +exports.escape = SqlString.escape; +exports.escapeId = SqlString.escapeId; +exports.format = SqlString.format; +exports.raw = SqlString.raw; exports.PromisePool = PromisePool; exports.PromiseConnection = PromiseConnection; exports.PromisePoolConnection = PromisePoolConnection; From fbb8ab11333b075a098093328e06bdc59fdef87a Mon Sep 17 00:00:00 2001 From: Thomas Praxl Date: Sat, 28 Sep 2024 13:50:55 +0200 Subject: [PATCH 3/9] refactor: split common.js into lib modules The proposal to put common.js into lib was good, but it felt weird to arbitrarily stuff just some exported functions in lib/common.js. This made sense when common.js was meant to provide commons for index.js and promise.js. But in the lib, this felt like a weird scope. I therefore split common.js into - lib/create_connection.js - lib/create_pool.js - lib/create_pool_cluster.js Also made `require` more consistent in all affected files: all `require` files now have a js suffix when they refer to a single local file. --- common.js | 19 ------------------- index.js | 18 ++++++++++-------- lib/create_connection.js | 10 ++++++++++ lib/create_pool.js | 10 ++++++++++ lib/create_pool_cluster.js | 9 +++++++++ package.json | 1 - promise.js | 27 +++++++++++++++------------ 7 files changed, 54 insertions(+), 40 deletions(-) delete mode 100644 common.js create mode 100644 lib/create_connection.js create mode 100644 lib/create_pool.js create mode 100644 lib/create_pool_cluster.js diff --git a/common.js b/common.js deleted file mode 100644 index 66cd0b2bbb..0000000000 --- a/common.js +++ /dev/null @@ -1,19 +0,0 @@ -'use strict'; - -const Connection = require('./lib/connection.js'); -const ConnectionConfig = require('./lib/connection_config.js'); -const Pool = require('./lib/pool'); -const PoolConfig = require('./lib/pool_config'); -const PoolCluster = require('./lib/pool_cluster'); - -exports.Connection = Connection; -exports.createConnection = function(opts) { - return new Connection({ config: new ConnectionConfig(opts) }); -}; -exports.createPool = function(config) { - return new Pool({ config: new PoolConfig(config) }); -}; -exports.createPoolCluster = function(config) { - const PoolCluster = require('./lib/pool_cluster.js'); - return new PoolCluster(config); -}; diff --git a/index.js b/index.js index 928eec4d42..0d7179c2a7 100644 --- a/index.js +++ b/index.js @@ -3,23 +3,25 @@ const SqlString = require('sqlstring'); const ConnectionConfig = require('./lib/connection_config.js'); -const parserCache = require('./lib/parsers/parser_cache'); +const parserCache = require('./lib/parsers/parser_cache.js'); -const common = require('./common'); -exports.createConnection = common.createConnection +const Connection = require('./lib/connection.js'); +exports.createConnection = require('./lib/create_connection.js'); exports.connect = exports.createConnection; -exports.Connection = common.Connection; +exports.Connection = Connection exports.ConnectionConfig = ConnectionConfig; const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); +const createPool = require('./lib/create_pool.js'); +const createPoolCluster = require('./lib/create_pool_cluster.js'); -exports.createPool = common.createPool +exports.createPool = createPool -exports.createPoolCluster = common.createPoolCluster; +exports.createPoolCluster = createPoolCluster; -exports.createQuery = common.Connection.createQuery; +exports.createQuery = Connection.createQuery; exports.Pool = Pool; @@ -34,7 +36,7 @@ exports.createServer = function(handler) { return s; }; -exports.PoolConnection = require('./lib/pool_connection'); +exports.PoolConnection = require('./lib/pool_connection.js'); exports.authPlugins = require('./lib/auth_plugins'); exports.escape = SqlString.escape; exports.escapeId = SqlString.escapeId; diff --git a/lib/create_connection.js b/lib/create_connection.js new file mode 100644 index 0000000000..d39618f78f --- /dev/null +++ b/lib/create_connection.js @@ -0,0 +1,10 @@ +'use strict'; + +const Connection = require('./connection.js'); +const ConnectionConfig = require('./connection_config.js'); + +function createConnection(opts) { + return new Connection({ config: new ConnectionConfig(opts) }); +} + +module.exports = createConnection; diff --git a/lib/create_pool.js b/lib/create_pool.js new file mode 100644 index 0000000000..7f782aff9a --- /dev/null +++ b/lib/create_pool.js @@ -0,0 +1,10 @@ +'use strict'; + +const Pool = require('./pool.js'); +const PoolConfig = require('./pool_config.js'); + +function createPool(config) { + return new Pool({ config: new PoolConfig(config) }); +} + +module.exports = createPool diff --git a/lib/create_pool_cluster.js b/lib/create_pool_cluster.js new file mode 100644 index 0000000000..4ded78b783 --- /dev/null +++ b/lib/create_pool_cluster.js @@ -0,0 +1,9 @@ +'use strict'; + +const PoolCluster = require('./pool_cluster.js'); + +function createPoolCluster(config) { + return new PoolCluster(config); +} + +module.exports = createPoolCluster; diff --git a/package.json b/package.json index 07ab95d93f..b05dd0b628 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,6 @@ "typings/mysql", "index.js", "index.d.ts", - "common.js", "promise.js", "promise.d.ts" ], diff --git a/promise.js b/promise.js index 04eaff6433..32cd8dc4c2 100644 --- a/promise.js +++ b/promise.js @@ -1,12 +1,15 @@ 'use strict'; const SqlString = require('sqlstring'); -const common = require('./common.js'); const EventEmitter = require('events').EventEmitter; const parserCache = require('./lib/parsers/parser_cache.js'); const PoolConnection = require('./lib/pool_connection.js'); const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); +const createConnection = require('./lib/create_connection.js'); +const Connection = require('./lib/connection.js'); +const createPool = require('./lib/create_pool.js'); +const createPoolCluster = require('./lib/create_pool_cluster.js'); function makeDoneCb(resolve, reject, localErr) { return function (err, rows, fields) { @@ -252,8 +255,8 @@ class PromiseConnection extends EventEmitter { } } -function createConnection(opts) { - const coreConnection = common.createConnection(opts); +function createConnectionPromise(opts) { + const coreConnection = createConnection(opts); const createConnectionErr = new Error(); const thePromise = opts.Promise || Promise; if (!thePromise) { @@ -290,12 +293,12 @@ function createConnection(opts) { const func = functionsToWrap[i]; if ( - typeof common.Connection.prototype[func] === 'function' && + typeof Connection.prototype[func] === 'function' && PromiseConnection.prototype[func] === undefined ) { PromiseConnection.prototype[func] = (function factory(funcName) { return function () { - return common.Connection.prototype[funcName].apply( + return Connection.prototype[funcName].apply( this.connection, arguments ); @@ -411,8 +414,8 @@ class PromisePool extends EventEmitter { } } -function createPool(opts) { - const corePool = common.createPool(opts); +function createPromisePool(opts) { + const corePool = createPool(opts); const thePromise = opts.Promise || Promise; if (!thePromise) { throw new Error( @@ -545,8 +548,8 @@ class PromisePoolCluster extends EventEmitter { 'add' ]); -function createPoolCluster(opts) { - const corePoolCluster = common.createPoolCluster(opts); +function createPromisePoolCluster(opts) { + const corePoolCluster = createPoolCluster(opts); const thePromise = (opts && opts.Promise) || Promise; if (!thePromise) { throw new Error( @@ -558,9 +561,9 @@ function createPoolCluster(opts) { return new PromisePoolCluster(corePoolCluster, thePromise); } -exports.createConnection = createConnection; -exports.createPool = createPool; -exports.createPoolCluster = createPoolCluster; +exports.createConnection = createConnectionPromise; +exports.createPool = createPromisePool; +exports.createPoolCluster = createPromisePoolCluster; exports.escape = SqlString.escape; exports.escapeId = SqlString.escapeId; exports.format = SqlString.format; From c11fc8d3e895be30feb1c8f4901008cf38b96191 Mon Sep 17 00:00:00 2001 From: Thomas Praxl Date: Thu, 3 Oct 2024 08:11:54 +0200 Subject: [PATCH 4/9] fix: circular dependency to promise.js promise.js was required by lib sources, and promise.js required the same lib sources eventually. Extracted the respective classes to lib. Also extracted functions that are shared between several of the new files. Decided to put each exported class / function into its own file. This may bloat the lib folder a bit, but it provides clarity (where to find what). --- lib/connection.js | 2 +- lib/inherit_events.js | 27 ++ lib/make_done_cb.js | 19 ++ lib/pool.js | 2 +- lib/pool_connection.js | 2 +- lib/promise_connection.js | 185 +++++++++++++ lib/promise_pool.js | 89 +++++++ lib/promise_pool_connection.js | 19 ++ lib/promise_prepared_statement_info.js | 32 +++ promise.js | 344 +------------------------ 10 files changed, 379 insertions(+), 342 deletions(-) create mode 100644 lib/inherit_events.js create mode 100644 lib/make_done_cb.js create mode 100644 lib/promise_connection.js create mode 100644 lib/promise_pool.js create mode 100644 lib/promise_pool_connection.js create mode 100644 lib/promise_prepared_statement_info.js diff --git a/lib/connection.js b/lib/connection.js index af6b3d9d68..8448522c97 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -156,7 +156,7 @@ class Connection extends EventEmitter { } promise(promiseImpl) { - const PromiseConnection = require('../promise').PromiseConnection; + const PromiseConnection = require('./promise_connection.js'); return new PromiseConnection(this, promiseImpl); } diff --git a/lib/inherit_events.js b/lib/inherit_events.js new file mode 100644 index 0000000000..47122aa16a --- /dev/null +++ b/lib/inherit_events.js @@ -0,0 +1,27 @@ +'use strict'; + +function inheritEvents(source, target, events) { + const listeners = {}; + target + .on('newListener', eventName => { + if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { + source.on( + eventName, + (listeners[eventName] = function () { + const args = [].slice.call(arguments); + args.unshift(eventName); + + target.emit.apply(target, args); + }) + ); + } + }) + .on('removeListener', eventName => { + if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { + source.removeListener(eventName, listeners[eventName]); + delete listeners[eventName]; + } + }); +} + +module.exports = inheritEvents diff --git a/lib/make_done_cb.js b/lib/make_done_cb.js new file mode 100644 index 0000000000..71de9b3797 --- /dev/null +++ b/lib/make_done_cb.js @@ -0,0 +1,19 @@ +'use strict'; + +function makeDoneCb(resolve, reject, localErr) { + return function (err, rows, fields) { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sql = err.sql; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve([rows, fields]); + } + }; +} + +module.exports = makeDoneCb diff --git a/lib/pool.js b/lib/pool.js index 7f73db8b00..534bb62aa2 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -33,7 +33,7 @@ class Pool extends EventEmitter { } promise(promiseImpl) { - const PromisePool = require('../promise').PromisePool; + const PromisePool = require('./promise_pool.js'); return new PromisePool(this, promiseImpl); } diff --git a/lib/pool_connection.js b/lib/pool_connection.js index 5760f9a468..f2e64f0755 100644 --- a/lib/pool_connection.js +++ b/lib/pool_connection.js @@ -30,7 +30,7 @@ class PoolConnection extends Connection { } promise(promiseImpl) { - const PromisePoolConnection = require('../promise').PromisePoolConnection; + const PromisePoolConnection = require('./promise_pool_connection.js'); return new PromisePoolConnection(this, promiseImpl); } diff --git a/lib/promise_connection.js b/lib/promise_connection.js new file mode 100644 index 0000000000..3b6c9426de --- /dev/null +++ b/lib/promise_connection.js @@ -0,0 +1,185 @@ +'use strict'; + +const PromisePreparedStatementInfo = require('./promise_prepared_statement_info.js'); +const makeDoneCb = require('./make_done_cb.js'); +const inheritEvents = require('./inherit_events.js'); +const EventEmitter = require('events').EventEmitter; + +class PromiseConnection extends EventEmitter { + constructor(connection, promiseImpl) { + super(); + this.connection = connection; + this.Promise = promiseImpl || Promise; + inheritEvents(connection, this, [ + 'error', + 'drain', + 'connect', + 'end', + 'enqueue' + ]); + } + + release() { + this.connection.release(); + } + + query(query, params) { + const c = this.connection; + const localErr = new Error(); + if (typeof params === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (params !== undefined) { + c.query(query, params, done); + } else { + c.query(query, done); + } + }); + } + + execute(query, params) { + const c = this.connection; + const localErr = new Error(); + if (typeof params === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (params !== undefined) { + c.execute(query, params, done); + } else { + c.execute(query, done); + } + }); + } + + end() { + return new this.Promise(resolve => { + this.connection.end(resolve); + }); + } + + beginTransaction() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.beginTransaction(done); + }); + } + + commit() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.commit(done); + }); + } + + rollback() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.rollback(done); + }); + } + + ping() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.ping(err => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(true); + } + }); + }); + } + + connect() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.connect((err, param) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(param); + } + }); + }); + } + + prepare(options) { + const c = this.connection; + const promiseImpl = this.Promise; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.prepare(options, (err, statement) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + const wrappedStatement = new PromisePreparedStatementInfo( + statement, + promiseImpl + ); + resolve(wrappedStatement); + } + }); + }); + } + + changeUser(options) { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.changeUser(options, err => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(); + } + }); + }); + } + + get config() { + return this.connection.config; + } + + get threadId() { + return this.connection.threadId; + } +} + +module.exports = PromiseConnection diff --git a/lib/promise_pool.js b/lib/promise_pool.js new file mode 100644 index 0000000000..b325ca3d67 --- /dev/null +++ b/lib/promise_pool.js @@ -0,0 +1,89 @@ +'use strict'; + +const EventEmitter = require('events').EventEmitter; +const makeDoneCb = require('./make_done_cb.js'); +const PromisePoolConnection = require('./promise_pool_connection.js'); +const inheritEvents = require('./inherit_events.js'); + +class PromisePool extends EventEmitter { + constructor(pool, thePromise) { + super(); + this.pool = pool; + this.Promise = thePromise || Promise; + inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']); + } + + getConnection() { + const corePool = this.pool; + return new this.Promise((resolve, reject) => { + corePool.getConnection((err, coreConnection) => { + if (err) { + reject(err); + } else { + resolve(new PromisePoolConnection(coreConnection, this.Promise)); + } + }); + }); + } + + releaseConnection(connection) { + if (connection instanceof PromisePoolConnection) connection.release(); + } + + query(sql, args) { + const corePool = this.pool; + const localErr = new Error(); + if (typeof args === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (args !== undefined) { + corePool.query(sql, args, done); + } else { + corePool.query(sql, done); + } + }); + } + + execute(sql, args) { + const corePool = this.pool; + const localErr = new Error(); + if (typeof args === 'function') { + throw new Error( + 'Callback function is not available with promise clients.' + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (args) { + corePool.execute(sql, args, done); + } else { + corePool.execute(sql, done); + } + }); + } + + end() { + const corePool = this.pool; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + corePool.end(err => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(); + } + }); + }); + } +} + +module.exports = PromisePool diff --git a/lib/promise_pool_connection.js b/lib/promise_pool_connection.js new file mode 100644 index 0000000000..88fa0e0c1f --- /dev/null +++ b/lib/promise_pool_connection.js @@ -0,0 +1,19 @@ +'use strict'; + +const PromiseConnection = require('./promise_connection.js'); +const PoolConnection = require('./pool_connection.js'); + +class PromisePoolConnection extends PromiseConnection { + constructor(connection, promiseImpl) { + super(connection, promiseImpl); + } + + destroy() { + return PoolConnection.prototype.destroy.apply( + this.connection, + arguments + ); + } +} + +module.exports = PromisePoolConnection diff --git a/lib/promise_prepared_statement_info.js b/lib/promise_prepared_statement_info.js new file mode 100644 index 0000000000..410f999d3f --- /dev/null +++ b/lib/promise_prepared_statement_info.js @@ -0,0 +1,32 @@ +'use strict'; + +const makeDoneCb = require('./make_done_cb.js'); + +class PromisePreparedStatementInfo { + constructor(statement, promiseImpl) { + this.statement = statement; + this.Promise = promiseImpl; + } + + execute(parameters) { + const s = this.statement; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (parameters) { + s.execute(parameters, done); + } else { + s.execute(done); + } + }); + } + + close() { + return new this.Promise(resolve => { + this.statement.close(); + resolve(); + }); + } +} + +module.exports = PromisePreparedStatementInfo diff --git a/promise.js b/promise.js index 32cd8dc4c2..2e2f5407a8 100644 --- a/promise.js +++ b/promise.js @@ -3,257 +3,17 @@ const SqlString = require('sqlstring'); const EventEmitter = require('events').EventEmitter; const parserCache = require('./lib/parsers/parser_cache.js'); -const PoolConnection = require('./lib/pool_connection.js'); const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); const createConnection = require('./lib/create_connection.js'); const Connection = require('./lib/connection.js'); const createPool = require('./lib/create_pool.js'); const createPoolCluster = require('./lib/create_pool_cluster.js'); - -function makeDoneCb(resolve, reject, localErr) { - return function (err, rows, fields) { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sql = err.sql; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve([rows, fields]); - } - }; -} - -function inheritEvents(source, target, events) { - const listeners = {}; - target - .on('newListener', eventName => { - if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { - source.on( - eventName, - (listeners[eventName] = function () { - const args = [].slice.call(arguments); - args.unshift(eventName); - - target.emit.apply(target, args); - }) - ); - } - }) - .on('removeListener', eventName => { - if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { - source.removeListener(eventName, listeners[eventName]); - delete listeners[eventName]; - } - }); -} - -class PromisePreparedStatementInfo { - constructor(statement, promiseImpl) { - this.statement = statement; - this.Promise = promiseImpl; - } - - execute(parameters) { - const s = this.statement; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (parameters) { - s.execute(parameters, done); - } else { - s.execute(done); - } - }); - } - - close() { - return new this.Promise(resolve => { - this.statement.close(); - resolve(); - }); - } -} - -class PromiseConnection extends EventEmitter { - constructor(connection, promiseImpl) { - super(); - this.connection = connection; - this.Promise = promiseImpl || Promise; - inheritEvents(connection, this, [ - 'error', - 'drain', - 'connect', - 'end', - 'enqueue' - ]); - } - - release() { - this.connection.release(); - } - - query(query, params) { - const c = this.connection; - const localErr = new Error(); - if (typeof params === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (params !== undefined) { - c.query(query, params, done); - } else { - c.query(query, done); - } - }); - } - - execute(query, params) { - const c = this.connection; - const localErr = new Error(); - if (typeof params === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (params !== undefined) { - c.execute(query, params, done); - } else { - c.execute(query, done); - } - }); - } - - end() { - return new this.Promise(resolve => { - this.connection.end(resolve); - }); - } - - beginTransaction() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.beginTransaction(done); - }); - } - - commit() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.commit(done); - }); - } - - rollback() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.rollback(done); - }); - } - - ping() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.ping(err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(true); - } - }); - }); - } - - connect() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.connect((err, param) => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(param); - } - }); - }); - } - - prepare(options) { - const c = this.connection; - const promiseImpl = this.Promise; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.prepare(options, (err, statement) => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - const wrappedStatement = new PromisePreparedStatementInfo( - statement, - promiseImpl - ); - resolve(wrappedStatement); - } - }); - }); - } - - changeUser(options) { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.changeUser(options, err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(); - } - }); - }); - } - - get config() { - return this.connection.config; - } - - get threadId() { - return this.connection.threadId; - } -} +const PromiseConnection = require('./lib/promise_connection.js'); +const PromisePool = require('./lib/promise_pool.js'); +const makeDoneCb = require('./lib/make_done_cb.js'); +const PromisePoolConnection = require('./lib/promise_pool_connection.js'); +const inheritEvents = require('./lib/inherit_events.js'); function createConnectionPromise(opts) { const coreConnection = createConnection(opts); @@ -320,100 +80,6 @@ function createConnectionPromise(opts) { 'unprepare' ]); -class PromisePoolConnection extends PromiseConnection { - constructor(connection, promiseImpl) { - super(connection, promiseImpl); - } - - destroy() { - return PoolConnection.prototype.destroy.apply( - this.connection, - arguments - ); - } -} - -class PromisePool extends EventEmitter { - constructor(pool, thePromise) { - super(); - this.pool = pool; - this.Promise = thePromise || Promise; - inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']); - } - - getConnection() { - const corePool = this.pool; - return new this.Promise((resolve, reject) => { - corePool.getConnection((err, coreConnection) => { - if (err) { - reject(err); - } else { - resolve(new PromisePoolConnection(coreConnection, this.Promise)); - } - }); - }); - } - - releaseConnection(connection) { - if (connection instanceof PromisePoolConnection) connection.release(); - } - - query(sql, args) { - const corePool = this.pool; - const localErr = new Error(); - if (typeof args === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (args !== undefined) { - corePool.query(sql, args, done); - } else { - corePool.query(sql, done); - } - }); - } - - execute(sql, args) { - const corePool = this.pool; - const localErr = new Error(); - if (typeof args === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (args) { - corePool.execute(sql, args, done); - } else { - corePool.execute(sql, done); - } - }); - } - - end() { - const corePool = this.pool; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - corePool.end(err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(); - } - }); - }); - } -} - function createPromisePool(opts) { const corePool = createPool(opts); const thePromise = opts.Promise || Promise; From a1ec9267ff611cf8bb7f534291f4f0fb99906c32 Mon Sep 17 00:00:00 2001 From: Thomas Praxl Date: Sat, 19 Oct 2024 14:39:22 +0200 Subject: [PATCH 5/9] fix: missing patched functions The extraction of classes was performed without extracting the patch of methods like `escape`, etc. The patching is now performed in the files that define the respective classes, guaranteeing patched versions when these files are required. --- lib/promise_connection.js | 38 +++++++++++++++++++++++++- lib/promise_pool.js | 23 ++++++++++++++++ promise.js | 56 --------------------------------------- 3 files changed, 60 insertions(+), 57 deletions(-) diff --git a/lib/promise_connection.js b/lib/promise_connection.js index 3b6c9426de..45f4a1af1e 100644 --- a/lib/promise_connection.js +++ b/lib/promise_connection.js @@ -3,6 +3,7 @@ const PromisePreparedStatementInfo = require('./promise_prepared_statement_info.js'); const makeDoneCb = require('./make_done_cb.js'); const inheritEvents = require('./inherit_events.js'); +const { Connection } = require('../index.js'); const EventEmitter = require('events').EventEmitter; class PromiseConnection extends EventEmitter { @@ -181,5 +182,40 @@ class PromiseConnection extends EventEmitter { return this.connection.threadId; } } - +// patching PromiseConnection +// create facade functions for prototype functions on "Connection" that are not yet +// implemented with PromiseConnection + +// proxy synchronous functions only +(function (functionsToWrap) { + for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { + const func = functionsToWrap[i]; + + if ( + typeof Connection.prototype[func] === 'function' && + PromiseConnection.prototype[func] === undefined + ) { + PromiseConnection.prototype[func] = (function factory(funcName) { + return function () { + return Connection.prototype[funcName].apply( + this.connection, + arguments + ); + }; + })(func); + } + } +})([ + // synchronous functions + 'close', + 'createBinlogStream', + 'destroy', + 'escape', + 'escapeId', + 'format', + 'pause', + 'pipe', + 'resume', + 'unprepare' +]); module.exports = PromiseConnection diff --git a/lib/promise_pool.js b/lib/promise_pool.js index b325ca3d67..4858515f90 100644 --- a/lib/promise_pool.js +++ b/lib/promise_pool.js @@ -4,6 +4,7 @@ const EventEmitter = require('events').EventEmitter; const makeDoneCb = require('./make_done_cb.js'); const PromisePoolConnection = require('./promise_pool_connection.js'); const inheritEvents = require('./inherit_events.js'); +const Pool = require('./pool'); class PromisePool extends EventEmitter { constructor(pool, thePromise) { @@ -86,4 +87,26 @@ class PromisePool extends EventEmitter { } } +(function (functionsToWrap) { + for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { + const func = functionsToWrap[i]; + + if ( + typeof Pool.prototype[func] === 'function' && + PromisePool.prototype[func] === undefined + ) { + PromisePool.prototype[func] = (function factory(funcName) { + return function () { + return Pool.prototype[funcName].apply(this.pool, arguments); + }; + })(func); + } + } +})([ + // synchronous functions + 'escape', + 'escapeId', + 'format' +]); + module.exports = PromisePool diff --git a/promise.js b/promise.js index 2e2f5407a8..85db5091c1 100644 --- a/promise.js +++ b/promise.js @@ -6,7 +6,6 @@ const parserCache = require('./lib/parsers/parser_cache.js'); const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); const createConnection = require('./lib/create_connection.js'); -const Connection = require('./lib/connection.js'); const createPool = require('./lib/create_pool.js'); const createPoolCluster = require('./lib/create_pool_cluster.js'); const PromiseConnection = require('./lib/promise_connection.js'); @@ -43,42 +42,7 @@ function createConnectionPromise(opts) { // note: the callback of "changeUser" is not called on success // hence there is no possibility to call "resolve" -// patching PromiseConnection -// create facade functions for prototype functions on "Connection" that are not yet -// implemented with PromiseConnection -// proxy synchronous functions only -(function (functionsToWrap) { - for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { - const func = functionsToWrap[i]; - - if ( - typeof Connection.prototype[func] === 'function' && - PromiseConnection.prototype[func] === undefined - ) { - PromiseConnection.prototype[func] = (function factory(funcName) { - return function () { - return Connection.prototype[funcName].apply( - this.connection, - arguments - ); - }; - })(func); - } - } -})([ - // synchronous functions - 'close', - 'createBinlogStream', - 'destroy', - 'escape', - 'escapeId', - 'format', - 'pause', - 'pipe', - 'resume', - 'unprepare' -]); function createPromisePool(opts) { const corePool = createPool(opts); @@ -94,27 +58,7 @@ function createPromisePool(opts) { return new PromisePool(corePool, thePromise); } -(function (functionsToWrap) { - for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { - const func = functionsToWrap[i]; - if ( - typeof Pool.prototype[func] === 'function' && - PromisePool.prototype[func] === undefined - ) { - PromisePool.prototype[func] = (function factory(funcName) { - return function () { - return Pool.prototype[funcName].apply(this.pool, arguments); - }; - })(func); - } - } -})([ - // synchronous functions - 'escape', - 'escapeId', - 'format' -]); class PromisePoolCluster extends EventEmitter { constructor(poolCluster, thePromise) { From 2ec00fef85ed8797569f36c31236585cd8d2c3dc Mon Sep 17 00:00:00 2001 From: Thomas Praxl Date: Sat, 19 Oct 2024 16:37:14 +0200 Subject: [PATCH 6/9] chore: remove unused require --- promise.js | 1 - 1 file changed, 1 deletion(-) diff --git a/promise.js b/promise.js index 85db5091c1..9a29f8d20f 100644 --- a/promise.js +++ b/promise.js @@ -3,7 +3,6 @@ const SqlString = require('sqlstring'); const EventEmitter = require('events').EventEmitter; const parserCache = require('./lib/parsers/parser_cache.js'); -const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); const createConnection = require('./lib/create_connection.js'); const createPool = require('./lib/create_pool.js'); From 127df3ce97d5d761b5fbd1465db899519cadb541 Mon Sep 17 00:00:00 2001 From: Thomas Praxl Date: Thu, 24 Oct 2024 04:53:19 +0200 Subject: [PATCH 7/9] style: add missing semicolons and file name suffixes for require Co-authored-by: mknj --- index.js | 4 ++-- lib/inherit_events.js | 2 +- lib/make_done_cb.js | 2 +- lib/promise_pool.js | 2 +- lib/promise_pool_connection.js | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/index.js b/index.js index 0d7179c2a7..1138909b42 100644 --- a/index.js +++ b/index.js @@ -9,7 +9,7 @@ const Connection = require('./lib/connection.js'); exports.createConnection = require('./lib/create_connection.js'); exports.connect = exports.createConnection; -exports.Connection = Connection +exports.Connection = Connection; exports.ConnectionConfig = ConnectionConfig; const Pool = require('./lib/pool.js'); @@ -17,7 +17,7 @@ const PoolCluster = require('./lib/pool_cluster.js'); const createPool = require('./lib/create_pool.js'); const createPoolCluster = require('./lib/create_pool_cluster.js'); -exports.createPool = createPool +exports.createPool = createPool; exports.createPoolCluster = createPoolCluster; diff --git a/lib/inherit_events.js b/lib/inherit_events.js index 47122aa16a..d90b7eb629 100644 --- a/lib/inherit_events.js +++ b/lib/inherit_events.js @@ -24,4 +24,4 @@ function inheritEvents(source, target, events) { }); } -module.exports = inheritEvents +module.exports = inheritEvents; diff --git a/lib/make_done_cb.js b/lib/make_done_cb.js index 71de9b3797..124303f256 100644 --- a/lib/make_done_cb.js +++ b/lib/make_done_cb.js @@ -16,4 +16,4 @@ function makeDoneCb(resolve, reject, localErr) { }; } -module.exports = makeDoneCb +module.exports = makeDoneCb; diff --git a/lib/promise_pool.js b/lib/promise_pool.js index 4858515f90..24b3d32107 100644 --- a/lib/promise_pool.js +++ b/lib/promise_pool.js @@ -109,4 +109,4 @@ class PromisePool extends EventEmitter { 'format' ]); -module.exports = PromisePool +module.exports = PromisePool; diff --git a/lib/promise_pool_connection.js b/lib/promise_pool_connection.js index 88fa0e0c1f..3ea925f635 100644 --- a/lib/promise_pool_connection.js +++ b/lib/promise_pool_connection.js @@ -16,4 +16,4 @@ class PromisePoolConnection extends PromiseConnection { } } -module.exports = PromisePoolConnection +module.exports = PromisePoolConnection; From b765a02205ded3c36f1e894b62b97494f0a817ac Mon Sep 17 00:00:00 2001 From: wellwelwel <46850407+wellwelwel@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:46:27 -0300 Subject: [PATCH 8/9] chore: resolve all remaining circular dependencies --- index.js | 16 +- lib/base_connection.js | 945 +++++++++++++++++++++++++ lib/base_pool.js | 233 ++++++ lib/base_pool_connection.js | 63 ++ lib/connection.js | 940 +----------------------- lib/create_pool.js | 2 +- lib/inherit_events.js | 6 +- lib/pool.js | 229 +----- lib/pool_connection.js | 61 +- lib/promise_connection.js | 29 +- lib/promise_pool.js | 14 +- lib/promise_pool_connection.js | 6 +- lib/promise_prepared_statement_info.js | 4 +- promise.js | 61 +- 14 files changed, 1317 insertions(+), 1292 deletions(-) create mode 100644 lib/base_connection.js create mode 100644 lib/base_pool.js create mode 100644 lib/base_pool_connection.js diff --git a/index.js b/index.js index 1138909b42..ceae6df7dc 100644 --- a/index.js +++ b/index.js @@ -27,7 +27,7 @@ exports.Pool = Pool; exports.PoolCluster = PoolCluster; -exports.createServer = function(handler) { +exports.createServer = function (handler) { const Server = require('./lib/server.js'); const s = new Server(); if (handler) { @@ -45,33 +45,33 @@ exports.raw = SqlString.raw; exports.__defineGetter__( 'createConnectionPromise', - () => require('./promise.js').createConnection + () => require('./promise.js').createConnection, ); exports.__defineGetter__( 'createPoolPromise', - () => require('./promise.js').createPool + () => require('./promise.js').createPool, ); exports.__defineGetter__( 'createPoolClusterPromise', - () => require('./promise.js').createPoolCluster + () => require('./promise.js').createPoolCluster, ); exports.__defineGetter__('Types', () => require('./lib/constants/types.js')); exports.__defineGetter__('Charsets', () => - require('./lib/constants/charsets.js') + require('./lib/constants/charsets.js'), ); exports.__defineGetter__('CharsetToEncoding', () => - require('./lib/constants/charset_encodings.js') + require('./lib/constants/charset_encodings.js'), ); -exports.setMaxParserCache = function(max) { +exports.setMaxParserCache = function (max) { parserCache.setMaxCache(max); }; -exports.clearParserCache = function() { +exports.clearParserCache = function () { parserCache.clearCache(); }; diff --git a/lib/base_connection.js b/lib/base_connection.js new file mode 100644 index 0000000000..bb50135813 --- /dev/null +++ b/lib/base_connection.js @@ -0,0 +1,945 @@ +// This file was modified by Oracle on June 1, 2021. +// The changes involve new logic to handle an additional ERR Packet sent by +// the MySQL server when the connection is closed unexpectedly. +// Modifications copyright (c) 2021, Oracle and/or its affiliates. + +// This file was modified by Oracle on June 17, 2021. +// The changes involve logic to ensure the socket connection is closed when +// there is a fatal error. +// Modifications copyright (c) 2021, Oracle and/or its affiliates. + +// This file was modified by Oracle on September 21, 2021. +// The changes involve passing additional authentication factor passwords +// to the ChangeUser Command instance. +// Modifications copyright (c) 2021, Oracle and/or its affiliates. + +'use strict'; + +const Net = require('net'); +const Tls = require('tls'); +const Timers = require('timers'); +const EventEmitter = require('events').EventEmitter; +const Readable = require('stream').Readable; +const Queue = require('denque'); +const SqlString = require('sqlstring'); +const { createLRU } = require('lru.min'); +const PacketParser = require('./packet_parser.js'); +const Packets = require('./packets/index.js'); +const Commands = require('./commands/index.js'); +const ConnectionConfig = require('./connection_config.js'); +const CharsetToEncoding = require('./constants/charset_encodings.js'); + +let _connectionId = 0; + +let convertNamedPlaceholders = null; + +class BaseConnection extends EventEmitter { + constructor(opts) { + super(); + this.config = opts.config; + // TODO: fill defaults + // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX ) + // if host is given, connect to host:3306 + // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath + // if there is no host/port and no socketPath parameters? + if (!opts.config.stream) { + if (opts.config.socketPath) { + this.stream = Net.connect(opts.config.socketPath); + } else { + this.stream = Net.connect(opts.config.port, opts.config.host); + + // Optionally enable keep-alive on the socket. + if (this.config.enableKeepAlive) { + this.stream.on('connect', () => { + this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay); + }); + } + + // Enable TCP_NODELAY flag. This is needed so that the network packets + // are sent immediately to the server + this.stream.setNoDelay(true); + } + // if stream is a function, treat it as "stream agent / factory" + } else if (typeof opts.config.stream === 'function') { + this.stream = opts.config.stream(opts); + } else { + this.stream = opts.config.stream; + } + + this._internalId = _connectionId++; + this._commands = new Queue(); + this._command = null; + this._paused = false; + this._paused_packets = new Queue(); + this._statements = createLRU({ + max: this.config.maxPreparedStatements, + onEviction: function (_, statement) { + statement.close(); + }, + }); + this.serverCapabilityFlags = 0; + this.authorized = false; + this.sequenceId = 0; + this.compressedSequenceId = 0; + this.threadId = null; + this._handshakePacket = null; + this._fatalError = null; + this._protocolError = null; + this._outOfOrderPackets = []; + this.clientEncoding = CharsetToEncoding[this.config.charsetNumber]; + this.stream.on('error', this._handleNetworkError.bind(this)); + // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind + this.packetParser = new PacketParser((p) => { + this.handlePacket(p); + }); + this.stream.on('data', (data) => { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + this.packetParser.execute(data); + }); + this.stream.on('end', () => { + // emit the end event so that the pooled connection can close the connection + this.emit('end'); + }); + this.stream.on('close', () => { + // we need to set this flag everywhere where we want connection to close + if (this._closing) { + return; + } + if (!this._protocolError) { + // no particular error message before disconnect + this._protocolError = new Error( + 'Connection lost: The server closed the connection.', + ); + this._protocolError.fatal = true; + this._protocolError.code = 'PROTOCOL_CONNECTION_LOST'; + } + this._notifyError(this._protocolError); + }); + let handshakeCommand; + if (!this.config.isServer) { + handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags); + handshakeCommand.on('end', () => { + // this happens when handshake finishes early either because there was + // some fatal error or the server sent an error packet instead of + // an hello packet (for example, 'Too many connections' error) + if ( + !handshakeCommand.handshake || + this._fatalError || + this._protocolError + ) { + return; + } + this._handshakePacket = handshakeCommand.handshake; + this.threadId = handshakeCommand.handshake.connectionId; + this.emit('connect', handshakeCommand.handshake); + }); + handshakeCommand.on('error', (err) => { + this._closing = true; + this._notifyError(err); + }); + this.addCommand(handshakeCommand); + } + // in case there was no initial handshake but we need to read sting, assume it utf-8 + // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet) + // will be overwritten with actual encoding value as soon as server handshake packet is received + this.serverEncoding = 'utf8'; + if (this.config.connectTimeout) { + const timeoutHandler = this._handleTimeoutError.bind(this); + this.connectTimeout = Timers.setTimeout( + timeoutHandler, + this.config.connectTimeout, + ); + } + } + + _addCommandClosedState(cmd) { + const err = new Error( + "Can't add new command when connection is in closed state", + ); + err.fatal = true; + if (cmd.onResult) { + cmd.onResult(err); + } else { + this.emit('error', err); + } + } + + _handleFatalError(err) { + err.fatal = true; + // stop receiving packets + this.stream.removeAllListeners('data'); + this.addCommand = this._addCommandClosedState; + this.write = () => { + this.emit('error', new Error("Can't write in closed state")); + }; + this._notifyError(err); + this._fatalError = err; + } + + _handleNetworkError(err) { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + // Do not throw an error when a connection ends with a RST,ACK packet + if (err.code === 'ECONNRESET' && this._closing) { + return; + } + this._handleFatalError(err); + } + + _handleTimeoutError() { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + this.stream.destroy && this.stream.destroy(); + const err = new Error('connect ETIMEDOUT'); + err.errorno = 'ETIMEDOUT'; + err.code = 'ETIMEDOUT'; + err.syscall = 'connect'; + this._handleNetworkError(err); + } + + // notify all commands in the queue and bubble error as connection "error" + // called on stream error or unexpected termination + _notifyError(err) { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET + if (this._fatalError) { + return; + } + let command; + // if there is no active command, notify connection + // if there are commands and all of them have callbacks, pass error via callback + let bubbleErrorToConnection = !this._command; + if (this._command && this._command.onResult) { + this._command.onResult(err); + this._command = null; + // connection handshake is special because we allow it to be implicit + // if error happened during handshake, but there are others commands in queue + // then bubble error to other commands and not to connection + } else if ( + !( + this._command && + this._command.constructor === Commands.ClientHandshake && + this._commands.length > 0 + ) + ) { + bubbleErrorToConnection = true; + } + while ((command = this._commands.shift())) { + if (command.onResult) { + command.onResult(err); + } else { + bubbleErrorToConnection = true; + } + } + // notify connection if some comands in the queue did not have callbacks + // or if this is pool connection ( so it can be removed from pool ) + if (bubbleErrorToConnection || this._pool) { + this.emit('error', err); + } + // close connection after emitting the event in case of a fatal error + if (err.fatal) { + this.close(); + } + } + + write(buffer) { + const result = this.stream.write(buffer, (err) => { + if (err) { + this._handleNetworkError(err); + } + }); + + if (!result) { + this.stream.emit('pause'); + } + } + + // http://dev.mysql.com/doc/internals/en/sequence-id.html + // + // The sequence-id is incremented with each packet and may wrap around. + // It starts at 0 and is reset to 0 when a new command + // begins in the Command Phase. + // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html + _resetSequenceId() { + this.sequenceId = 0; + this.compressedSequenceId = 0; + } + + _bumpCompressedSequenceId(numPackets) { + this.compressedSequenceId += numPackets; + this.compressedSequenceId %= 256; + } + + _bumpSequenceId(numPackets) { + this.sequenceId += numPackets; + this.sequenceId %= 256; + } + + writePacket(packet) { + const MAX_PACKET_LENGTH = 16777215; + const length = packet.length(); + let chunk, offset, header; + if (length < MAX_PACKET_LENGTH) { + packet.writeHeader(this.sequenceId); + if (this.config.debug) { + console.log( + `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`, + ); + console.log( + `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`, + ); + } + this._bumpSequenceId(1); + this.write(packet.buffer); + } else { + if (this.config.debug) { + console.log( + `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`, + ); + console.log( + `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`, + ); + } + for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) { + chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH); + if (chunk.length === MAX_PACKET_LENGTH) { + header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]); + } else { + header = Buffer.from([ + chunk.length & 0xff, + (chunk.length >> 8) & 0xff, + (chunk.length >> 16) & 0xff, + this.sequenceId, + ]); + } + this._bumpSequenceId(1); + this.write(header); + this.write(chunk); + } + } + } + + // 0.11+ environment + startTLS(onSecure) { + if (this.config.debug) { + console.log('Upgrading connection to TLS'); + } + const secureContext = Tls.createSecureContext({ + ca: this.config.ssl.ca, + cert: this.config.ssl.cert, + ciphers: this.config.ssl.ciphers, + key: this.config.ssl.key, + passphrase: this.config.ssl.passphrase, + minVersion: this.config.ssl.minVersion, + maxVersion: this.config.ssl.maxVersion, + }); + const rejectUnauthorized = this.config.ssl.rejectUnauthorized; + const verifyIdentity = this.config.ssl.verifyIdentity; + const servername = this.config.host; + + let secureEstablished = false; + this.stream.removeAllListeners('data'); + const secureSocket = Tls.connect( + { + rejectUnauthorized, + requestCert: rejectUnauthorized, + checkServerIdentity: verifyIdentity + ? Tls.checkServerIdentity + : function () { + return undefined; + }, + secureContext, + isServer: false, + socket: this.stream, + servername, + }, + () => { + secureEstablished = true; + if (rejectUnauthorized) { + if (typeof servername === 'string' && verifyIdentity) { + const cert = secureSocket.getPeerCertificate(true); + const serverIdentityCheckError = Tls.checkServerIdentity( + servername, + cert, + ); + if (serverIdentityCheckError) { + onSecure(serverIdentityCheckError); + return; + } + } + } + onSecure(); + }, + ); + // error handler for secure socket + secureSocket.on('error', (err) => { + if (secureEstablished) { + this._handleNetworkError(err); + } else { + onSecure(err); + } + }); + secureSocket.on('data', (data) => { + this.packetParser.execute(data); + }); + this.write = (buffer) => secureSocket.write(buffer); + } + + protocolError(message, code) { + // Starting with MySQL 8.0.24, if the client closes the connection + // unexpectedly, the server will send a last ERR Packet, which we can + // safely ignore. + // https://dev.mysql.com/worklog/task/?id=12999 + if (this._closing) { + return; + } + + const err = new Error(message); + err.fatal = true; + err.code = code || 'PROTOCOL_ERROR'; + this.emit('error', err); + } + + get fatalError() { + return this._fatalError; + } + + handlePacket(packet) { + if (this._paused) { + this._paused_packets.push(packet); + return; + } + if (this.config.debug) { + if (packet) { + console.log( + ` raw: ${packet.buffer + .slice(packet.offset, packet.offset + packet.length()) + .toString('hex')}`, + ); + console.trace(); + const commandName = this._command + ? this._command._commandName + : '(no command)'; + const stateName = this._command + ? this._command.stateName() + : '(no command)'; + console.log( + `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`, + ); + } + } + if (!this._command) { + const marker = packet.peekByte(); + // If it's an Err Packet, we should use it. + if (marker === 0xff) { + const error = Packets.Error.fromPacket(packet); + this.protocolError(error.message, error.code); + } else { + // Otherwise, it means it's some other unexpected packet. + this.protocolError( + 'Unexpected packet while no commands in the queue', + 'PROTOCOL_UNEXPECTED_PACKET', + ); + } + this.close(); + return; + } + if (packet) { + // Note: when server closes connection due to inactivity, Err packet ER_CLIENT_INTERACTION_TIMEOUT from MySQL 8.0.24, sequenceId will be 0 + if (this.sequenceId !== packet.sequenceId) { + const err = new Error( + `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`, + ); + err.expected = this.sequenceId; + err.received = packet.sequenceId; + this.emit('warn', err); // REVIEW + console.error(err.message); + } + this._bumpSequenceId(packet.numPackets); + } + try { + if (this._fatalError) { + // skip remaining packets after client is in the error state + return; + } + const done = this._command.execute(packet, this); + if (done) { + this._command = this._commands.shift(); + if (this._command) { + this.sequenceId = 0; + this.compressedSequenceId = 0; + this.handlePacket(); + } + } + } catch (err) { + this._handleFatalError(err); + this.stream.destroy(); + } + } + + addCommand(cmd) { + // this.compressedSequenceId = 0; + // this.sequenceId = 0; + if (this.config.debug) { + const commandName = cmd.constructor.name; + console.log(`Add command: ${commandName}`); + cmd._commandName = commandName; + } + if (!this._command) { + this._command = cmd; + this.handlePacket(); + } else { + this._commands.push(cmd); + } + return cmd; + } + + format(sql, values) { + if (typeof this.config.queryFormat === 'function') { + return this.config.queryFormat.call( + this, + sql, + values, + this.config.timezone, + ); + } + const opts = { + sql: sql, + values: values, + }; + this._resolveNamedPlaceholders(opts); + return SqlString.format( + opts.sql, + opts.values, + this.config.stringifyObjects, + this.config.timezone, + ); + } + + escape(value) { + return SqlString.escape(value, false, this.config.timezone); + } + + escapeId(value) { + return SqlString.escapeId(value, false); + } + + raw(sql) { + return SqlString.raw(sql); + } + + _resolveNamedPlaceholders(options) { + let unnamed; + if (this.config.namedPlaceholders || options.namedPlaceholders) { + if (Array.isArray(options.values)) { + // if an array is provided as the values, assume the conversion is not necessary. + // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled. + return; + } + if (convertNamedPlaceholders === null) { + convertNamedPlaceholders = require('named-placeholders')(); + } + unnamed = convertNamedPlaceholders(options.sql, options.values); + options.sql = unnamed[0]; + options.values = unnamed[1]; + } + } + + query(sql, values, cb) { + let cmdQuery; + if (sql.constructor === Commands.Query) { + cmdQuery = sql; + } else { + cmdQuery = BaseConnection.createQuery(sql, values, cb, this.config); + } + this._resolveNamedPlaceholders(cmdQuery); + const rawSql = this.format( + cmdQuery.sql, + cmdQuery.values !== undefined ? cmdQuery.values : [], + ); + cmdQuery.sql = rawSql; + return this.addCommand(cmdQuery); + } + + pause() { + this._paused = true; + this.stream.pause(); + } + + resume() { + let packet; + this._paused = false; + while ((packet = this._paused_packets.shift())) { + this.handlePacket(packet); + // don't resume if packet handler paused connection + if (this._paused) { + return; + } + } + this.stream.resume(); + } + + // TODO: named placeholders support + prepare(options, cb) { + if (typeof options === 'string') { + options = { sql: options }; + } + return this.addCommand(new Commands.Prepare(options, cb)); + } + + unprepare(sql) { + let options = {}; + if (typeof sql === 'object') { + options = sql; + } else { + options.sql = sql; + } + const key = BaseConnection.statementKey(options); + const stmt = this._statements.get(key); + if (stmt) { + this._statements.delete(key); + stmt.close(); + } + return stmt; + } + + execute(sql, values, cb) { + let options = { + infileStreamFactory: this.config.infileStreamFactory, + }; + if (typeof sql === 'object') { + // execute(options, cb) + options = { + ...options, + ...sql, + sql: sql.sql, + values: sql.values, + }; + if (typeof values === 'function') { + cb = values; + } else { + options.values = options.values || values; + } + } else if (typeof values === 'function') { + // execute(sql, cb) + cb = values; + options.sql = sql; + options.values = undefined; + } else { + // execute(sql, values, cb) + options.sql = sql; + options.values = values; + } + this._resolveNamedPlaceholders(options); + // check for values containing undefined + if (options.values) { + //If namedPlaceholder is not enabled and object is passed as bind parameters + if (!Array.isArray(options.values)) { + throw new TypeError( + 'Bind parameters must be array if namedPlaceholders parameter is not enabled', + ); + } + options.values.forEach((val) => { + //If namedPlaceholder is not enabled and object is passed as bind parameters + if (!Array.isArray(options.values)) { + throw new TypeError( + 'Bind parameters must be array if namedPlaceholders parameter is not enabled', + ); + } + if (val === undefined) { + throw new TypeError( + 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null', + ); + } + if (typeof val === 'function') { + throw new TypeError( + 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first', + ); + } + }); + } + const executeCommand = new Commands.Execute(options, cb); + const prepareCommand = new Commands.Prepare(options, (err, stmt) => { + if (err) { + // skip execute command if prepare failed, we have main + // combined callback here + executeCommand.start = function () { + return null; + }; + if (cb) { + cb(err); + } else { + executeCommand.emit('error', err); + } + executeCommand.emit('end'); + return; + } + executeCommand.statement = stmt; + }); + this.addCommand(prepareCommand); + this.addCommand(executeCommand); + return executeCommand; + } + + changeUser(options, callback) { + if (!callback && typeof options === 'function') { + callback = options; + options = {}; + } + const charsetNumber = options.charset + ? ConnectionConfig.getCharsetNumber(options.charset) + : this.config.charsetNumber; + return this.addCommand( + new Commands.ChangeUser( + { + user: options.user || this.config.user, + // for the purpose of multi-factor authentication, or not, the main + // password (used for the 1st authentication factor) can also be + // provided via the "password1" option + password: + options.password || + options.password1 || + this.config.password || + this.config.password1, + password2: options.password2 || this.config.password2, + password3: options.password3 || this.config.password3, + passwordSha1: options.passwordSha1 || this.config.passwordSha1, + database: options.database || this.config.database, + timeout: options.timeout, + charsetNumber: charsetNumber, + currentConfig: this.config, + }, + (err) => { + if (err) { + err.fatal = true; + } + if (callback) { + callback(err); + } + }, + ), + ); + } + + // transaction helpers + beginTransaction(cb) { + return this.query('START TRANSACTION', cb); + } + + commit(cb) { + return this.query('COMMIT', cb); + } + + rollback(cb) { + return this.query('ROLLBACK', cb); + } + + ping(cb) { + return this.addCommand(new Commands.Ping(cb)); + } + + _registerSlave(opts, cb) { + return this.addCommand(new Commands.RegisterSlave(opts, cb)); + } + + _binlogDump(opts, cb) { + return this.addCommand(new Commands.BinlogDump(opts, cb)); + } + + // currently just alias to close + destroy() { + this.close(); + } + + close() { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + this._closing = true; + this.stream.end(); + this.addCommand = this._addCommandClosedState; + } + + createBinlogStream(opts) { + // TODO: create proper stream class + // TODO: use through2 + let test = 1; + const stream = new Readable({ objectMode: true }); + stream._read = function () { + return { + data: test++, + }; + }; + this._registerSlave(opts, () => { + const dumpCmd = this._binlogDump(opts); + dumpCmd.on('event', (ev) => { + stream.push(ev); + }); + dumpCmd.on('eof', () => { + stream.push(null); + // if non-blocking, then close stream to prevent errors + if (opts.flags && opts.flags & 0x01) { + this.close(); + } + }); + // TODO: pipe errors as well + }); + return stream; + } + + connect(cb) { + if (!cb) { + return; + } + if (this._fatalError || this._protocolError) { + return cb(this._fatalError || this._protocolError); + } + if (this._handshakePacket) { + return cb(null, this); + } + let connectCalled = 0; + function callbackOnce(isErrorHandler) { + return function (param) { + if (!connectCalled) { + if (isErrorHandler) { + cb(param); + } else { + cb(null, param); + } + } + connectCalled = 1; + }; + } + this.once('error', callbackOnce(true)); + this.once('connect', callbackOnce(false)); + } + + // =================================== + // outgoing server connection methods + // =================================== + writeColumns(columns) { + this.writePacket(Packets.ResultSetHeader.toPacket(columns.length)); + columns.forEach((column) => { + this.writePacket( + Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding), + ); + }); + this.writeEof(); + } + + // row is array of columns, not hash + writeTextRow(column) { + this.writePacket( + Packets.TextRow.toPacket(column, this.serverConfig.encoding), + ); + } + + writeBinaryRow(column) { + this.writePacket( + Packets.BinaryRow.toPacket(column, this.serverConfig.encoding), + ); + } + + writeTextResult(rows, columns, binary = false) { + this.writeColumns(columns); + rows.forEach((row) => { + const arrayRow = new Array(columns.length); + columns.forEach((column) => { + arrayRow.push(row[column.name]); + }); + if (binary) { + this.writeBinaryRow(arrayRow); + } else this.writeTextRow(arrayRow); + }); + this.writeEof(); + } + + writeEof(warnings, statusFlags) { + this.writePacket(Packets.EOF.toPacket(warnings, statusFlags)); + } + + writeOk(args) { + if (!args) { + args = { affectedRows: 0 }; + } + this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding)); + } + + writeError(args) { + // if we want to send error before initial hello was sent, use default encoding + const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8'; + this.writePacket(Packets.Error.toPacket(args, encoding)); + } + + serverHandshake(args) { + this.serverConfig = args; + this.serverConfig.encoding = + CharsetToEncoding[this.serverConfig.characterSet]; + return this.addCommand(new Commands.ServerHandshake(args)); + } + + // =============================================================== + end(callback) { + if (this.config.isServer) { + this._closing = true; + const quitCmd = new EventEmitter(); + setImmediate(() => { + this.stream.end(); + quitCmd.emit('end'); + }); + return quitCmd; + } + // trigger error if more commands enqueued after end command + const quitCmd = this.addCommand(new Commands.Quit(callback)); + this.addCommand = this._addCommandClosedState; + return quitCmd; + } + + static createQuery(sql, values, cb, config) { + let options = { + rowsAsArray: config.rowsAsArray, + infileStreamFactory: config.infileStreamFactory, + }; + if (typeof sql === 'object') { + // query(options, cb) + options = { + ...options, + ...sql, + sql: sql.sql, + values: sql.values, + }; + if (typeof values === 'function') { + cb = values; + } else if (values !== undefined) { + options.values = values; + } + } else if (typeof values === 'function') { + // query(sql, cb) + cb = values; + options.sql = sql; + options.values = undefined; + } else { + // query(sql, values, cb) + options.sql = sql; + options.values = values; + } + return new Commands.Query(options, cb); + } + + static statementKey(options) { + return `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`; + } +} + +module.exports = BaseConnection; diff --git a/lib/base_pool.js b/lib/base_pool.js new file mode 100644 index 0000000000..bcd6e7b1f3 --- /dev/null +++ b/lib/base_pool.js @@ -0,0 +1,233 @@ +'use strict'; + +const process = require('process'); +const SqlString = require('sqlstring'); +const EventEmitter = require('events').EventEmitter; +const PoolConnection = require('./pool_connection.js'); +const Queue = require('denque'); +const BaseConnection = require('./base_connection.js'); + +function spliceConnection(queue, connection) { + const len = queue.length; + for (let i = 0; i < len; i++) { + if (queue.get(i) === connection) { + queue.removeOne(i); + break; + } + } +} + +class BasePool extends EventEmitter { + constructor(options) { + super(); + this.config = options.config; + this.config.connectionConfig.pool = this; + this._allConnections = new Queue(); + this._freeConnections = new Queue(); + this._connectionQueue = new Queue(); + this._closed = false; + if (this.config.maxIdle < this.config.connectionLimit) { + // create idle connection timeout automatically release job + this._removeIdleTimeoutConnections(); + } + } + + getConnection(cb) { + if (this._closed) { + return process.nextTick(() => cb(new Error('Pool is closed.'))); + } + let connection; + if (this._freeConnections.length > 0) { + connection = this._freeConnections.pop(); + this.emit('acquire', connection); + return process.nextTick(() => cb(null, connection)); + } + if ( + this.config.connectionLimit === 0 || + this._allConnections.length < this.config.connectionLimit + ) { + connection = new PoolConnection(this, { + config: this.config.connectionConfig, + }); + this._allConnections.push(connection); + return connection.connect((err) => { + if (this._closed) { + return cb(new Error('Pool is closed.')); + } + if (err) { + return cb(err); + } + this.emit('connection', connection); + this.emit('acquire', connection); + return cb(null, connection); + }); + } + if (!this.config.waitForConnections) { + return process.nextTick(() => cb(new Error('No connections available.'))); + } + if ( + this.config.queueLimit && + this._connectionQueue.length >= this.config.queueLimit + ) { + return cb(new Error('Queue limit reached.')); + } + this.emit('enqueue'); + return this._connectionQueue.push(cb); + } + + releaseConnection(connection) { + let cb; + if (!connection._pool) { + // The connection has been removed from the pool and is no longer good. + if (this._connectionQueue.length) { + cb = this._connectionQueue.shift(); + process.nextTick(this.getConnection.bind(this, cb)); + } + } else if (this._connectionQueue.length) { + cb = this._connectionQueue.shift(); + process.nextTick(cb.bind(null, null, connection)); + } else { + this._freeConnections.push(connection); + this.emit('release', connection); + } + } + + end(cb) { + this._closed = true; + clearTimeout(this._removeIdleTimeoutConnectionsTimer); + if (typeof cb !== 'function') { + cb = function (err) { + if (err) { + throw err; + } + }; + } + let calledBack = false; + let closedConnections = 0; + let connection; + const endCB = function (err) { + if (calledBack) { + return; + } + if (err || ++closedConnections >= this._allConnections.length) { + calledBack = true; + cb(err); + return; + } + }.bind(this); + if (this._allConnections.length === 0) { + endCB(); + return; + } + for (let i = 0; i < this._allConnections.length; i++) { + connection = this._allConnections.get(i); + connection._realEnd(endCB); + } + } + + query(sql, values, cb) { + const cmdQuery = BaseConnection.createQuery( + sql, + values, + cb, + this.config.connectionConfig, + ); + if (typeof cmdQuery.namedPlaceholders === 'undefined') { + cmdQuery.namedPlaceholders = + this.config.connectionConfig.namedPlaceholders; + } + this.getConnection((err, conn) => { + if (err) { + if (typeof cmdQuery.onResult === 'function') { + cmdQuery.onResult(err); + } else { + cmdQuery.emit('error', err); + } + return; + } + try { + conn.query(cmdQuery).once('end', () => { + conn.release(); + }); + } catch (e) { + conn.release(); + throw e; + } + }); + return cmdQuery; + } + + execute(sql, values, cb) { + // TODO construct execute command first here and pass it to connection.execute + // so that polymorphic arguments logic is there in one place + if (typeof values === 'function') { + cb = values; + values = []; + } + this.getConnection((err, conn) => { + if (err) { + return cb(err); + } + try { + conn.execute(sql, values, cb).once('end', () => { + conn.release(); + }); + } catch (e) { + conn.release(); + return cb(e); + } + }); + } + + _removeConnection(connection) { + // Remove connection from all connections + spliceConnection(this._allConnections, connection); + // Remove connection from free connections + spliceConnection(this._freeConnections, connection); + this.releaseConnection(connection); + } + + _removeIdleTimeoutConnections() { + if (this._removeIdleTimeoutConnectionsTimer) { + clearTimeout(this._removeIdleTimeoutConnectionsTimer); + } + + this._removeIdleTimeoutConnectionsTimer = setTimeout(() => { + try { + while ( + this._freeConnections.length > this.config.maxIdle || + (this._freeConnections.length > 0 && + Date.now() - this._freeConnections.get(0).lastActiveTime > + this.config.idleTimeout) + ) { + this._freeConnections.get(0).destroy(); + } + } finally { + this._removeIdleTimeoutConnections(); + } + }, 1000); + } + + format(sql, values) { + return SqlString.format( + sql, + values, + this.config.connectionConfig.stringifyObjects, + this.config.connectionConfig.timezone, + ); + } + + escape(value) { + return SqlString.escape( + value, + this.config.connectionConfig.stringifyObjects, + this.config.connectionConfig.timezone, + ); + } + + escapeId(value) { + return SqlString.escapeId(value, false); + } +} + +module.exports = BasePool; diff --git a/lib/base_pool_connection.js b/lib/base_pool_connection.js new file mode 100644 index 0000000000..97b345efd7 --- /dev/null +++ b/lib/base_pool_connection.js @@ -0,0 +1,63 @@ +'use strict'; + +const BaseConnection = require('./base_connection.js'); + +class BasePoolConnection extends BaseConnection { + constructor(pool, options) { + super(options); + this._pool = pool; + // The last active time of this connection + this.lastActiveTime = Date.now(); + // When a fatal error occurs the connection's protocol ends, which will cause + // the connection to end as well, thus we only need to watch for the end event + // and we will be notified of disconnects. + // REVIEW: Moved to `once` + this.once('end', () => { + this._removeFromPool(); + }); + this.once('error', () => { + this._removeFromPool(); + }); + } + + release() { + if (!this._pool || this._pool._closed) { + return; + } + // update last active time + this.lastActiveTime = Date.now(); + this._pool.releaseConnection(this); + } + + end() { + const err = new Error( + 'Calling conn.end() to release a pooled connection is ' + + 'deprecated. In next version calling conn.end() will be ' + + 'restored to default conn.end() behavior. Use ' + + 'conn.release() instead.', + ); + this.emit('warn', err); + console.warn(err.message); + this.release(); + } + + destroy() { + this._removeFromPool(); + super.destroy(); + } + + _removeFromPool() { + if (!this._pool || this._pool._closed) { + return; + } + const pool = this._pool; + this._pool = null; + pool._removeConnection(this); + } +} + +BasePoolConnection.statementKey = BaseConnection.statementKey; +module.exports = BasePoolConnection; + +// TODO: Remove this when we are removing PoolConnection#end +BasePoolConnection.prototype._realEnd = BaseConnection.prototype.end; diff --git a/lib/connection.js b/lib/connection.js index 8448522c97..fb93a3274a 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,948 +1,12 @@ -// This file was modified by Oracle on June 1, 2021. -// The changes involve new logic to handle an additional ERR Packet sent by -// the MySQL server when the connection is closed unexpectedly. -// Modifications copyright (c) 2021, Oracle and/or its affiliates. - -// This file was modified by Oracle on June 17, 2021. -// The changes involve logic to ensure the socket connection is closed when -// there is a fatal error. -// Modifications copyright (c) 2021, Oracle and/or its affiliates. - -// This file was modified by Oracle on September 21, 2021. -// The changes involve passing additional authentication factor passwords -// to the ChangeUser Command instance. -// Modifications copyright (c) 2021, Oracle and/or its affiliates. - 'use strict'; -const Net = require('net'); -const Tls = require('tls'); -const Timers = require('timers'); -const EventEmitter = require('events').EventEmitter; -const Readable = require('stream').Readable; -const Queue = require('denque'); -const SqlString = require('sqlstring'); -const { createLRU } = require('lru.min'); - -const PacketParser = require('./packet_parser.js'); -const Packets = require('./packets/index.js'); -const Commands = require('./commands/index.js'); -const ConnectionConfig = require('./connection_config.js'); -const CharsetToEncoding = require('./constants/charset_encodings.js'); - -let _connectionId = 0; - -let convertNamedPlaceholders = null; - -class Connection extends EventEmitter { - constructor(opts) { - super(); - this.config = opts.config; - // TODO: fill defaults - // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX ) - // if host is given, connect to host:3306 - // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath - // if there is no host/port and no socketPath parameters? - if (!opts.config.stream) { - if (opts.config.socketPath) { - this.stream = Net.connect(opts.config.socketPath); - } else { - this.stream = Net.connect( - opts.config.port, - opts.config.host - ); - - // Optionally enable keep-alive on the socket. - if (this.config.enableKeepAlive) { - this.stream.on('connect', () => { - this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay); - }); - } - - // Enable TCP_NODELAY flag. This is needed so that the network packets - // are sent immediately to the server - this.stream.setNoDelay(true); - } - // if stream is a function, treat it as "stream agent / factory" - } else if (typeof opts.config.stream === 'function') { - this.stream = opts.config.stream(opts); - } else { - this.stream = opts.config.stream; - } - - this._internalId = _connectionId++; - this._commands = new Queue(); - this._command = null; - this._paused = false; - this._paused_packets = new Queue(); - this._statements = createLRU({ - max: this.config.maxPreparedStatements, - onEviction: function(_, statement) { - statement.close(); - } - }); - this.serverCapabilityFlags = 0; - this.authorized = false; - this.sequenceId = 0; - this.compressedSequenceId = 0; - this.threadId = null; - this._handshakePacket = null; - this._fatalError = null; - this._protocolError = null; - this._outOfOrderPackets = []; - this.clientEncoding = CharsetToEncoding[this.config.charsetNumber]; - this.stream.on('error', this._handleNetworkError.bind(this)); - // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind - this.packetParser = new PacketParser(p => { - this.handlePacket(p); - }); - this.stream.on('data', data => { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - this.packetParser.execute(data); - }); - this.stream.on('end', () => { - // emit the end event so that the pooled connection can close the connection - this.emit('end'); - }); - this.stream.on('close', () => { - // we need to set this flag everywhere where we want connection to close - if (this._closing) { - return; - } - if (!this._protocolError) { - // no particular error message before disconnect - this._protocolError = new Error( - 'Connection lost: The server closed the connection.' - ); - this._protocolError.fatal = true; - this._protocolError.code = 'PROTOCOL_CONNECTION_LOST'; - } - this._notifyError(this._protocolError); - }); - let handshakeCommand; - if (!this.config.isServer) { - handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags); - handshakeCommand.on('end', () => { - // this happens when handshake finishes early either because there was - // some fatal error or the server sent an error packet instead of - // an hello packet (for example, 'Too many connections' error) - if (!handshakeCommand.handshake || this._fatalError || this._protocolError) { - return; - } - this._handshakePacket = handshakeCommand.handshake; - this.threadId = handshakeCommand.handshake.connectionId; - this.emit('connect', handshakeCommand.handshake); - }); - handshakeCommand.on('error', err => { - this._closing = true; - this._notifyError(err); - }); - this.addCommand(handshakeCommand); - } - // in case there was no initial handshake but we need to read sting, assume it utf-8 - // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet) - // will be overwritten with actual encoding value as soon as server handshake packet is received - this.serverEncoding = 'utf8'; - if (this.config.connectTimeout) { - const timeoutHandler = this._handleTimeoutError.bind(this); - this.connectTimeout = Timers.setTimeout( - timeoutHandler, - this.config.connectTimeout - ); - } - } +const BaseConnection = require('./base_connection.js'); +class Connection extends BaseConnection { promise(promiseImpl) { const PromiseConnection = require('./promise_connection.js'); return new PromiseConnection(this, promiseImpl); } - - _addCommandClosedState(cmd) { - const err = new Error( - "Can't add new command when connection is in closed state" - ); - err.fatal = true; - if (cmd.onResult) { - cmd.onResult(err); - } else { - this.emit('error', err); - } - } - - _handleFatalError(err) { - err.fatal = true; - // stop receiving packets - this.stream.removeAllListeners('data'); - this.addCommand = this._addCommandClosedState; - this.write = () => { - this.emit('error', new Error("Can't write in closed state")); - }; - this._notifyError(err); - this._fatalError = err; - } - - _handleNetworkError(err) { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - // Do not throw an error when a connection ends with a RST,ACK packet - if (err.code === 'ECONNRESET' && this._closing) { - return; - } - this._handleFatalError(err); - } - - _handleTimeoutError() { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - this.stream.destroy && this.stream.destroy(); - const err = new Error('connect ETIMEDOUT'); - err.errorno = 'ETIMEDOUT'; - err.code = 'ETIMEDOUT'; - err.syscall = 'connect'; - this._handleNetworkError(err); - } - - // notify all commands in the queue and bubble error as connection "error" - // called on stream error or unexpected termination - _notifyError(err) { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET - if (this._fatalError) { - return; - } - let command; - // if there is no active command, notify connection - // if there are commands and all of them have callbacks, pass error via callback - let bubbleErrorToConnection = !this._command; - if (this._command && this._command.onResult) { - this._command.onResult(err); - this._command = null; - // connection handshake is special because we allow it to be implicit - // if error happened during handshake, but there are others commands in queue - // then bubble error to other commands and not to connection - } else if ( - !( - this._command && - this._command.constructor === Commands.ClientHandshake && - this._commands.length > 0 - ) - ) { - bubbleErrorToConnection = true; - } - while ((command = this._commands.shift())) { - if (command.onResult) { - command.onResult(err); - } else { - bubbleErrorToConnection = true; - } - } - // notify connection if some comands in the queue did not have callbacks - // or if this is pool connection ( so it can be removed from pool ) - if (bubbleErrorToConnection || this._pool) { - this.emit('error', err); - } - // close connection after emitting the event in case of a fatal error - if (err.fatal) { - this.close(); - } - } - - write(buffer) { - const result = this.stream.write(buffer, err => { - if (err) { - this._handleNetworkError(err); - } - }); - - if (!result) { - this.stream.emit('pause'); - } - } - - // http://dev.mysql.com/doc/internals/en/sequence-id.html - // - // The sequence-id is incremented with each packet and may wrap around. - // It starts at 0 and is reset to 0 when a new command - // begins in the Command Phase. - // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html - _resetSequenceId() { - this.sequenceId = 0; - this.compressedSequenceId = 0; - } - - _bumpCompressedSequenceId(numPackets) { - this.compressedSequenceId += numPackets; - this.compressedSequenceId %= 256; - } - - _bumpSequenceId(numPackets) { - this.sequenceId += numPackets; - this.sequenceId %= 256; - } - - writePacket(packet) { - const MAX_PACKET_LENGTH = 16777215; - const length = packet.length(); - let chunk, offset, header; - if (length < MAX_PACKET_LENGTH) { - packet.writeHeader(this.sequenceId); - if (this.config.debug) { - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})` - ); - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}` - ); - } - this._bumpSequenceId(1); - this.write(packet.buffer); - } else { - if (this.config.debug) { - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:` - ); - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})` - ); - } - for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) { - chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH); - if (chunk.length === MAX_PACKET_LENGTH) { - header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]); - } else { - header = Buffer.from([ - chunk.length & 0xff, - (chunk.length >> 8) & 0xff, - (chunk.length >> 16) & 0xff, - this.sequenceId - ]); - } - this._bumpSequenceId(1); - this.write(header); - this.write(chunk); - } - } - } - - // 0.11+ environment - startTLS(onSecure) { - if (this.config.debug) { - // eslint-disable-next-line no-console - console.log('Upgrading connection to TLS'); - } - const secureContext = Tls.createSecureContext({ - ca: this.config.ssl.ca, - cert: this.config.ssl.cert, - ciphers: this.config.ssl.ciphers, - key: this.config.ssl.key, - passphrase: this.config.ssl.passphrase, - minVersion: this.config.ssl.minVersion, - maxVersion: this.config.ssl.maxVersion - }); - const rejectUnauthorized = this.config.ssl.rejectUnauthorized; - const verifyIdentity = this.config.ssl.verifyIdentity; - const servername = this.config.host; - - let secureEstablished = false; - this.stream.removeAllListeners('data'); - const secureSocket = Tls.connect({ - rejectUnauthorized, - requestCert: rejectUnauthorized, - checkServerIdentity: verifyIdentity - ? Tls.checkServerIdentity - : function() { return undefined; }, - secureContext, - isServer: false, - socket: this.stream, - servername - }, () => { - secureEstablished = true; - if (rejectUnauthorized) { - if (typeof servername === 'string' && verifyIdentity) { - const cert = secureSocket.getPeerCertificate(true); - const serverIdentityCheckError = Tls.checkServerIdentity(servername, cert); - if (serverIdentityCheckError) { - onSecure(serverIdentityCheckError); - return; - } - } - } - onSecure(); - }); - // error handler for secure socket - secureSocket.on('error', err => { - if (secureEstablished) { - this._handleNetworkError(err); - } else { - onSecure(err); - } - }); - secureSocket.on('data', data => { - this.packetParser.execute(data); - }); - this.write = buffer => secureSocket.write(buffer); - } - - protocolError(message, code) { - // Starting with MySQL 8.0.24, if the client closes the connection - // unexpectedly, the server will send a last ERR Packet, which we can - // safely ignore. - // https://dev.mysql.com/worklog/task/?id=12999 - if (this._closing) { - return; - } - - const err = new Error(message); - err.fatal = true; - err.code = code || 'PROTOCOL_ERROR'; - this.emit('error', err); - } - - get fatalError() { - return this._fatalError; - } - - handlePacket(packet) { - if (this._paused) { - this._paused_packets.push(packet); - return; - } - if (this.config.debug) { - if (packet) { - // eslint-disable-next-line no-console - console.log( - ` raw: ${packet.buffer - .slice(packet.offset, packet.offset + packet.length()) - .toString('hex')}` - ); - // eslint-disable-next-line no-console - console.trace(); - const commandName = this._command - ? this._command._commandName - : '(no command)'; - const stateName = this._command - ? this._command.stateName() - : '(no command)'; - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})` - ); - } - } - if (!this._command) { - const marker = packet.peekByte(); - // If it's an Err Packet, we should use it. - if (marker === 0xff) { - const error = Packets.Error.fromPacket(packet); - this.protocolError(error.message, error.code); - } else { - // Otherwise, it means it's some other unexpected packet. - this.protocolError( - 'Unexpected packet while no commands in the queue', - 'PROTOCOL_UNEXPECTED_PACKET' - ); - } - this.close(); - return; - } - if (packet) { - // Note: when server closes connection due to inactivity, Err packet ER_CLIENT_INTERACTION_TIMEOUT from MySQL 8.0.24, sequenceId will be 0 - if (this.sequenceId !== packet.sequenceId) { - const err = new Error( - `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}` - ); - err.expected = this.sequenceId; - err.received = packet.sequenceId; - this.emit('warn', err); // REVIEW - // eslint-disable-next-line no-console - console.error(err.message); - } - this._bumpSequenceId(packet.numPackets); - } - try { - if (this._fatalError) { - // skip remaining packets after client is in the error state - return; - } - const done = this._command.execute(packet, this); - if (done) { - this._command = this._commands.shift(); - if (this._command) { - this.sequenceId = 0; - this.compressedSequenceId = 0; - this.handlePacket(); - } - } - } catch (err) { - this._handleFatalError(err); - this.stream.destroy(); - } - } - - addCommand(cmd) { - // this.compressedSequenceId = 0; - // this.sequenceId = 0; - if (this.config.debug) { - const commandName = cmd.constructor.name; - // eslint-disable-next-line no-console - console.log(`Add command: ${commandName}`); - cmd._commandName = commandName; - } - if (!this._command) { - this._command = cmd; - this.handlePacket(); - } else { - this._commands.push(cmd); - } - return cmd; - } - - format(sql, values) { - if (typeof this.config.queryFormat === 'function') { - return this.config.queryFormat.call( - this, - sql, - values, - this.config.timezone - ); - } - const opts = { - sql: sql, - values: values - }; - this._resolveNamedPlaceholders(opts); - return SqlString.format( - opts.sql, - opts.values, - this.config.stringifyObjects, - this.config.timezone - ); - } - - escape(value) { - return SqlString.escape(value, false, this.config.timezone); - } - - escapeId(value) { - return SqlString.escapeId(value, false); - } - - raw(sql) { - return SqlString.raw(sql); - } - - _resolveNamedPlaceholders(options) { - let unnamed; - if (this.config.namedPlaceholders || options.namedPlaceholders) { - if (Array.isArray(options.values)) { - // if an array is provided as the values, assume the conversion is not necessary. - // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled. - return - } - if (convertNamedPlaceholders === null) { - convertNamedPlaceholders = require('named-placeholders')(); - } - unnamed = convertNamedPlaceholders(options.sql, options.values); - options.sql = unnamed[0]; - options.values = unnamed[1]; - } - } - - query(sql, values, cb) { - let cmdQuery; - if (sql.constructor === Commands.Query) { - cmdQuery = sql; - } else { - cmdQuery = Connection.createQuery(sql, values, cb, this.config); - } - this._resolveNamedPlaceholders(cmdQuery); - const rawSql = this.format(cmdQuery.sql, cmdQuery.values !== undefined ? cmdQuery.values : []); - cmdQuery.sql = rawSql; - return this.addCommand(cmdQuery); - } - - pause() { - this._paused = true; - this.stream.pause(); - } - - resume() { - let packet; - this._paused = false; - while ((packet = this._paused_packets.shift())) { - this.handlePacket(packet); - // don't resume if packet handler paused connection - if (this._paused) { - return; - } - } - this.stream.resume(); - } - - // TODO: named placeholders support - prepare(options, cb) { - if (typeof options === 'string') { - options = { sql: options }; - } - return this.addCommand(new Commands.Prepare(options, cb)); - } - - unprepare(sql) { - let options = {}; - if (typeof sql === 'object') { - options = sql; - } else { - options.sql = sql; - } - const key = Connection.statementKey(options); - const stmt = this._statements.get(key); - if (stmt) { - this._statements.delete(key); - stmt.close(); - } - return stmt; - } - - execute(sql, values, cb) { - let options = { - infileStreamFactory: this.config.infileStreamFactory - }; - if (typeof sql === 'object') { - // execute(options, cb) - options = { - ...options, - ...sql, - sql: sql.sql, - values: sql.values - }; - if (typeof values === 'function') { - cb = values; - } else { - options.values = options.values || values; - } - } else if (typeof values === 'function') { - // execute(sql, cb) - cb = values; - options.sql = sql; - options.values = undefined; - } else { - // execute(sql, values, cb) - options.sql = sql; - options.values = values; - } - this._resolveNamedPlaceholders(options); - // check for values containing undefined - if (options.values) { - //If namedPlaceholder is not enabled and object is passed as bind parameters - if (!Array.isArray(options.values)) { - throw new TypeError( - 'Bind parameters must be array if namedPlaceholders parameter is not enabled' - ); - } - options.values.forEach(val => { - //If namedPlaceholder is not enabled and object is passed as bind parameters - if (!Array.isArray(options.values)) { - throw new TypeError( - 'Bind parameters must be array if namedPlaceholders parameter is not enabled' - ); - } - if (val === undefined) { - throw new TypeError( - 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null' - ); - } - if (typeof val === 'function') { - throw new TypeError( - 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first' - ); - } - }); - } - const executeCommand = new Commands.Execute(options, cb); - const prepareCommand = new Commands.Prepare(options, (err, stmt) => { - if (err) { - // skip execute command if prepare failed, we have main - // combined callback here - executeCommand.start = function() { - return null; - }; - if (cb) { - cb(err); - } else { - executeCommand.emit('error', err); - } - executeCommand.emit('end'); - return; - } - executeCommand.statement = stmt; - }); - this.addCommand(prepareCommand); - this.addCommand(executeCommand); - return executeCommand; - } - - changeUser(options, callback) { - if (!callback && typeof options === 'function') { - callback = options; - options = {}; - } - const charsetNumber = options.charset - ? ConnectionConfig.getCharsetNumber(options.charset) - : this.config.charsetNumber; - return this.addCommand( - new Commands.ChangeUser( - { - user: options.user || this.config.user, - // for the purpose of multi-factor authentication, or not, the main - // password (used for the 1st authentication factor) can also be - // provided via the "password1" option - password: options.password || options.password1 || this.config.password || this.config.password1, - password2: options.password2 || this.config.password2, - password3: options.password3 || this.config.password3, - passwordSha1: options.passwordSha1 || this.config.passwordSha1, - database: options.database || this.config.database, - timeout: options.timeout, - charsetNumber: charsetNumber, - currentConfig: this.config - }, - err => { - if (err) { - err.fatal = true; - } - if (callback) { - callback(err); - } - } - ) - ); - } - - // transaction helpers - beginTransaction(cb) { - return this.query('START TRANSACTION', cb); - } - - commit(cb) { - return this.query('COMMIT', cb); - } - - rollback(cb) { - return this.query('ROLLBACK', cb); - } - - ping(cb) { - return this.addCommand(new Commands.Ping(cb)); - } - - _registerSlave(opts, cb) { - return this.addCommand(new Commands.RegisterSlave(opts, cb)); - } - - _binlogDump(opts, cb) { - return this.addCommand(new Commands.BinlogDump(opts, cb)); - } - - // currently just alias to close - destroy() { - this.close(); - } - - close() { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - this._closing = true; - this.stream.end(); - this.addCommand = this._addCommandClosedState; - } - - createBinlogStream(opts) { - // TODO: create proper stream class - // TODO: use through2 - let test = 1; - const stream = new Readable({ objectMode: true }); - stream._read = function() { - return { - data: test++ - }; - }; - this._registerSlave(opts, () => { - const dumpCmd = this._binlogDump(opts); - dumpCmd.on('event', ev => { - stream.push(ev); - }); - dumpCmd.on('eof', () => { - stream.push(null); - // if non-blocking, then close stream to prevent errors - if (opts.flags && opts.flags & 0x01) { - this.close(); - } - }); - // TODO: pipe errors as well - }); - return stream; - } - - connect(cb) { - if (!cb) { - return; - } - if (this._fatalError || this._protocolError) { - return cb(this._fatalError || this._protocolError); - } - if (this._handshakePacket) { - return cb(null, this); - } - let connectCalled = 0; - function callbackOnce(isErrorHandler) { - return function(param) { - if (!connectCalled) { - if (isErrorHandler) { - cb(param); - } else { - cb(null, param); - } - } - connectCalled = 1; - }; - } - this.once('error', callbackOnce(true)); - this.once('connect', callbackOnce(false)); - } - - // =================================== - // outgoing server connection methods - // =================================== - writeColumns(columns) { - this.writePacket(Packets.ResultSetHeader.toPacket(columns.length)); - columns.forEach(column => { - this.writePacket( - Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding) - ); - }); - this.writeEof(); - } - - // row is array of columns, not hash - writeTextRow(column) { - this.writePacket( - Packets.TextRow.toPacket(column, this.serverConfig.encoding) - ); - } - - writeBinaryRow(column) { - this.writePacket( - Packets.BinaryRow.toPacket(column, this.serverConfig.encoding) - ); - } - - writeTextResult(rows, columns, binary=false) { - this.writeColumns(columns); - rows.forEach(row => { - const arrayRow = new Array(columns.length); - columns.forEach(column => { - arrayRow.push(row[column.name]); - }); - if(binary) { - this.writeBinaryRow(arrayRow); - } - else this.writeTextRow(arrayRow); - }); - this.writeEof(); - } - - writeEof(warnings, statusFlags) { - this.writePacket(Packets.EOF.toPacket(warnings, statusFlags)); - } - - writeOk(args) { - if (!args) { - args = { affectedRows: 0 }; - } - this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding)); - } - - writeError(args) { - // if we want to send error before initial hello was sent, use default encoding - const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8'; - this.writePacket(Packets.Error.toPacket(args, encoding)); - } - - serverHandshake(args) { - this.serverConfig = args; - this.serverConfig.encoding = - CharsetToEncoding[this.serverConfig.characterSet]; - return this.addCommand(new Commands.ServerHandshake(args)); - } - - // =============================================================== - end(callback) { - if (this.config.isServer) { - this._closing = true; - const quitCmd = new EventEmitter(); - setImmediate(() => { - this.stream.end(); - quitCmd.emit('end'); - }); - return quitCmd; - } - // trigger error if more commands enqueued after end command - const quitCmd = this.addCommand(new Commands.Quit(callback)); - this.addCommand = this._addCommandClosedState; - return quitCmd; - } - - static createQuery(sql, values, cb, config) { - let options = { - rowsAsArray: config.rowsAsArray, - infileStreamFactory: config.infileStreamFactory - }; - if (typeof sql === 'object') { - // query(options, cb) - options = { - ...options, - ...sql, - sql: sql.sql, - values: sql.values - }; - if (typeof values === 'function') { - cb = values; - } else if (values !== undefined) { - options.values = values; - } - } else if (typeof values === 'function') { - // query(sql, cb) - cb = values; - options.sql = sql; - options.values = undefined; - } else { - // query(sql, values, cb) - options.sql = sql; - options.values = values; - } - return new Commands.Query(options, cb); - } - - static statementKey(options) { - return ( - `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}` - ); - } } module.exports = Connection; diff --git a/lib/create_pool.js b/lib/create_pool.js index 7f782aff9a..ad19d77b7f 100644 --- a/lib/create_pool.js +++ b/lib/create_pool.js @@ -7,4 +7,4 @@ function createPool(config) { return new Pool({ config: new PoolConfig(config) }); } -module.exports = createPool +module.exports = createPool; diff --git a/lib/inherit_events.js b/lib/inherit_events.js index d90b7eb629..8e59d84fe5 100644 --- a/lib/inherit_events.js +++ b/lib/inherit_events.js @@ -3,7 +3,7 @@ function inheritEvents(source, target, events) { const listeners = {}; target - .on('newListener', eventName => { + .on('newListener', (eventName) => { if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { source.on( eventName, @@ -12,11 +12,11 @@ function inheritEvents(source, target, events) { args.unshift(eventName); target.emit.apply(target, args); - }) + }), ); } }) - .on('removeListener', eventName => { + .on('removeListener', (eventName) => { if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { source.removeListener(eventName, listeners[eventName]); delete listeners[eventName]; diff --git a/lib/pool.js b/lib/pool.js index 534bb62aa2..05a619e81b 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,237 +1,12 @@ 'use strict'; -const process = require('process'); -const SqlString = require('sqlstring'); -const EventEmitter = require('events').EventEmitter; -const PoolConnection = require('./pool_connection.js'); -const Queue = require('denque'); -const Connection = require('./connection.js'); - -function spliceConnection(queue, connection) { - const len = queue.length; - for (let i = 0; i < len; i++) { - if (queue.get(i) === connection) { - queue.removeOne(i); - break; - } - } -} - -class Pool extends EventEmitter { - constructor(options) { - super(); - this.config = options.config; - this.config.connectionConfig.pool = this; - this._allConnections = new Queue(); - this._freeConnections = new Queue(); - this._connectionQueue = new Queue(); - this._closed = false; - if (this.config.maxIdle < this.config.connectionLimit) { - // create idle connection timeout automatically release job - this._removeIdleTimeoutConnections(); - } - } +const BasePool = require('./base_pool.js'); +class Pool extends BasePool { promise(promiseImpl) { const PromisePool = require('./promise_pool.js'); return new PromisePool(this, promiseImpl); } - - getConnection(cb) { - if (this._closed) { - return process.nextTick(() => cb(new Error('Pool is closed.'))); - } - let connection; - if (this._freeConnections.length > 0) { - connection = this._freeConnections.pop(); - this.emit('acquire', connection); - return process.nextTick(() => cb(null, connection)); - } - if ( - this.config.connectionLimit === 0 || - this._allConnections.length < this.config.connectionLimit - ) { - connection = new PoolConnection(this, { - config: this.config.connectionConfig - }); - this._allConnections.push(connection); - return connection.connect(err => { - if (this._closed) { - return cb(new Error('Pool is closed.')); - } - if (err) { - return cb(err); - } - this.emit('connection', connection); - this.emit('acquire', connection); - return cb(null, connection); - }); - } - if (!this.config.waitForConnections) { - return process.nextTick(() => cb(new Error('No connections available.'))); - } - if ( - this.config.queueLimit && - this._connectionQueue.length >= this.config.queueLimit - ) { - return cb(new Error('Queue limit reached.')); - } - this.emit('enqueue'); - return this._connectionQueue.push(cb); - } - - releaseConnection(connection) { - let cb; - if (!connection._pool) { - // The connection has been removed from the pool and is no longer good. - if (this._connectionQueue.length) { - cb = this._connectionQueue.shift(); - process.nextTick(this.getConnection.bind(this, cb)); - } - } else if (this._connectionQueue.length) { - cb = this._connectionQueue.shift(); - process.nextTick(cb.bind(null, null, connection)); - } else { - this._freeConnections.push(connection); - this.emit('release', connection); - } - } - - end(cb) { - this._closed = true; - clearTimeout(this._removeIdleTimeoutConnectionsTimer); - if (typeof cb !== 'function') { - cb = function(err) { - if (err) { - throw err; - } - }; - } - let calledBack = false; - let closedConnections = 0; - let connection; - const endCB = function(err) { - if (calledBack) { - return; - } - if (err || ++closedConnections >= this._allConnections.length) { - calledBack = true; - cb(err); - return; - } - }.bind(this); - if (this._allConnections.length === 0) { - endCB(); - return; - } - for (let i = 0; i < this._allConnections.length; i++) { - connection = this._allConnections.get(i); - connection._realEnd(endCB); - } - } - - query(sql, values, cb) { - const cmdQuery = Connection.createQuery( - sql, - values, - cb, - this.config.connectionConfig - ); - if (typeof cmdQuery.namedPlaceholders === 'undefined') { - cmdQuery.namedPlaceholders = this.config.connectionConfig.namedPlaceholders; - } - this.getConnection((err, conn) => { - if (err) { - if (typeof cmdQuery.onResult === 'function') { - cmdQuery.onResult(err); - } else { - cmdQuery.emit('error', err); - } - return; - } - try { - conn.query(cmdQuery).once('end', () => { - conn.release(); - }); - } catch (e) { - conn.release(); - throw e; - } - }); - return cmdQuery; - } - - execute(sql, values, cb) { - // TODO construct execute command first here and pass it to connection.execute - // so that polymorphic arguments logic is there in one place - if (typeof values === 'function') { - cb = values; - values = []; - } - this.getConnection((err, conn) => { - if (err) { - return cb(err); - } - try { - conn.execute(sql, values, cb).once('end', () => { - conn.release(); - }); - } catch (e) { - conn.release(); - return cb(e); - } - }); - } - - _removeConnection(connection) { - // Remove connection from all connections - spliceConnection(this._allConnections, connection); - // Remove connection from free connections - spliceConnection(this._freeConnections, connection); - this.releaseConnection(connection); - } - - _removeIdleTimeoutConnections() { - if (this._removeIdleTimeoutConnectionsTimer) { - clearTimeout(this._removeIdleTimeoutConnectionsTimer); - } - - this._removeIdleTimeoutConnectionsTimer = setTimeout(() => { - try { - while ( - this._freeConnections.length > this.config.maxIdle || - (this._freeConnections.length > 0 && - Date.now() - this._freeConnections.get(0).lastActiveTime > - this.config.idleTimeout) - ) { - this._freeConnections.get(0).destroy(); - } - } finally { - this._removeIdleTimeoutConnections(); - } - }, 1000); - } - - format(sql, values) { - return SqlString.format( - sql, - values, - this.config.connectionConfig.stringifyObjects, - this.config.connectionConfig.timezone - ); - } - - escape(value) { - return SqlString.escape( - value, - this.config.connectionConfig.stringifyObjects, - this.config.connectionConfig.timezone - ); - } - - escapeId(value) { - return SqlString.escapeId(value, false); - } } module.exports = Pool; diff --git a/lib/pool_connection.js b/lib/pool_connection.js index f2e64f0755..d7639da9a6 100644 --- a/lib/pool_connection.js +++ b/lib/pool_connection.js @@ -1,69 +1,12 @@ 'use strict'; -const Connection = require('./connection.js'); - -class PoolConnection extends Connection { - constructor(pool, options) { - super(options); - this._pool = pool; - // The last active time of this connection - this.lastActiveTime = Date.now(); - // When a fatal error occurs the connection's protocol ends, which will cause - // the connection to end as well, thus we only need to watch for the end event - // and we will be notified of disconnects. - // REVIEW: Moved to `once` - this.once('end', () => { - this._removeFromPool(); - }); - this.once('error', () => { - this._removeFromPool(); - }); - } - - release() { - if (!this._pool || this._pool._closed) { - return; - } - // update last active time - this.lastActiveTime = Date.now(); - this._pool.releaseConnection(this); - } +const BasePoolConnection = require('./base_pool_connection.js'); +class PoolConnection extends BasePoolConnection { promise(promiseImpl) { const PromisePoolConnection = require('./promise_pool_connection.js'); return new PromisePoolConnection(this, promiseImpl); } - - end() { - const err = new Error( - 'Calling conn.end() to release a pooled connection is ' + - 'deprecated. In next version calling conn.end() will be ' + - 'restored to default conn.end() behavior. Use ' + - 'conn.release() instead.' - ); - this.emit('warn', err); - // eslint-disable-next-line no-console - console.warn(err.message); - this.release(); - } - - destroy() { - this._removeFromPool(); - super.destroy(); - } - - _removeFromPool() { - if (!this._pool || this._pool._closed) { - return; - } - const pool = this._pool; - this._pool = null; - pool._removeConnection(this); - } } -PoolConnection.statementKey = Connection.statementKey; module.exports = PoolConnection; - -// TODO: Remove this when we are removing PoolConnection#end -PoolConnection.prototype._realEnd = Connection.prototype.end; diff --git a/lib/promise_connection.js b/lib/promise_connection.js index 45f4a1af1e..f1dccc08bc 100644 --- a/lib/promise_connection.js +++ b/lib/promise_connection.js @@ -1,10 +1,10 @@ 'use strict'; +const EventEmitter = require('events').EventEmitter; const PromisePreparedStatementInfo = require('./promise_prepared_statement_info.js'); const makeDoneCb = require('./make_done_cb.js'); const inheritEvents = require('./inherit_events.js'); -const { Connection } = require('../index.js'); -const EventEmitter = require('events').EventEmitter; +const BaseConnection = require('./base_connection.js'); class PromiseConnection extends EventEmitter { constructor(connection, promiseImpl) { @@ -16,7 +16,7 @@ class PromiseConnection extends EventEmitter { 'drain', 'connect', 'end', - 'enqueue' + 'enqueue', ]); } @@ -29,7 +29,7 @@ class PromiseConnection extends EventEmitter { const localErr = new Error(); if (typeof params === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -47,7 +47,7 @@ class PromiseConnection extends EventEmitter { const localErr = new Error(); if (typeof params === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -61,7 +61,7 @@ class PromiseConnection extends EventEmitter { } end() { - return new this.Promise(resolve => { + return new this.Promise((resolve) => { this.connection.end(resolve); }); } @@ -97,7 +97,7 @@ class PromiseConnection extends EventEmitter { const c = this.connection; const localErr = new Error(); return new this.Promise((resolve, reject) => { - c.ping(err => { + c.ping((err) => { if (err) { localErr.message = err.message; localErr.code = err.code; @@ -147,7 +147,7 @@ class PromiseConnection extends EventEmitter { } else { const wrappedStatement = new PromisePreparedStatementInfo( statement, - promiseImpl + promiseImpl, ); resolve(wrappedStatement); } @@ -159,7 +159,7 @@ class PromiseConnection extends EventEmitter { const c = this.connection; const localErr = new Error(); return new this.Promise((resolve, reject) => { - c.changeUser(options, err => { + c.changeUser(options, (err) => { if (err) { localErr.message = err.message; localErr.code = err.code; @@ -192,14 +192,14 @@ class PromiseConnection extends EventEmitter { const func = functionsToWrap[i]; if ( - typeof Connection.prototype[func] === 'function' && + typeof BaseConnection.prototype[func] === 'function' && PromiseConnection.prototype[func] === undefined ) { PromiseConnection.prototype[func] = (function factory(funcName) { return function () { - return Connection.prototype[funcName].apply( + return BaseConnection.prototype[funcName].apply( this.connection, - arguments + arguments, ); }; })(func); @@ -216,6 +216,7 @@ class PromiseConnection extends EventEmitter { 'pause', 'pipe', 'resume', - 'unprepare' + 'unprepare', ]); -module.exports = PromiseConnection + +module.exports = PromiseConnection; diff --git a/lib/promise_pool.js b/lib/promise_pool.js index 24b3d32107..e75df4b97e 100644 --- a/lib/promise_pool.js +++ b/lib/promise_pool.js @@ -4,7 +4,7 @@ const EventEmitter = require('events').EventEmitter; const makeDoneCb = require('./make_done_cb.js'); const PromisePoolConnection = require('./promise_pool_connection.js'); const inheritEvents = require('./inherit_events.js'); -const Pool = require('./pool'); +const BasePool = require('./base_pool.js'); class PromisePool extends EventEmitter { constructor(pool, thePromise) { @@ -36,7 +36,7 @@ class PromisePool extends EventEmitter { const localErr = new Error(); if (typeof args === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -54,7 +54,7 @@ class PromisePool extends EventEmitter { const localErr = new Error(); if (typeof args === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -71,7 +71,7 @@ class PromisePool extends EventEmitter { const corePool = this.pool; const localErr = new Error(); return new this.Promise((resolve, reject) => { - corePool.end(err => { + corePool.end((err) => { if (err) { localErr.message = err.message; localErr.code = err.code; @@ -92,12 +92,12 @@ class PromisePool extends EventEmitter { const func = functionsToWrap[i]; if ( - typeof Pool.prototype[func] === 'function' && + typeof BasePool.prototype[func] === 'function' && PromisePool.prototype[func] === undefined ) { PromisePool.prototype[func] = (function factory(funcName) { return function () { - return Pool.prototype[funcName].apply(this.pool, arguments); + return BasePool.prototype[funcName].apply(this.pool, arguments); }; })(func); } @@ -106,7 +106,7 @@ class PromisePool extends EventEmitter { // synchronous functions 'escape', 'escapeId', - 'format' + 'format', ]); module.exports = PromisePool; diff --git a/lib/promise_pool_connection.js b/lib/promise_pool_connection.js index 3ea925f635..4fafc498dc 100644 --- a/lib/promise_pool_connection.js +++ b/lib/promise_pool_connection.js @@ -1,7 +1,7 @@ 'use strict'; const PromiseConnection = require('./promise_connection.js'); -const PoolConnection = require('./pool_connection.js'); +const BasePoolConnection = require('./base_pool_connection.js'); class PromisePoolConnection extends PromiseConnection { constructor(connection, promiseImpl) { @@ -9,9 +9,9 @@ class PromisePoolConnection extends PromiseConnection { } destroy() { - return PoolConnection.prototype.destroy.apply( + return BasePoolConnection.prototype.destroy.apply( this.connection, - arguments + arguments, ); } } diff --git a/lib/promise_prepared_statement_info.js b/lib/promise_prepared_statement_info.js index 410f999d3f..47b9bedf1e 100644 --- a/lib/promise_prepared_statement_info.js +++ b/lib/promise_prepared_statement_info.js @@ -22,11 +22,11 @@ class PromisePreparedStatementInfo { } close() { - return new this.Promise(resolve => { + return new this.Promise((resolve) => { this.statement.close(); resolve(); }); } } -module.exports = PromisePreparedStatementInfo +module.exports = PromisePreparedStatementInfo; diff --git a/promise.js b/promise.js index 9a29f8d20f..0e51f31cd3 100644 --- a/promise.js +++ b/promise.js @@ -20,15 +20,15 @@ function createConnectionPromise(opts) { if (!thePromise) { throw new Error( 'no Promise implementation available.' + - 'Use promise-enabled node version or pass userland Promise' + - " implementation as parameter, for example: { Promise: require('bluebird') }" + 'Use promise-enabled node version or pass userland Promise' + + " implementation as parameter, for example: { Promise: require('bluebird') }", ); } return new thePromise((resolve, reject) => { coreConnection.once('connect', () => { resolve(new PromiseConnection(coreConnection, thePromise)); }); - coreConnection.once('error', err => { + coreConnection.once('error', (err) => { createConnectionErr.message = err.message; createConnectionErr.code = err.code; createConnectionErr.errno = err.errno; @@ -41,24 +41,20 @@ function createConnectionPromise(opts) { // note: the callback of "changeUser" is not called on success // hence there is no possibility to call "resolve" - - function createPromisePool(opts) { const corePool = createPool(opts); const thePromise = opts.Promise || Promise; if (!thePromise) { throw new Error( 'no Promise implementation available.' + - 'Use promise-enabled node version or pass userland Promise' + - " implementation as parameter, for example: { Promise: require('bluebird') }" + 'Use promise-enabled node version or pass userland Promise' + + " implementation as parameter, for example: { Promise: require('bluebird') }", ); } return new PromisePool(corePool, thePromise); } - - class PromisePoolCluster extends EventEmitter { constructor(poolCluster, thePromise) { super(); @@ -70,13 +66,17 @@ class PromisePoolCluster extends EventEmitter { getConnection(pattern, selector) { const corePoolCluster = this.poolCluster; return new this.Promise((resolve, reject) => { - corePoolCluster.getConnection(pattern, selector, (err, coreConnection) => { - if (err) { - reject(err); - } else { - resolve(new PromisePoolConnection(coreConnection, this.Promise)); - } - }); + corePoolCluster.getConnection( + pattern, + selector, + (err, coreConnection) => { + if (err) { + reject(err); + } else { + resolve(new PromisePoolConnection(coreConnection, this.Promise)); + } + }, + ); }); } @@ -85,7 +85,7 @@ class PromisePoolCluster extends EventEmitter { const localErr = new Error(); if (typeof args === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -99,7 +99,7 @@ class PromisePoolCluster extends EventEmitter { const localErr = new Error(); if (typeof args === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -111,7 +111,7 @@ class PromisePoolCluster extends EventEmitter { of(pattern, selector) { return new PromisePoolCluster( this.poolCluster.of(pattern, selector), - this.Promise + this.Promise, ); } @@ -119,7 +119,7 @@ class PromisePoolCluster extends EventEmitter { const corePoolCluster = this.poolCluster; const localErr = new Error(); return new this.Promise((resolve, reject) => { - corePoolCluster.end(err => { + corePoolCluster.end((err) => { if (err) { localErr.message = err.message; localErr.code = err.code; @@ -148,14 +148,15 @@ class PromisePoolCluster extends EventEmitter { ) { PromisePoolCluster.prototype[func] = (function factory(funcName) { return function () { - return PoolCluster.prototype[funcName].apply(this.poolCluster, arguments); + return PoolCluster.prototype[funcName].apply( + this.poolCluster, + arguments, + ); }; })(func); } } -})([ - 'add' -]); +})(['add']); function createPromisePoolCluster(opts) { const corePoolCluster = createPoolCluster(opts); @@ -163,8 +164,8 @@ function createPromisePoolCluster(opts) { if (!thePromise) { throw new Error( 'no Promise implementation available.' + - 'Use promise-enabled node version or pass userland Promise' + - " implementation as parameter, for example: { Promise: require('bluebird') }" + 'Use promise-enabled node version or pass userland Promise' + + " implementation as parameter, for example: { Promise: require('bluebird') }", ); } return new PromisePoolCluster(corePoolCluster, thePromise); @@ -184,17 +185,17 @@ exports.PromisePoolConnection = PromisePoolConnection; exports.__defineGetter__('Types', () => require('./lib/constants/types.js')); exports.__defineGetter__('Charsets', () => - require('./lib/constants/charsets.js') + require('./lib/constants/charsets.js'), ); exports.__defineGetter__('CharsetToEncoding', () => - require('./lib/constants/charset_encodings.js') + require('./lib/constants/charset_encodings.js'), ); -exports.setMaxParserCache = function(max) { +exports.setMaxParserCache = function (max) { parserCache.setMaxCache(max); }; -exports.clearParserCache = function() { +exports.clearParserCache = function () { parserCache.clearCache(); }; From 623e8c91fa61d37244045b917730b54d94cbe0b2 Mon Sep 17 00:00:00 2001 From: wellwelwel <46850407+wellwelwel@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:11:56 -0300 Subject: [PATCH 9/9] chore: better organize new files --- lib/{base_connection.js => base/connection.js} | 10 +++++----- lib/{base_pool.js => base/pool.js} | 4 ++-- .../pool_connection.js} | 2 +- lib/connection.js | 4 ++-- lib/pool.js | 4 ++-- lib/pool_connection.js | 4 ++-- lib/{promise_connection.js => promise/connection.js} | 4 ++-- lib/{ => promise}/inherit_events.js | 0 lib/{ => promise}/make_done_cb.js | 0 lib/{promise_pool.js => promise/pool.js} | 4 ++-- .../pool_connection.js} | 4 ++-- .../prepared_statement_info.js} | 0 promise.js | 10 +++++----- 13 files changed, 25 insertions(+), 25 deletions(-) rename lib/{base_connection.js => base/connection.js} (99%) rename lib/{base_pool.js => base/pool.js} (98%) rename lib/{base_pool_connection.js => base/pool_connection.js} (96%) rename lib/{promise_connection.js => promise/connection.js} (97%) rename lib/{ => promise}/inherit_events.js (100%) rename lib/{ => promise}/make_done_cb.js (100%) rename lib/{promise_pool.js => promise/pool.js} (96%) rename lib/{promise_pool_connection.js => promise/pool_connection.js} (71%) rename lib/{promise_prepared_statement_info.js => promise/prepared_statement_info.js} (100%) diff --git a/lib/base_connection.js b/lib/base/connection.js similarity index 99% rename from lib/base_connection.js rename to lib/base/connection.js index bb50135813..d00ce8c1f6 100644 --- a/lib/base_connection.js +++ b/lib/base/connection.js @@ -23,11 +23,11 @@ const Readable = require('stream').Readable; const Queue = require('denque'); const SqlString = require('sqlstring'); const { createLRU } = require('lru.min'); -const PacketParser = require('./packet_parser.js'); -const Packets = require('./packets/index.js'); -const Commands = require('./commands/index.js'); -const ConnectionConfig = require('./connection_config.js'); -const CharsetToEncoding = require('./constants/charset_encodings.js'); +const PacketParser = require('../packet_parser.js'); +const Packets = require('../packets/index.js'); +const Commands = require('../commands/index.js'); +const ConnectionConfig = require('../connection_config.js'); +const CharsetToEncoding = require('../constants/charset_encodings.js'); let _connectionId = 0; diff --git a/lib/base_pool.js b/lib/base/pool.js similarity index 98% rename from lib/base_pool.js rename to lib/base/pool.js index bcd6e7b1f3..47981f1943 100644 --- a/lib/base_pool.js +++ b/lib/base/pool.js @@ -3,9 +3,9 @@ const process = require('process'); const SqlString = require('sqlstring'); const EventEmitter = require('events').EventEmitter; -const PoolConnection = require('./pool_connection.js'); +const PoolConnection = require('../pool_connection.js'); const Queue = require('denque'); -const BaseConnection = require('./base_connection.js'); +const BaseConnection = require('./connection.js'); function spliceConnection(queue, connection) { const len = queue.length; diff --git a/lib/base_pool_connection.js b/lib/base/pool_connection.js similarity index 96% rename from lib/base_pool_connection.js rename to lib/base/pool_connection.js index 97b345efd7..74511ce587 100644 --- a/lib/base_pool_connection.js +++ b/lib/base/pool_connection.js @@ -1,6 +1,6 @@ 'use strict'; -const BaseConnection = require('./base_connection.js'); +const BaseConnection = require('./connection.js'); class BasePoolConnection extends BaseConnection { constructor(pool, options) { diff --git a/lib/connection.js b/lib/connection.js index fb93a3274a..b21cff85c4 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,10 +1,10 @@ 'use strict'; -const BaseConnection = require('./base_connection.js'); +const BaseConnection = require('./base/connection.js'); class Connection extends BaseConnection { promise(promiseImpl) { - const PromiseConnection = require('./promise_connection.js'); + const PromiseConnection = require('./promise/connection.js'); return new PromiseConnection(this, promiseImpl); } } diff --git a/lib/pool.js b/lib/pool.js index 05a619e81b..b0f1e2bdc5 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,10 +1,10 @@ 'use strict'; -const BasePool = require('./base_pool.js'); +const BasePool = require('./base/pool.js'); class Pool extends BasePool { promise(promiseImpl) { - const PromisePool = require('./promise_pool.js'); + const PromisePool = require('./promise/pool.js'); return new PromisePool(this, promiseImpl); } } diff --git a/lib/pool_connection.js b/lib/pool_connection.js index d7639da9a6..5cc94dc187 100644 --- a/lib/pool_connection.js +++ b/lib/pool_connection.js @@ -1,10 +1,10 @@ 'use strict'; -const BasePoolConnection = require('./base_pool_connection.js'); +const BasePoolConnection = require('./base/pool_connection.js'); class PoolConnection extends BasePoolConnection { promise(promiseImpl) { - const PromisePoolConnection = require('./promise_pool_connection.js'); + const PromisePoolConnection = require('./promise/pool_connection.js'); return new PromisePoolConnection(this, promiseImpl); } } diff --git a/lib/promise_connection.js b/lib/promise/connection.js similarity index 97% rename from lib/promise_connection.js rename to lib/promise/connection.js index f1dccc08bc..7cb0628e6e 100644 --- a/lib/promise_connection.js +++ b/lib/promise/connection.js @@ -1,10 +1,10 @@ 'use strict'; const EventEmitter = require('events').EventEmitter; -const PromisePreparedStatementInfo = require('./promise_prepared_statement_info.js'); +const PromisePreparedStatementInfo = require('./prepared_statement_info.js'); const makeDoneCb = require('./make_done_cb.js'); const inheritEvents = require('./inherit_events.js'); -const BaseConnection = require('./base_connection.js'); +const BaseConnection = require('../base/connection.js'); class PromiseConnection extends EventEmitter { constructor(connection, promiseImpl) { diff --git a/lib/inherit_events.js b/lib/promise/inherit_events.js similarity index 100% rename from lib/inherit_events.js rename to lib/promise/inherit_events.js diff --git a/lib/make_done_cb.js b/lib/promise/make_done_cb.js similarity index 100% rename from lib/make_done_cb.js rename to lib/promise/make_done_cb.js diff --git a/lib/promise_pool.js b/lib/promise/pool.js similarity index 96% rename from lib/promise_pool.js rename to lib/promise/pool.js index e75df4b97e..672caf03a9 100644 --- a/lib/promise_pool.js +++ b/lib/promise/pool.js @@ -2,9 +2,9 @@ const EventEmitter = require('events').EventEmitter; const makeDoneCb = require('./make_done_cb.js'); -const PromisePoolConnection = require('./promise_pool_connection.js'); +const PromisePoolConnection = require('./pool_connection.js'); const inheritEvents = require('./inherit_events.js'); -const BasePool = require('./base_pool.js'); +const BasePool = require('../base/pool.js'); class PromisePool extends EventEmitter { constructor(pool, thePromise) { diff --git a/lib/promise_pool_connection.js b/lib/promise/pool_connection.js similarity index 71% rename from lib/promise_pool_connection.js rename to lib/promise/pool_connection.js index 4fafc498dc..1da0113c06 100644 --- a/lib/promise_pool_connection.js +++ b/lib/promise/pool_connection.js @@ -1,7 +1,7 @@ 'use strict'; -const PromiseConnection = require('./promise_connection.js'); -const BasePoolConnection = require('./base_pool_connection.js'); +const PromiseConnection = require('./connection.js'); +const BasePoolConnection = require('../base/pool_connection.js'); class PromisePoolConnection extends PromiseConnection { constructor(connection, promiseImpl) { diff --git a/lib/promise_prepared_statement_info.js b/lib/promise/prepared_statement_info.js similarity index 100% rename from lib/promise_prepared_statement_info.js rename to lib/promise/prepared_statement_info.js diff --git a/promise.js b/promise.js index 0e51f31cd3..a0216f5660 100644 --- a/promise.js +++ b/promise.js @@ -7,11 +7,11 @@ const PoolCluster = require('./lib/pool_cluster.js'); const createConnection = require('./lib/create_connection.js'); const createPool = require('./lib/create_pool.js'); const createPoolCluster = require('./lib/create_pool_cluster.js'); -const PromiseConnection = require('./lib/promise_connection.js'); -const PromisePool = require('./lib/promise_pool.js'); -const makeDoneCb = require('./lib/make_done_cb.js'); -const PromisePoolConnection = require('./lib/promise_pool_connection.js'); -const inheritEvents = require('./lib/inherit_events.js'); +const PromiseConnection = require('./lib/promise/connection.js'); +const PromisePool = require('./lib/promise/pool.js'); +const makeDoneCb = require('./lib/promise/make_done_cb.js'); +const PromisePoolConnection = require('./lib/promise/pool_connection.js'); +const inheritEvents = require('./lib/promise/inherit_events.js'); function createConnectionPromise(opts) { const coreConnection = createConnection(opts);