diff --git a/index.js b/index.js index f33455e3f4..ceae6df7dc 100644 --- a/index.js +++ b/index.js @@ -2,30 +2,24 @@ const SqlString = require('sqlstring'); -const Connection = require('./lib/connection.js'); const ConnectionConfig = require('./lib/connection_config.js'); -const parserCache = require('./lib/parsers/parser_cache'); +const parserCache = require('./lib/parsers/parser_cache.js'); -exports.createConnection = function(opts) { - return new Connection({ config: new ConnectionConfig(opts) }); -}; +const Connection = require('./lib/connection.js'); +exports.createConnection = require('./lib/create_connection.js'); exports.connect = exports.createConnection; exports.Connection = Connection; exports.ConnectionConfig = ConnectionConfig; const Pool = require('./lib/pool.js'); const PoolCluster = require('./lib/pool_cluster.js'); +const createPool = require('./lib/create_pool.js'); +const createPoolCluster = require('./lib/create_pool_cluster.js'); -exports.createPool = function(config) { - const PoolConfig = require('./lib/pool_config.js'); - return new Pool({ config: new PoolConfig(config) }); -}; +exports.createPool = createPool; -exports.createPoolCluster = function(config) { - const PoolCluster = require('./lib/pool_cluster.js'); - return new PoolCluster(config); -}; +exports.createPoolCluster = createPoolCluster; exports.createQuery = Connection.createQuery; @@ -33,7 +27,7 @@ exports.Pool = Pool; exports.PoolCluster = PoolCluster; -exports.createServer = function(handler) { +exports.createServer = function (handler) { const Server = require('./lib/server.js'); const s = new Server(); if (handler) { @@ -42,7 +36,7 @@ exports.createServer = function(handler) { return s; }; -exports.PoolConnection = require('./lib/pool_connection'); +exports.PoolConnection = require('./lib/pool_connection.js'); exports.authPlugins = require('./lib/auth_plugins'); exports.escape = SqlString.escape; exports.escapeId = SqlString.escapeId; @@ -51,33 +45,33 @@ exports.raw = SqlString.raw; exports.__defineGetter__( 'createConnectionPromise', - () => require('./promise.js').createConnection + () => require('./promise.js').createConnection, ); exports.__defineGetter__( 'createPoolPromise', - () => require('./promise.js').createPool + () => require('./promise.js').createPool, ); exports.__defineGetter__( 'createPoolClusterPromise', - () => require('./promise.js').createPoolCluster + () => require('./promise.js').createPoolCluster, ); exports.__defineGetter__('Types', () => require('./lib/constants/types.js')); exports.__defineGetter__('Charsets', () => - require('./lib/constants/charsets.js') + require('./lib/constants/charsets.js'), ); exports.__defineGetter__('CharsetToEncoding', () => - require('./lib/constants/charset_encodings.js') + require('./lib/constants/charset_encodings.js'), ); -exports.setMaxParserCache = function(max) { +exports.setMaxParserCache = function (max) { parserCache.setMaxCache(max); }; -exports.clearParserCache = function() { +exports.clearParserCache = function () { parserCache.clearCache(); }; diff --git a/lib/base/connection.js b/lib/base/connection.js new file mode 100644 index 0000000000..d00ce8c1f6 --- /dev/null +++ b/lib/base/connection.js @@ -0,0 +1,945 @@ +// This file was modified by Oracle on June 1, 2021. +// The changes involve new logic to handle an additional ERR Packet sent by +// the MySQL server when the connection is closed unexpectedly. +// Modifications copyright (c) 2021, Oracle and/or its affiliates. + +// This file was modified by Oracle on June 17, 2021. +// The changes involve logic to ensure the socket connection is closed when +// there is a fatal error. +// Modifications copyright (c) 2021, Oracle and/or its affiliates. + +// This file was modified by Oracle on September 21, 2021. +// The changes involve passing additional authentication factor passwords +// to the ChangeUser Command instance. +// Modifications copyright (c) 2021, Oracle and/or its affiliates. + +'use strict'; + +const Net = require('net'); +const Tls = require('tls'); +const Timers = require('timers'); +const EventEmitter = require('events').EventEmitter; +const Readable = require('stream').Readable; +const Queue = require('denque'); +const SqlString = require('sqlstring'); +const { createLRU } = require('lru.min'); +const PacketParser = require('../packet_parser.js'); +const Packets = require('../packets/index.js'); +const Commands = require('../commands/index.js'); +const ConnectionConfig = require('../connection_config.js'); +const CharsetToEncoding = require('../constants/charset_encodings.js'); + +let _connectionId = 0; + +let convertNamedPlaceholders = null; + +class BaseConnection extends EventEmitter { + constructor(opts) { + super(); + this.config = opts.config; + // TODO: fill defaults + // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX ) + // if host is given, connect to host:3306 + // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath + // if there is no host/port and no socketPath parameters? + if (!opts.config.stream) { + if (opts.config.socketPath) { + this.stream = Net.connect(opts.config.socketPath); + } else { + this.stream = Net.connect(opts.config.port, opts.config.host); + + // Optionally enable keep-alive on the socket. + if (this.config.enableKeepAlive) { + this.stream.on('connect', () => { + this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay); + }); + } + + // Enable TCP_NODELAY flag. This is needed so that the network packets + // are sent immediately to the server + this.stream.setNoDelay(true); + } + // if stream is a function, treat it as "stream agent / factory" + } else if (typeof opts.config.stream === 'function') { + this.stream = opts.config.stream(opts); + } else { + this.stream = opts.config.stream; + } + + this._internalId = _connectionId++; + this._commands = new Queue(); + this._command = null; + this._paused = false; + this._paused_packets = new Queue(); + this._statements = createLRU({ + max: this.config.maxPreparedStatements, + onEviction: function (_, statement) { + statement.close(); + }, + }); + this.serverCapabilityFlags = 0; + this.authorized = false; + this.sequenceId = 0; + this.compressedSequenceId = 0; + this.threadId = null; + this._handshakePacket = null; + this._fatalError = null; + this._protocolError = null; + this._outOfOrderPackets = []; + this.clientEncoding = CharsetToEncoding[this.config.charsetNumber]; + this.stream.on('error', this._handleNetworkError.bind(this)); + // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind + this.packetParser = new PacketParser((p) => { + this.handlePacket(p); + }); + this.stream.on('data', (data) => { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + this.packetParser.execute(data); + }); + this.stream.on('end', () => { + // emit the end event so that the pooled connection can close the connection + this.emit('end'); + }); + this.stream.on('close', () => { + // we need to set this flag everywhere where we want connection to close + if (this._closing) { + return; + } + if (!this._protocolError) { + // no particular error message before disconnect + this._protocolError = new Error( + 'Connection lost: The server closed the connection.', + ); + this._protocolError.fatal = true; + this._protocolError.code = 'PROTOCOL_CONNECTION_LOST'; + } + this._notifyError(this._protocolError); + }); + let handshakeCommand; + if (!this.config.isServer) { + handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags); + handshakeCommand.on('end', () => { + // this happens when handshake finishes early either because there was + // some fatal error or the server sent an error packet instead of + // an hello packet (for example, 'Too many connections' error) + if ( + !handshakeCommand.handshake || + this._fatalError || + this._protocolError + ) { + return; + } + this._handshakePacket = handshakeCommand.handshake; + this.threadId = handshakeCommand.handshake.connectionId; + this.emit('connect', handshakeCommand.handshake); + }); + handshakeCommand.on('error', (err) => { + this._closing = true; + this._notifyError(err); + }); + this.addCommand(handshakeCommand); + } + // in case there was no initial handshake but we need to read sting, assume it utf-8 + // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet) + // will be overwritten with actual encoding value as soon as server handshake packet is received + this.serverEncoding = 'utf8'; + if (this.config.connectTimeout) { + const timeoutHandler = this._handleTimeoutError.bind(this); + this.connectTimeout = Timers.setTimeout( + timeoutHandler, + this.config.connectTimeout, + ); + } + } + + _addCommandClosedState(cmd) { + const err = new Error( + "Can't add new command when connection is in closed state", + ); + err.fatal = true; + if (cmd.onResult) { + cmd.onResult(err); + } else { + this.emit('error', err); + } + } + + _handleFatalError(err) { + err.fatal = true; + // stop receiving packets + this.stream.removeAllListeners('data'); + this.addCommand = this._addCommandClosedState; + this.write = () => { + this.emit('error', new Error("Can't write in closed state")); + }; + this._notifyError(err); + this._fatalError = err; + } + + _handleNetworkError(err) { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + // Do not throw an error when a connection ends with a RST,ACK packet + if (err.code === 'ECONNRESET' && this._closing) { + return; + } + this._handleFatalError(err); + } + + _handleTimeoutError() { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + this.stream.destroy && this.stream.destroy(); + const err = new Error('connect ETIMEDOUT'); + err.errorno = 'ETIMEDOUT'; + err.code = 'ETIMEDOUT'; + err.syscall = 'connect'; + this._handleNetworkError(err); + } + + // notify all commands in the queue and bubble error as connection "error" + // called on stream error or unexpected termination + _notifyError(err) { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET + if (this._fatalError) { + return; + } + let command; + // if there is no active command, notify connection + // if there are commands and all of them have callbacks, pass error via callback + let bubbleErrorToConnection = !this._command; + if (this._command && this._command.onResult) { + this._command.onResult(err); + this._command = null; + // connection handshake is special because we allow it to be implicit + // if error happened during handshake, but there are others commands in queue + // then bubble error to other commands and not to connection + } else if ( + !( + this._command && + this._command.constructor === Commands.ClientHandshake && + this._commands.length > 0 + ) + ) { + bubbleErrorToConnection = true; + } + while ((command = this._commands.shift())) { + if (command.onResult) { + command.onResult(err); + } else { + bubbleErrorToConnection = true; + } + } + // notify connection if some comands in the queue did not have callbacks + // or if this is pool connection ( so it can be removed from pool ) + if (bubbleErrorToConnection || this._pool) { + this.emit('error', err); + } + // close connection after emitting the event in case of a fatal error + if (err.fatal) { + this.close(); + } + } + + write(buffer) { + const result = this.stream.write(buffer, (err) => { + if (err) { + this._handleNetworkError(err); + } + }); + + if (!result) { + this.stream.emit('pause'); + } + } + + // http://dev.mysql.com/doc/internals/en/sequence-id.html + // + // The sequence-id is incremented with each packet and may wrap around. + // It starts at 0 and is reset to 0 when a new command + // begins in the Command Phase. + // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html + _resetSequenceId() { + this.sequenceId = 0; + this.compressedSequenceId = 0; + } + + _bumpCompressedSequenceId(numPackets) { + this.compressedSequenceId += numPackets; + this.compressedSequenceId %= 256; + } + + _bumpSequenceId(numPackets) { + this.sequenceId += numPackets; + this.sequenceId %= 256; + } + + writePacket(packet) { + const MAX_PACKET_LENGTH = 16777215; + const length = packet.length(); + let chunk, offset, header; + if (length < MAX_PACKET_LENGTH) { + packet.writeHeader(this.sequenceId); + if (this.config.debug) { + console.log( + `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`, + ); + console.log( + `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`, + ); + } + this._bumpSequenceId(1); + this.write(packet.buffer); + } else { + if (this.config.debug) { + console.log( + `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`, + ); + console.log( + `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`, + ); + } + for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) { + chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH); + if (chunk.length === MAX_PACKET_LENGTH) { + header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]); + } else { + header = Buffer.from([ + chunk.length & 0xff, + (chunk.length >> 8) & 0xff, + (chunk.length >> 16) & 0xff, + this.sequenceId, + ]); + } + this._bumpSequenceId(1); + this.write(header); + this.write(chunk); + } + } + } + + // 0.11+ environment + startTLS(onSecure) { + if (this.config.debug) { + console.log('Upgrading connection to TLS'); + } + const secureContext = Tls.createSecureContext({ + ca: this.config.ssl.ca, + cert: this.config.ssl.cert, + ciphers: this.config.ssl.ciphers, + key: this.config.ssl.key, + passphrase: this.config.ssl.passphrase, + minVersion: this.config.ssl.minVersion, + maxVersion: this.config.ssl.maxVersion, + }); + const rejectUnauthorized = this.config.ssl.rejectUnauthorized; + const verifyIdentity = this.config.ssl.verifyIdentity; + const servername = this.config.host; + + let secureEstablished = false; + this.stream.removeAllListeners('data'); + const secureSocket = Tls.connect( + { + rejectUnauthorized, + requestCert: rejectUnauthorized, + checkServerIdentity: verifyIdentity + ? Tls.checkServerIdentity + : function () { + return undefined; + }, + secureContext, + isServer: false, + socket: this.stream, + servername, + }, + () => { + secureEstablished = true; + if (rejectUnauthorized) { + if (typeof servername === 'string' && verifyIdentity) { + const cert = secureSocket.getPeerCertificate(true); + const serverIdentityCheckError = Tls.checkServerIdentity( + servername, + cert, + ); + if (serverIdentityCheckError) { + onSecure(serverIdentityCheckError); + return; + } + } + } + onSecure(); + }, + ); + // error handler for secure socket + secureSocket.on('error', (err) => { + if (secureEstablished) { + this._handleNetworkError(err); + } else { + onSecure(err); + } + }); + secureSocket.on('data', (data) => { + this.packetParser.execute(data); + }); + this.write = (buffer) => secureSocket.write(buffer); + } + + protocolError(message, code) { + // Starting with MySQL 8.0.24, if the client closes the connection + // unexpectedly, the server will send a last ERR Packet, which we can + // safely ignore. + // https://dev.mysql.com/worklog/task/?id=12999 + if (this._closing) { + return; + } + + const err = new Error(message); + err.fatal = true; + err.code = code || 'PROTOCOL_ERROR'; + this.emit('error', err); + } + + get fatalError() { + return this._fatalError; + } + + handlePacket(packet) { + if (this._paused) { + this._paused_packets.push(packet); + return; + } + if (this.config.debug) { + if (packet) { + console.log( + ` raw: ${packet.buffer + .slice(packet.offset, packet.offset + packet.length()) + .toString('hex')}`, + ); + console.trace(); + const commandName = this._command + ? this._command._commandName + : '(no command)'; + const stateName = this._command + ? this._command.stateName() + : '(no command)'; + console.log( + `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`, + ); + } + } + if (!this._command) { + const marker = packet.peekByte(); + // If it's an Err Packet, we should use it. + if (marker === 0xff) { + const error = Packets.Error.fromPacket(packet); + this.protocolError(error.message, error.code); + } else { + // Otherwise, it means it's some other unexpected packet. + this.protocolError( + 'Unexpected packet while no commands in the queue', + 'PROTOCOL_UNEXPECTED_PACKET', + ); + } + this.close(); + return; + } + if (packet) { + // Note: when server closes connection due to inactivity, Err packet ER_CLIENT_INTERACTION_TIMEOUT from MySQL 8.0.24, sequenceId will be 0 + if (this.sequenceId !== packet.sequenceId) { + const err = new Error( + `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`, + ); + err.expected = this.sequenceId; + err.received = packet.sequenceId; + this.emit('warn', err); // REVIEW + console.error(err.message); + } + this._bumpSequenceId(packet.numPackets); + } + try { + if (this._fatalError) { + // skip remaining packets after client is in the error state + return; + } + const done = this._command.execute(packet, this); + if (done) { + this._command = this._commands.shift(); + if (this._command) { + this.sequenceId = 0; + this.compressedSequenceId = 0; + this.handlePacket(); + } + } + } catch (err) { + this._handleFatalError(err); + this.stream.destroy(); + } + } + + addCommand(cmd) { + // this.compressedSequenceId = 0; + // this.sequenceId = 0; + if (this.config.debug) { + const commandName = cmd.constructor.name; + console.log(`Add command: ${commandName}`); + cmd._commandName = commandName; + } + if (!this._command) { + this._command = cmd; + this.handlePacket(); + } else { + this._commands.push(cmd); + } + return cmd; + } + + format(sql, values) { + if (typeof this.config.queryFormat === 'function') { + return this.config.queryFormat.call( + this, + sql, + values, + this.config.timezone, + ); + } + const opts = { + sql: sql, + values: values, + }; + this._resolveNamedPlaceholders(opts); + return SqlString.format( + opts.sql, + opts.values, + this.config.stringifyObjects, + this.config.timezone, + ); + } + + escape(value) { + return SqlString.escape(value, false, this.config.timezone); + } + + escapeId(value) { + return SqlString.escapeId(value, false); + } + + raw(sql) { + return SqlString.raw(sql); + } + + _resolveNamedPlaceholders(options) { + let unnamed; + if (this.config.namedPlaceholders || options.namedPlaceholders) { + if (Array.isArray(options.values)) { + // if an array is provided as the values, assume the conversion is not necessary. + // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled. + return; + } + if (convertNamedPlaceholders === null) { + convertNamedPlaceholders = require('named-placeholders')(); + } + unnamed = convertNamedPlaceholders(options.sql, options.values); + options.sql = unnamed[0]; + options.values = unnamed[1]; + } + } + + query(sql, values, cb) { + let cmdQuery; + if (sql.constructor === Commands.Query) { + cmdQuery = sql; + } else { + cmdQuery = BaseConnection.createQuery(sql, values, cb, this.config); + } + this._resolveNamedPlaceholders(cmdQuery); + const rawSql = this.format( + cmdQuery.sql, + cmdQuery.values !== undefined ? cmdQuery.values : [], + ); + cmdQuery.sql = rawSql; + return this.addCommand(cmdQuery); + } + + pause() { + this._paused = true; + this.stream.pause(); + } + + resume() { + let packet; + this._paused = false; + while ((packet = this._paused_packets.shift())) { + this.handlePacket(packet); + // don't resume if packet handler paused connection + if (this._paused) { + return; + } + } + this.stream.resume(); + } + + // TODO: named placeholders support + prepare(options, cb) { + if (typeof options === 'string') { + options = { sql: options }; + } + return this.addCommand(new Commands.Prepare(options, cb)); + } + + unprepare(sql) { + let options = {}; + if (typeof sql === 'object') { + options = sql; + } else { + options.sql = sql; + } + const key = BaseConnection.statementKey(options); + const stmt = this._statements.get(key); + if (stmt) { + this._statements.delete(key); + stmt.close(); + } + return stmt; + } + + execute(sql, values, cb) { + let options = { + infileStreamFactory: this.config.infileStreamFactory, + }; + if (typeof sql === 'object') { + // execute(options, cb) + options = { + ...options, + ...sql, + sql: sql.sql, + values: sql.values, + }; + if (typeof values === 'function') { + cb = values; + } else { + options.values = options.values || values; + } + } else if (typeof values === 'function') { + // execute(sql, cb) + cb = values; + options.sql = sql; + options.values = undefined; + } else { + // execute(sql, values, cb) + options.sql = sql; + options.values = values; + } + this._resolveNamedPlaceholders(options); + // check for values containing undefined + if (options.values) { + //If namedPlaceholder is not enabled and object is passed as bind parameters + if (!Array.isArray(options.values)) { + throw new TypeError( + 'Bind parameters must be array if namedPlaceholders parameter is not enabled', + ); + } + options.values.forEach((val) => { + //If namedPlaceholder is not enabled and object is passed as bind parameters + if (!Array.isArray(options.values)) { + throw new TypeError( + 'Bind parameters must be array if namedPlaceholders parameter is not enabled', + ); + } + if (val === undefined) { + throw new TypeError( + 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null', + ); + } + if (typeof val === 'function') { + throw new TypeError( + 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first', + ); + } + }); + } + const executeCommand = new Commands.Execute(options, cb); + const prepareCommand = new Commands.Prepare(options, (err, stmt) => { + if (err) { + // skip execute command if prepare failed, we have main + // combined callback here + executeCommand.start = function () { + return null; + }; + if (cb) { + cb(err); + } else { + executeCommand.emit('error', err); + } + executeCommand.emit('end'); + return; + } + executeCommand.statement = stmt; + }); + this.addCommand(prepareCommand); + this.addCommand(executeCommand); + return executeCommand; + } + + changeUser(options, callback) { + if (!callback && typeof options === 'function') { + callback = options; + options = {}; + } + const charsetNumber = options.charset + ? ConnectionConfig.getCharsetNumber(options.charset) + : this.config.charsetNumber; + return this.addCommand( + new Commands.ChangeUser( + { + user: options.user || this.config.user, + // for the purpose of multi-factor authentication, or not, the main + // password (used for the 1st authentication factor) can also be + // provided via the "password1" option + password: + options.password || + options.password1 || + this.config.password || + this.config.password1, + password2: options.password2 || this.config.password2, + password3: options.password3 || this.config.password3, + passwordSha1: options.passwordSha1 || this.config.passwordSha1, + database: options.database || this.config.database, + timeout: options.timeout, + charsetNumber: charsetNumber, + currentConfig: this.config, + }, + (err) => { + if (err) { + err.fatal = true; + } + if (callback) { + callback(err); + } + }, + ), + ); + } + + // transaction helpers + beginTransaction(cb) { + return this.query('START TRANSACTION', cb); + } + + commit(cb) { + return this.query('COMMIT', cb); + } + + rollback(cb) { + return this.query('ROLLBACK', cb); + } + + ping(cb) { + return this.addCommand(new Commands.Ping(cb)); + } + + _registerSlave(opts, cb) { + return this.addCommand(new Commands.RegisterSlave(opts, cb)); + } + + _binlogDump(opts, cb) { + return this.addCommand(new Commands.BinlogDump(opts, cb)); + } + + // currently just alias to close + destroy() { + this.close(); + } + + close() { + if (this.connectTimeout) { + Timers.clearTimeout(this.connectTimeout); + this.connectTimeout = null; + } + this._closing = true; + this.stream.end(); + this.addCommand = this._addCommandClosedState; + } + + createBinlogStream(opts) { + // TODO: create proper stream class + // TODO: use through2 + let test = 1; + const stream = new Readable({ objectMode: true }); + stream._read = function () { + return { + data: test++, + }; + }; + this._registerSlave(opts, () => { + const dumpCmd = this._binlogDump(opts); + dumpCmd.on('event', (ev) => { + stream.push(ev); + }); + dumpCmd.on('eof', () => { + stream.push(null); + // if non-blocking, then close stream to prevent errors + if (opts.flags && opts.flags & 0x01) { + this.close(); + } + }); + // TODO: pipe errors as well + }); + return stream; + } + + connect(cb) { + if (!cb) { + return; + } + if (this._fatalError || this._protocolError) { + return cb(this._fatalError || this._protocolError); + } + if (this._handshakePacket) { + return cb(null, this); + } + let connectCalled = 0; + function callbackOnce(isErrorHandler) { + return function (param) { + if (!connectCalled) { + if (isErrorHandler) { + cb(param); + } else { + cb(null, param); + } + } + connectCalled = 1; + }; + } + this.once('error', callbackOnce(true)); + this.once('connect', callbackOnce(false)); + } + + // =================================== + // outgoing server connection methods + // =================================== + writeColumns(columns) { + this.writePacket(Packets.ResultSetHeader.toPacket(columns.length)); + columns.forEach((column) => { + this.writePacket( + Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding), + ); + }); + this.writeEof(); + } + + // row is array of columns, not hash + writeTextRow(column) { + this.writePacket( + Packets.TextRow.toPacket(column, this.serverConfig.encoding), + ); + } + + writeBinaryRow(column) { + this.writePacket( + Packets.BinaryRow.toPacket(column, this.serverConfig.encoding), + ); + } + + writeTextResult(rows, columns, binary = false) { + this.writeColumns(columns); + rows.forEach((row) => { + const arrayRow = new Array(columns.length); + columns.forEach((column) => { + arrayRow.push(row[column.name]); + }); + if (binary) { + this.writeBinaryRow(arrayRow); + } else this.writeTextRow(arrayRow); + }); + this.writeEof(); + } + + writeEof(warnings, statusFlags) { + this.writePacket(Packets.EOF.toPacket(warnings, statusFlags)); + } + + writeOk(args) { + if (!args) { + args = { affectedRows: 0 }; + } + this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding)); + } + + writeError(args) { + // if we want to send error before initial hello was sent, use default encoding + const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8'; + this.writePacket(Packets.Error.toPacket(args, encoding)); + } + + serverHandshake(args) { + this.serverConfig = args; + this.serverConfig.encoding = + CharsetToEncoding[this.serverConfig.characterSet]; + return this.addCommand(new Commands.ServerHandshake(args)); + } + + // =============================================================== + end(callback) { + if (this.config.isServer) { + this._closing = true; + const quitCmd = new EventEmitter(); + setImmediate(() => { + this.stream.end(); + quitCmd.emit('end'); + }); + return quitCmd; + } + // trigger error if more commands enqueued after end command + const quitCmd = this.addCommand(new Commands.Quit(callback)); + this.addCommand = this._addCommandClosedState; + return quitCmd; + } + + static createQuery(sql, values, cb, config) { + let options = { + rowsAsArray: config.rowsAsArray, + infileStreamFactory: config.infileStreamFactory, + }; + if (typeof sql === 'object') { + // query(options, cb) + options = { + ...options, + ...sql, + sql: sql.sql, + values: sql.values, + }; + if (typeof values === 'function') { + cb = values; + } else if (values !== undefined) { + options.values = values; + } + } else if (typeof values === 'function') { + // query(sql, cb) + cb = values; + options.sql = sql; + options.values = undefined; + } else { + // query(sql, values, cb) + options.sql = sql; + options.values = values; + } + return new Commands.Query(options, cb); + } + + static statementKey(options) { + return `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`; + } +} + +module.exports = BaseConnection; diff --git a/lib/base/pool.js b/lib/base/pool.js new file mode 100644 index 0000000000..47981f1943 --- /dev/null +++ b/lib/base/pool.js @@ -0,0 +1,233 @@ +'use strict'; + +const process = require('process'); +const SqlString = require('sqlstring'); +const EventEmitter = require('events').EventEmitter; +const PoolConnection = require('../pool_connection.js'); +const Queue = require('denque'); +const BaseConnection = require('./connection.js'); + +function spliceConnection(queue, connection) { + const len = queue.length; + for (let i = 0; i < len; i++) { + if (queue.get(i) === connection) { + queue.removeOne(i); + break; + } + } +} + +class BasePool extends EventEmitter { + constructor(options) { + super(); + this.config = options.config; + this.config.connectionConfig.pool = this; + this._allConnections = new Queue(); + this._freeConnections = new Queue(); + this._connectionQueue = new Queue(); + this._closed = false; + if (this.config.maxIdle < this.config.connectionLimit) { + // create idle connection timeout automatically release job + this._removeIdleTimeoutConnections(); + } + } + + getConnection(cb) { + if (this._closed) { + return process.nextTick(() => cb(new Error('Pool is closed.'))); + } + let connection; + if (this._freeConnections.length > 0) { + connection = this._freeConnections.pop(); + this.emit('acquire', connection); + return process.nextTick(() => cb(null, connection)); + } + if ( + this.config.connectionLimit === 0 || + this._allConnections.length < this.config.connectionLimit + ) { + connection = new PoolConnection(this, { + config: this.config.connectionConfig, + }); + this._allConnections.push(connection); + return connection.connect((err) => { + if (this._closed) { + return cb(new Error('Pool is closed.')); + } + if (err) { + return cb(err); + } + this.emit('connection', connection); + this.emit('acquire', connection); + return cb(null, connection); + }); + } + if (!this.config.waitForConnections) { + return process.nextTick(() => cb(new Error('No connections available.'))); + } + if ( + this.config.queueLimit && + this._connectionQueue.length >= this.config.queueLimit + ) { + return cb(new Error('Queue limit reached.')); + } + this.emit('enqueue'); + return this._connectionQueue.push(cb); + } + + releaseConnection(connection) { + let cb; + if (!connection._pool) { + // The connection has been removed from the pool and is no longer good. + if (this._connectionQueue.length) { + cb = this._connectionQueue.shift(); + process.nextTick(this.getConnection.bind(this, cb)); + } + } else if (this._connectionQueue.length) { + cb = this._connectionQueue.shift(); + process.nextTick(cb.bind(null, null, connection)); + } else { + this._freeConnections.push(connection); + this.emit('release', connection); + } + } + + end(cb) { + this._closed = true; + clearTimeout(this._removeIdleTimeoutConnectionsTimer); + if (typeof cb !== 'function') { + cb = function (err) { + if (err) { + throw err; + } + }; + } + let calledBack = false; + let closedConnections = 0; + let connection; + const endCB = function (err) { + if (calledBack) { + return; + } + if (err || ++closedConnections >= this._allConnections.length) { + calledBack = true; + cb(err); + return; + } + }.bind(this); + if (this._allConnections.length === 0) { + endCB(); + return; + } + for (let i = 0; i < this._allConnections.length; i++) { + connection = this._allConnections.get(i); + connection._realEnd(endCB); + } + } + + query(sql, values, cb) { + const cmdQuery = BaseConnection.createQuery( + sql, + values, + cb, + this.config.connectionConfig, + ); + if (typeof cmdQuery.namedPlaceholders === 'undefined') { + cmdQuery.namedPlaceholders = + this.config.connectionConfig.namedPlaceholders; + } + this.getConnection((err, conn) => { + if (err) { + if (typeof cmdQuery.onResult === 'function') { + cmdQuery.onResult(err); + } else { + cmdQuery.emit('error', err); + } + return; + } + try { + conn.query(cmdQuery).once('end', () => { + conn.release(); + }); + } catch (e) { + conn.release(); + throw e; + } + }); + return cmdQuery; + } + + execute(sql, values, cb) { + // TODO construct execute command first here and pass it to connection.execute + // so that polymorphic arguments logic is there in one place + if (typeof values === 'function') { + cb = values; + values = []; + } + this.getConnection((err, conn) => { + if (err) { + return cb(err); + } + try { + conn.execute(sql, values, cb).once('end', () => { + conn.release(); + }); + } catch (e) { + conn.release(); + return cb(e); + } + }); + } + + _removeConnection(connection) { + // Remove connection from all connections + spliceConnection(this._allConnections, connection); + // Remove connection from free connections + spliceConnection(this._freeConnections, connection); + this.releaseConnection(connection); + } + + _removeIdleTimeoutConnections() { + if (this._removeIdleTimeoutConnectionsTimer) { + clearTimeout(this._removeIdleTimeoutConnectionsTimer); + } + + this._removeIdleTimeoutConnectionsTimer = setTimeout(() => { + try { + while ( + this._freeConnections.length > this.config.maxIdle || + (this._freeConnections.length > 0 && + Date.now() - this._freeConnections.get(0).lastActiveTime > + this.config.idleTimeout) + ) { + this._freeConnections.get(0).destroy(); + } + } finally { + this._removeIdleTimeoutConnections(); + } + }, 1000); + } + + format(sql, values) { + return SqlString.format( + sql, + values, + this.config.connectionConfig.stringifyObjects, + this.config.connectionConfig.timezone, + ); + } + + escape(value) { + return SqlString.escape( + value, + this.config.connectionConfig.stringifyObjects, + this.config.connectionConfig.timezone, + ); + } + + escapeId(value) { + return SqlString.escapeId(value, false); + } +} + +module.exports = BasePool; diff --git a/lib/base/pool_connection.js b/lib/base/pool_connection.js new file mode 100644 index 0000000000..74511ce587 --- /dev/null +++ b/lib/base/pool_connection.js @@ -0,0 +1,63 @@ +'use strict'; + +const BaseConnection = require('./connection.js'); + +class BasePoolConnection extends BaseConnection { + constructor(pool, options) { + super(options); + this._pool = pool; + // The last active time of this connection + this.lastActiveTime = Date.now(); + // When a fatal error occurs the connection's protocol ends, which will cause + // the connection to end as well, thus we only need to watch for the end event + // and we will be notified of disconnects. + // REVIEW: Moved to `once` + this.once('end', () => { + this._removeFromPool(); + }); + this.once('error', () => { + this._removeFromPool(); + }); + } + + release() { + if (!this._pool || this._pool._closed) { + return; + } + // update last active time + this.lastActiveTime = Date.now(); + this._pool.releaseConnection(this); + } + + end() { + const err = new Error( + 'Calling conn.end() to release a pooled connection is ' + + 'deprecated. In next version calling conn.end() will be ' + + 'restored to default conn.end() behavior. Use ' + + 'conn.release() instead.', + ); + this.emit('warn', err); + console.warn(err.message); + this.release(); + } + + destroy() { + this._removeFromPool(); + super.destroy(); + } + + _removeFromPool() { + if (!this._pool || this._pool._closed) { + return; + } + const pool = this._pool; + this._pool = null; + pool._removeConnection(this); + } +} + +BasePoolConnection.statementKey = BaseConnection.statementKey; +module.exports = BasePoolConnection; + +// TODO: Remove this when we are removing PoolConnection#end +BasePoolConnection.prototype._realEnd = BaseConnection.prototype.end; diff --git a/lib/connection.js b/lib/connection.js index af6b3d9d68..b21cff85c4 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,948 +1,12 @@ -// This file was modified by Oracle on June 1, 2021. -// The changes involve new logic to handle an additional ERR Packet sent by -// the MySQL server when the connection is closed unexpectedly. -// Modifications copyright (c) 2021, Oracle and/or its affiliates. - -// This file was modified by Oracle on June 17, 2021. -// The changes involve logic to ensure the socket connection is closed when -// there is a fatal error. -// Modifications copyright (c) 2021, Oracle and/or its affiliates. - -// This file was modified by Oracle on September 21, 2021. -// The changes involve passing additional authentication factor passwords -// to the ChangeUser Command instance. -// Modifications copyright (c) 2021, Oracle and/or its affiliates. - 'use strict'; -const Net = require('net'); -const Tls = require('tls'); -const Timers = require('timers'); -const EventEmitter = require('events').EventEmitter; -const Readable = require('stream').Readable; -const Queue = require('denque'); -const SqlString = require('sqlstring'); -const { createLRU } = require('lru.min'); - -const PacketParser = require('./packet_parser.js'); -const Packets = require('./packets/index.js'); -const Commands = require('./commands/index.js'); -const ConnectionConfig = require('./connection_config.js'); -const CharsetToEncoding = require('./constants/charset_encodings.js'); - -let _connectionId = 0; - -let convertNamedPlaceholders = null; - -class Connection extends EventEmitter { - constructor(opts) { - super(); - this.config = opts.config; - // TODO: fill defaults - // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX ) - // if host is given, connect to host:3306 - // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath - // if there is no host/port and no socketPath parameters? - if (!opts.config.stream) { - if (opts.config.socketPath) { - this.stream = Net.connect(opts.config.socketPath); - } else { - this.stream = Net.connect( - opts.config.port, - opts.config.host - ); - - // Optionally enable keep-alive on the socket. - if (this.config.enableKeepAlive) { - this.stream.on('connect', () => { - this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay); - }); - } - - // Enable TCP_NODELAY flag. This is needed so that the network packets - // are sent immediately to the server - this.stream.setNoDelay(true); - } - // if stream is a function, treat it as "stream agent / factory" - } else if (typeof opts.config.stream === 'function') { - this.stream = opts.config.stream(opts); - } else { - this.stream = opts.config.stream; - } - - this._internalId = _connectionId++; - this._commands = new Queue(); - this._command = null; - this._paused = false; - this._paused_packets = new Queue(); - this._statements = createLRU({ - max: this.config.maxPreparedStatements, - onEviction: function(_, statement) { - statement.close(); - } - }); - this.serverCapabilityFlags = 0; - this.authorized = false; - this.sequenceId = 0; - this.compressedSequenceId = 0; - this.threadId = null; - this._handshakePacket = null; - this._fatalError = null; - this._protocolError = null; - this._outOfOrderPackets = []; - this.clientEncoding = CharsetToEncoding[this.config.charsetNumber]; - this.stream.on('error', this._handleNetworkError.bind(this)); - // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind - this.packetParser = new PacketParser(p => { - this.handlePacket(p); - }); - this.stream.on('data', data => { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - this.packetParser.execute(data); - }); - this.stream.on('end', () => { - // emit the end event so that the pooled connection can close the connection - this.emit('end'); - }); - this.stream.on('close', () => { - // we need to set this flag everywhere where we want connection to close - if (this._closing) { - return; - } - if (!this._protocolError) { - // no particular error message before disconnect - this._protocolError = new Error( - 'Connection lost: The server closed the connection.' - ); - this._protocolError.fatal = true; - this._protocolError.code = 'PROTOCOL_CONNECTION_LOST'; - } - this._notifyError(this._protocolError); - }); - let handshakeCommand; - if (!this.config.isServer) { - handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags); - handshakeCommand.on('end', () => { - // this happens when handshake finishes early either because there was - // some fatal error or the server sent an error packet instead of - // an hello packet (for example, 'Too many connections' error) - if (!handshakeCommand.handshake || this._fatalError || this._protocolError) { - return; - } - this._handshakePacket = handshakeCommand.handshake; - this.threadId = handshakeCommand.handshake.connectionId; - this.emit('connect', handshakeCommand.handshake); - }); - handshakeCommand.on('error', err => { - this._closing = true; - this._notifyError(err); - }); - this.addCommand(handshakeCommand); - } - // in case there was no initial handshake but we need to read sting, assume it utf-8 - // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet) - // will be overwritten with actual encoding value as soon as server handshake packet is received - this.serverEncoding = 'utf8'; - if (this.config.connectTimeout) { - const timeoutHandler = this._handleTimeoutError.bind(this); - this.connectTimeout = Timers.setTimeout( - timeoutHandler, - this.config.connectTimeout - ); - } - } +const BaseConnection = require('./base/connection.js'); +class Connection extends BaseConnection { promise(promiseImpl) { - const PromiseConnection = require('../promise').PromiseConnection; + const PromiseConnection = require('./promise/connection.js'); return new PromiseConnection(this, promiseImpl); } - - _addCommandClosedState(cmd) { - const err = new Error( - "Can't add new command when connection is in closed state" - ); - err.fatal = true; - if (cmd.onResult) { - cmd.onResult(err); - } else { - this.emit('error', err); - } - } - - _handleFatalError(err) { - err.fatal = true; - // stop receiving packets - this.stream.removeAllListeners('data'); - this.addCommand = this._addCommandClosedState; - this.write = () => { - this.emit('error', new Error("Can't write in closed state")); - }; - this._notifyError(err); - this._fatalError = err; - } - - _handleNetworkError(err) { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - // Do not throw an error when a connection ends with a RST,ACK packet - if (err.code === 'ECONNRESET' && this._closing) { - return; - } - this._handleFatalError(err); - } - - _handleTimeoutError() { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - this.stream.destroy && this.stream.destroy(); - const err = new Error('connect ETIMEDOUT'); - err.errorno = 'ETIMEDOUT'; - err.code = 'ETIMEDOUT'; - err.syscall = 'connect'; - this._handleNetworkError(err); - } - - // notify all commands in the queue and bubble error as connection "error" - // called on stream error or unexpected termination - _notifyError(err) { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET - if (this._fatalError) { - return; - } - let command; - // if there is no active command, notify connection - // if there are commands and all of them have callbacks, pass error via callback - let bubbleErrorToConnection = !this._command; - if (this._command && this._command.onResult) { - this._command.onResult(err); - this._command = null; - // connection handshake is special because we allow it to be implicit - // if error happened during handshake, but there are others commands in queue - // then bubble error to other commands and not to connection - } else if ( - !( - this._command && - this._command.constructor === Commands.ClientHandshake && - this._commands.length > 0 - ) - ) { - bubbleErrorToConnection = true; - } - while ((command = this._commands.shift())) { - if (command.onResult) { - command.onResult(err); - } else { - bubbleErrorToConnection = true; - } - } - // notify connection if some comands in the queue did not have callbacks - // or if this is pool connection ( so it can be removed from pool ) - if (bubbleErrorToConnection || this._pool) { - this.emit('error', err); - } - // close connection after emitting the event in case of a fatal error - if (err.fatal) { - this.close(); - } - } - - write(buffer) { - const result = this.stream.write(buffer, err => { - if (err) { - this._handleNetworkError(err); - } - }); - - if (!result) { - this.stream.emit('pause'); - } - } - - // http://dev.mysql.com/doc/internals/en/sequence-id.html - // - // The sequence-id is incremented with each packet and may wrap around. - // It starts at 0 and is reset to 0 when a new command - // begins in the Command Phase. - // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html - _resetSequenceId() { - this.sequenceId = 0; - this.compressedSequenceId = 0; - } - - _bumpCompressedSequenceId(numPackets) { - this.compressedSequenceId += numPackets; - this.compressedSequenceId %= 256; - } - - _bumpSequenceId(numPackets) { - this.sequenceId += numPackets; - this.sequenceId %= 256; - } - - writePacket(packet) { - const MAX_PACKET_LENGTH = 16777215; - const length = packet.length(); - let chunk, offset, header; - if (length < MAX_PACKET_LENGTH) { - packet.writeHeader(this.sequenceId); - if (this.config.debug) { - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})` - ); - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}` - ); - } - this._bumpSequenceId(1); - this.write(packet.buffer); - } else { - if (this.config.debug) { - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:` - ); - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})` - ); - } - for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) { - chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH); - if (chunk.length === MAX_PACKET_LENGTH) { - header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]); - } else { - header = Buffer.from([ - chunk.length & 0xff, - (chunk.length >> 8) & 0xff, - (chunk.length >> 16) & 0xff, - this.sequenceId - ]); - } - this._bumpSequenceId(1); - this.write(header); - this.write(chunk); - } - } - } - - // 0.11+ environment - startTLS(onSecure) { - if (this.config.debug) { - // eslint-disable-next-line no-console - console.log('Upgrading connection to TLS'); - } - const secureContext = Tls.createSecureContext({ - ca: this.config.ssl.ca, - cert: this.config.ssl.cert, - ciphers: this.config.ssl.ciphers, - key: this.config.ssl.key, - passphrase: this.config.ssl.passphrase, - minVersion: this.config.ssl.minVersion, - maxVersion: this.config.ssl.maxVersion - }); - const rejectUnauthorized = this.config.ssl.rejectUnauthorized; - const verifyIdentity = this.config.ssl.verifyIdentity; - const servername = this.config.host; - - let secureEstablished = false; - this.stream.removeAllListeners('data'); - const secureSocket = Tls.connect({ - rejectUnauthorized, - requestCert: rejectUnauthorized, - checkServerIdentity: verifyIdentity - ? Tls.checkServerIdentity - : function() { return undefined; }, - secureContext, - isServer: false, - socket: this.stream, - servername - }, () => { - secureEstablished = true; - if (rejectUnauthorized) { - if (typeof servername === 'string' && verifyIdentity) { - const cert = secureSocket.getPeerCertificate(true); - const serverIdentityCheckError = Tls.checkServerIdentity(servername, cert); - if (serverIdentityCheckError) { - onSecure(serverIdentityCheckError); - return; - } - } - } - onSecure(); - }); - // error handler for secure socket - secureSocket.on('error', err => { - if (secureEstablished) { - this._handleNetworkError(err); - } else { - onSecure(err); - } - }); - secureSocket.on('data', data => { - this.packetParser.execute(data); - }); - this.write = buffer => secureSocket.write(buffer); - } - - protocolError(message, code) { - // Starting with MySQL 8.0.24, if the client closes the connection - // unexpectedly, the server will send a last ERR Packet, which we can - // safely ignore. - // https://dev.mysql.com/worklog/task/?id=12999 - if (this._closing) { - return; - } - - const err = new Error(message); - err.fatal = true; - err.code = code || 'PROTOCOL_ERROR'; - this.emit('error', err); - } - - get fatalError() { - return this._fatalError; - } - - handlePacket(packet) { - if (this._paused) { - this._paused_packets.push(packet); - return; - } - if (this.config.debug) { - if (packet) { - // eslint-disable-next-line no-console - console.log( - ` raw: ${packet.buffer - .slice(packet.offset, packet.offset + packet.length()) - .toString('hex')}` - ); - // eslint-disable-next-line no-console - console.trace(); - const commandName = this._command - ? this._command._commandName - : '(no command)'; - const stateName = this._command - ? this._command.stateName() - : '(no command)'; - // eslint-disable-next-line no-console - console.log( - `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})` - ); - } - } - if (!this._command) { - const marker = packet.peekByte(); - // If it's an Err Packet, we should use it. - if (marker === 0xff) { - const error = Packets.Error.fromPacket(packet); - this.protocolError(error.message, error.code); - } else { - // Otherwise, it means it's some other unexpected packet. - this.protocolError( - 'Unexpected packet while no commands in the queue', - 'PROTOCOL_UNEXPECTED_PACKET' - ); - } - this.close(); - return; - } - if (packet) { - // Note: when server closes connection due to inactivity, Err packet ER_CLIENT_INTERACTION_TIMEOUT from MySQL 8.0.24, sequenceId will be 0 - if (this.sequenceId !== packet.sequenceId) { - const err = new Error( - `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}` - ); - err.expected = this.sequenceId; - err.received = packet.sequenceId; - this.emit('warn', err); // REVIEW - // eslint-disable-next-line no-console - console.error(err.message); - } - this._bumpSequenceId(packet.numPackets); - } - try { - if (this._fatalError) { - // skip remaining packets after client is in the error state - return; - } - const done = this._command.execute(packet, this); - if (done) { - this._command = this._commands.shift(); - if (this._command) { - this.sequenceId = 0; - this.compressedSequenceId = 0; - this.handlePacket(); - } - } - } catch (err) { - this._handleFatalError(err); - this.stream.destroy(); - } - } - - addCommand(cmd) { - // this.compressedSequenceId = 0; - // this.sequenceId = 0; - if (this.config.debug) { - const commandName = cmd.constructor.name; - // eslint-disable-next-line no-console - console.log(`Add command: ${commandName}`); - cmd._commandName = commandName; - } - if (!this._command) { - this._command = cmd; - this.handlePacket(); - } else { - this._commands.push(cmd); - } - return cmd; - } - - format(sql, values) { - if (typeof this.config.queryFormat === 'function') { - return this.config.queryFormat.call( - this, - sql, - values, - this.config.timezone - ); - } - const opts = { - sql: sql, - values: values - }; - this._resolveNamedPlaceholders(opts); - return SqlString.format( - opts.sql, - opts.values, - this.config.stringifyObjects, - this.config.timezone - ); - } - - escape(value) { - return SqlString.escape(value, false, this.config.timezone); - } - - escapeId(value) { - return SqlString.escapeId(value, false); - } - - raw(sql) { - return SqlString.raw(sql); - } - - _resolveNamedPlaceholders(options) { - let unnamed; - if (this.config.namedPlaceholders || options.namedPlaceholders) { - if (Array.isArray(options.values)) { - // if an array is provided as the values, assume the conversion is not necessary. - // this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled. - return - } - if (convertNamedPlaceholders === null) { - convertNamedPlaceholders = require('named-placeholders')(); - } - unnamed = convertNamedPlaceholders(options.sql, options.values); - options.sql = unnamed[0]; - options.values = unnamed[1]; - } - } - - query(sql, values, cb) { - let cmdQuery; - if (sql.constructor === Commands.Query) { - cmdQuery = sql; - } else { - cmdQuery = Connection.createQuery(sql, values, cb, this.config); - } - this._resolveNamedPlaceholders(cmdQuery); - const rawSql = this.format(cmdQuery.sql, cmdQuery.values !== undefined ? cmdQuery.values : []); - cmdQuery.sql = rawSql; - return this.addCommand(cmdQuery); - } - - pause() { - this._paused = true; - this.stream.pause(); - } - - resume() { - let packet; - this._paused = false; - while ((packet = this._paused_packets.shift())) { - this.handlePacket(packet); - // don't resume if packet handler paused connection - if (this._paused) { - return; - } - } - this.stream.resume(); - } - - // TODO: named placeholders support - prepare(options, cb) { - if (typeof options === 'string') { - options = { sql: options }; - } - return this.addCommand(new Commands.Prepare(options, cb)); - } - - unprepare(sql) { - let options = {}; - if (typeof sql === 'object') { - options = sql; - } else { - options.sql = sql; - } - const key = Connection.statementKey(options); - const stmt = this._statements.get(key); - if (stmt) { - this._statements.delete(key); - stmt.close(); - } - return stmt; - } - - execute(sql, values, cb) { - let options = { - infileStreamFactory: this.config.infileStreamFactory - }; - if (typeof sql === 'object') { - // execute(options, cb) - options = { - ...options, - ...sql, - sql: sql.sql, - values: sql.values - }; - if (typeof values === 'function') { - cb = values; - } else { - options.values = options.values || values; - } - } else if (typeof values === 'function') { - // execute(sql, cb) - cb = values; - options.sql = sql; - options.values = undefined; - } else { - // execute(sql, values, cb) - options.sql = sql; - options.values = values; - } - this._resolveNamedPlaceholders(options); - // check for values containing undefined - if (options.values) { - //If namedPlaceholder is not enabled and object is passed as bind parameters - if (!Array.isArray(options.values)) { - throw new TypeError( - 'Bind parameters must be array if namedPlaceholders parameter is not enabled' - ); - } - options.values.forEach(val => { - //If namedPlaceholder is not enabled and object is passed as bind parameters - if (!Array.isArray(options.values)) { - throw new TypeError( - 'Bind parameters must be array if namedPlaceholders parameter is not enabled' - ); - } - if (val === undefined) { - throw new TypeError( - 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null' - ); - } - if (typeof val === 'function') { - throw new TypeError( - 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first' - ); - } - }); - } - const executeCommand = new Commands.Execute(options, cb); - const prepareCommand = new Commands.Prepare(options, (err, stmt) => { - if (err) { - // skip execute command if prepare failed, we have main - // combined callback here - executeCommand.start = function() { - return null; - }; - if (cb) { - cb(err); - } else { - executeCommand.emit('error', err); - } - executeCommand.emit('end'); - return; - } - executeCommand.statement = stmt; - }); - this.addCommand(prepareCommand); - this.addCommand(executeCommand); - return executeCommand; - } - - changeUser(options, callback) { - if (!callback && typeof options === 'function') { - callback = options; - options = {}; - } - const charsetNumber = options.charset - ? ConnectionConfig.getCharsetNumber(options.charset) - : this.config.charsetNumber; - return this.addCommand( - new Commands.ChangeUser( - { - user: options.user || this.config.user, - // for the purpose of multi-factor authentication, or not, the main - // password (used for the 1st authentication factor) can also be - // provided via the "password1" option - password: options.password || options.password1 || this.config.password || this.config.password1, - password2: options.password2 || this.config.password2, - password3: options.password3 || this.config.password3, - passwordSha1: options.passwordSha1 || this.config.passwordSha1, - database: options.database || this.config.database, - timeout: options.timeout, - charsetNumber: charsetNumber, - currentConfig: this.config - }, - err => { - if (err) { - err.fatal = true; - } - if (callback) { - callback(err); - } - } - ) - ); - } - - // transaction helpers - beginTransaction(cb) { - return this.query('START TRANSACTION', cb); - } - - commit(cb) { - return this.query('COMMIT', cb); - } - - rollback(cb) { - return this.query('ROLLBACK', cb); - } - - ping(cb) { - return this.addCommand(new Commands.Ping(cb)); - } - - _registerSlave(opts, cb) { - return this.addCommand(new Commands.RegisterSlave(opts, cb)); - } - - _binlogDump(opts, cb) { - return this.addCommand(new Commands.BinlogDump(opts, cb)); - } - - // currently just alias to close - destroy() { - this.close(); - } - - close() { - if (this.connectTimeout) { - Timers.clearTimeout(this.connectTimeout); - this.connectTimeout = null; - } - this._closing = true; - this.stream.end(); - this.addCommand = this._addCommandClosedState; - } - - createBinlogStream(opts) { - // TODO: create proper stream class - // TODO: use through2 - let test = 1; - const stream = new Readable({ objectMode: true }); - stream._read = function() { - return { - data: test++ - }; - }; - this._registerSlave(opts, () => { - const dumpCmd = this._binlogDump(opts); - dumpCmd.on('event', ev => { - stream.push(ev); - }); - dumpCmd.on('eof', () => { - stream.push(null); - // if non-blocking, then close stream to prevent errors - if (opts.flags && opts.flags & 0x01) { - this.close(); - } - }); - // TODO: pipe errors as well - }); - return stream; - } - - connect(cb) { - if (!cb) { - return; - } - if (this._fatalError || this._protocolError) { - return cb(this._fatalError || this._protocolError); - } - if (this._handshakePacket) { - return cb(null, this); - } - let connectCalled = 0; - function callbackOnce(isErrorHandler) { - return function(param) { - if (!connectCalled) { - if (isErrorHandler) { - cb(param); - } else { - cb(null, param); - } - } - connectCalled = 1; - }; - } - this.once('error', callbackOnce(true)); - this.once('connect', callbackOnce(false)); - } - - // =================================== - // outgoing server connection methods - // =================================== - writeColumns(columns) { - this.writePacket(Packets.ResultSetHeader.toPacket(columns.length)); - columns.forEach(column => { - this.writePacket( - Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding) - ); - }); - this.writeEof(); - } - - // row is array of columns, not hash - writeTextRow(column) { - this.writePacket( - Packets.TextRow.toPacket(column, this.serverConfig.encoding) - ); - } - - writeBinaryRow(column) { - this.writePacket( - Packets.BinaryRow.toPacket(column, this.serverConfig.encoding) - ); - } - - writeTextResult(rows, columns, binary=false) { - this.writeColumns(columns); - rows.forEach(row => { - const arrayRow = new Array(columns.length); - columns.forEach(column => { - arrayRow.push(row[column.name]); - }); - if(binary) { - this.writeBinaryRow(arrayRow); - } - else this.writeTextRow(arrayRow); - }); - this.writeEof(); - } - - writeEof(warnings, statusFlags) { - this.writePacket(Packets.EOF.toPacket(warnings, statusFlags)); - } - - writeOk(args) { - if (!args) { - args = { affectedRows: 0 }; - } - this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding)); - } - - writeError(args) { - // if we want to send error before initial hello was sent, use default encoding - const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8'; - this.writePacket(Packets.Error.toPacket(args, encoding)); - } - - serverHandshake(args) { - this.serverConfig = args; - this.serverConfig.encoding = - CharsetToEncoding[this.serverConfig.characterSet]; - return this.addCommand(new Commands.ServerHandshake(args)); - } - - // =============================================================== - end(callback) { - if (this.config.isServer) { - this._closing = true; - const quitCmd = new EventEmitter(); - setImmediate(() => { - this.stream.end(); - quitCmd.emit('end'); - }); - return quitCmd; - } - // trigger error if more commands enqueued after end command - const quitCmd = this.addCommand(new Commands.Quit(callback)); - this.addCommand = this._addCommandClosedState; - return quitCmd; - } - - static createQuery(sql, values, cb, config) { - let options = { - rowsAsArray: config.rowsAsArray, - infileStreamFactory: config.infileStreamFactory - }; - if (typeof sql === 'object') { - // query(options, cb) - options = { - ...options, - ...sql, - sql: sql.sql, - values: sql.values - }; - if (typeof values === 'function') { - cb = values; - } else if (values !== undefined) { - options.values = values; - } - } else if (typeof values === 'function') { - // query(sql, cb) - cb = values; - options.sql = sql; - options.values = undefined; - } else { - // query(sql, values, cb) - options.sql = sql; - options.values = values; - } - return new Commands.Query(options, cb); - } - - static statementKey(options) { - return ( - `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}` - ); - } } module.exports = Connection; diff --git a/lib/create_connection.js b/lib/create_connection.js new file mode 100644 index 0000000000..d39618f78f --- /dev/null +++ b/lib/create_connection.js @@ -0,0 +1,10 @@ +'use strict'; + +const Connection = require('./connection.js'); +const ConnectionConfig = require('./connection_config.js'); + +function createConnection(opts) { + return new Connection({ config: new ConnectionConfig(opts) }); +} + +module.exports = createConnection; diff --git a/lib/create_pool.js b/lib/create_pool.js new file mode 100644 index 0000000000..ad19d77b7f --- /dev/null +++ b/lib/create_pool.js @@ -0,0 +1,10 @@ +'use strict'; + +const Pool = require('./pool.js'); +const PoolConfig = require('./pool_config.js'); + +function createPool(config) { + return new Pool({ config: new PoolConfig(config) }); +} + +module.exports = createPool; diff --git a/lib/create_pool_cluster.js b/lib/create_pool_cluster.js new file mode 100644 index 0000000000..4ded78b783 --- /dev/null +++ b/lib/create_pool_cluster.js @@ -0,0 +1,9 @@ +'use strict'; + +const PoolCluster = require('./pool_cluster.js'); + +function createPoolCluster(config) { + return new PoolCluster(config); +} + +module.exports = createPoolCluster; diff --git a/lib/pool.js b/lib/pool.js index dc638d743e..b0f1e2bdc5 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -1,238 +1,12 @@ 'use strict'; -const process = require('process'); -const mysql = require('../index.js'); - -const EventEmitter = require('events').EventEmitter; -const PoolConnection = require('./pool_connection.js'); -const Queue = require('denque'); -const Connection = require('./connection.js'); - -function spliceConnection(queue, connection) { - const len = queue.length; - for (let i = 0; i < len; i++) { - if (queue.get(i) === connection) { - queue.removeOne(i); - break; - } - } -} - -class Pool extends EventEmitter { - constructor(options) { - super(); - this.config = options.config; - this.config.connectionConfig.pool = this; - this._allConnections = new Queue(); - this._freeConnections = new Queue(); - this._connectionQueue = new Queue(); - this._closed = false; - if (this.config.maxIdle < this.config.connectionLimit) { - // create idle connection timeout automatically release job - this._removeIdleTimeoutConnections(); - } - } +const BasePool = require('./base/pool.js'); +class Pool extends BasePool { promise(promiseImpl) { - const PromisePool = require('../promise').PromisePool; + const PromisePool = require('./promise/pool.js'); return new PromisePool(this, promiseImpl); } - - getConnection(cb) { - if (this._closed) { - return process.nextTick(() => cb(new Error('Pool is closed.'))); - } - let connection; - if (this._freeConnections.length > 0) { - connection = this._freeConnections.pop(); - this.emit('acquire', connection); - return process.nextTick(() => cb(null, connection)); - } - if ( - this.config.connectionLimit === 0 || - this._allConnections.length < this.config.connectionLimit - ) { - connection = new PoolConnection(this, { - config: this.config.connectionConfig - }); - this._allConnections.push(connection); - return connection.connect(err => { - if (this._closed) { - return cb(new Error('Pool is closed.')); - } - if (err) { - return cb(err); - } - this.emit('connection', connection); - this.emit('acquire', connection); - return cb(null, connection); - }); - } - if (!this.config.waitForConnections) { - return process.nextTick(() => cb(new Error('No connections available.'))); - } - if ( - this.config.queueLimit && - this._connectionQueue.length >= this.config.queueLimit - ) { - return cb(new Error('Queue limit reached.')); - } - this.emit('enqueue'); - return this._connectionQueue.push(cb); - } - - releaseConnection(connection) { - let cb; - if (!connection._pool) { - // The connection has been removed from the pool and is no longer good. - if (this._connectionQueue.length) { - cb = this._connectionQueue.shift(); - process.nextTick(this.getConnection.bind(this, cb)); - } - } else if (this._connectionQueue.length) { - cb = this._connectionQueue.shift(); - process.nextTick(cb.bind(null, null, connection)); - } else { - this._freeConnections.push(connection); - this.emit('release', connection); - } - } - - end(cb) { - this._closed = true; - clearTimeout(this._removeIdleTimeoutConnectionsTimer); - if (typeof cb !== 'function') { - cb = function(err) { - if (err) { - throw err; - } - }; - } - let calledBack = false; - let closedConnections = 0; - let connection; - const endCB = function(err) { - if (calledBack) { - return; - } - if (err || ++closedConnections >= this._allConnections.length) { - calledBack = true; - cb(err); - return; - } - }.bind(this); - if (this._allConnections.length === 0) { - endCB(); - return; - } - for (let i = 0; i < this._allConnections.length; i++) { - connection = this._allConnections.get(i); - connection._realEnd(endCB); - } - } - - query(sql, values, cb) { - const cmdQuery = Connection.createQuery( - sql, - values, - cb, - this.config.connectionConfig - ); - if (typeof cmdQuery.namedPlaceholders === 'undefined') { - cmdQuery.namedPlaceholders = this.config.connectionConfig.namedPlaceholders; - } - this.getConnection((err, conn) => { - if (err) { - if (typeof cmdQuery.onResult === 'function') { - cmdQuery.onResult(err); - } else { - cmdQuery.emit('error', err); - } - return; - } - try { - conn.query(cmdQuery).once('end', () => { - conn.release(); - }); - } catch (e) { - conn.release(); - throw e; - } - }); - return cmdQuery; - } - - execute(sql, values, cb) { - // TODO construct execute command first here and pass it to connection.execute - // so that polymorphic arguments logic is there in one place - if (typeof values === 'function') { - cb = values; - values = []; - } - this.getConnection((err, conn) => { - if (err) { - return cb(err); - } - try { - conn.execute(sql, values, cb).once('end', () => { - conn.release(); - }); - } catch (e) { - conn.release(); - return cb(e); - } - }); - } - - _removeConnection(connection) { - // Remove connection from all connections - spliceConnection(this._allConnections, connection); - // Remove connection from free connections - spliceConnection(this._freeConnections, connection); - this.releaseConnection(connection); - } - - _removeIdleTimeoutConnections() { - if (this._removeIdleTimeoutConnectionsTimer) { - clearTimeout(this._removeIdleTimeoutConnectionsTimer); - } - - this._removeIdleTimeoutConnectionsTimer = setTimeout(() => { - try { - while ( - this._freeConnections.length > this.config.maxIdle || - (this._freeConnections.length > 0 && - Date.now() - this._freeConnections.get(0).lastActiveTime > - this.config.idleTimeout) - ) { - this._freeConnections.get(0).destroy(); - } - } finally { - this._removeIdleTimeoutConnections(); - } - }, 1000); - } - - format(sql, values) { - return mysql.format( - sql, - values, - this.config.connectionConfig.stringifyObjects, - this.config.connectionConfig.timezone - ); - } - - escape(value) { - return mysql.escape( - value, - this.config.connectionConfig.stringifyObjects, - this.config.connectionConfig.timezone - ); - } - - escapeId(value) { - return mysql.escapeId(value, false); - } } module.exports = Pool; diff --git a/lib/pool_connection.js b/lib/pool_connection.js index 78aac6d6b8..5cc94dc187 100644 --- a/lib/pool_connection.js +++ b/lib/pool_connection.js @@ -1,69 +1,12 @@ 'use strict'; -const Connection = require('../index.js').Connection; - -class PoolConnection extends Connection { - constructor(pool, options) { - super(options); - this._pool = pool; - // The last active time of this connection - this.lastActiveTime = Date.now(); - // When a fatal error occurs the connection's protocol ends, which will cause - // the connection to end as well, thus we only need to watch for the end event - // and we will be notified of disconnects. - // REVIEW: Moved to `once` - this.once('end', () => { - this._removeFromPool(); - }); - this.once('error', () => { - this._removeFromPool(); - }); - } - - release() { - if (!this._pool || this._pool._closed) { - return; - } - // update last active time - this.lastActiveTime = Date.now(); - this._pool.releaseConnection(this); - } +const BasePoolConnection = require('./base/pool_connection.js'); +class PoolConnection extends BasePoolConnection { promise(promiseImpl) { - const PromisePoolConnection = require('../promise').PromisePoolConnection; + const PromisePoolConnection = require('./promise/pool_connection.js'); return new PromisePoolConnection(this, promiseImpl); } - - end() { - const err = new Error( - 'Calling conn.end() to release a pooled connection is ' + - 'deprecated. In next version calling conn.end() will be ' + - 'restored to default conn.end() behavior. Use ' + - 'conn.release() instead.' - ); - this.emit('warn', err); - // eslint-disable-next-line no-console - console.warn(err.message); - this.release(); - } - - destroy() { - this._removeFromPool(); - super.destroy(); - } - - _removeFromPool() { - if (!this._pool || this._pool._closed) { - return; - } - const pool = this._pool; - this._pool = null; - pool._removeConnection(this); - } } -PoolConnection.statementKey = Connection.statementKey; module.exports = PoolConnection; - -// TODO: Remove this when we are removing PoolConnection#end -PoolConnection.prototype._realEnd = Connection.prototype.end; diff --git a/lib/promise/connection.js b/lib/promise/connection.js new file mode 100644 index 0000000000..7cb0628e6e --- /dev/null +++ b/lib/promise/connection.js @@ -0,0 +1,222 @@ +'use strict'; + +const EventEmitter = require('events').EventEmitter; +const PromisePreparedStatementInfo = require('./prepared_statement_info.js'); +const makeDoneCb = require('./make_done_cb.js'); +const inheritEvents = require('./inherit_events.js'); +const BaseConnection = require('../base/connection.js'); + +class PromiseConnection extends EventEmitter { + constructor(connection, promiseImpl) { + super(); + this.connection = connection; + this.Promise = promiseImpl || Promise; + inheritEvents(connection, this, [ + 'error', + 'drain', + 'connect', + 'end', + 'enqueue', + ]); + } + + release() { + this.connection.release(); + } + + query(query, params) { + const c = this.connection; + const localErr = new Error(); + if (typeof params === 'function') { + throw new Error( + 'Callback function is not available with promise clients.', + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (params !== undefined) { + c.query(query, params, done); + } else { + c.query(query, done); + } + }); + } + + execute(query, params) { + const c = this.connection; + const localErr = new Error(); + if (typeof params === 'function') { + throw new Error( + 'Callback function is not available with promise clients.', + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (params !== undefined) { + c.execute(query, params, done); + } else { + c.execute(query, done); + } + }); + } + + end() { + return new this.Promise((resolve) => { + this.connection.end(resolve); + }); + } + + beginTransaction() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.beginTransaction(done); + }); + } + + commit() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.commit(done); + }); + } + + rollback() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + c.rollback(done); + }); + } + + ping() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.ping((err) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(true); + } + }); + }); + } + + connect() { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.connect((err, param) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(param); + } + }); + }); + } + + prepare(options) { + const c = this.connection; + const promiseImpl = this.Promise; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.prepare(options, (err, statement) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + const wrappedStatement = new PromisePreparedStatementInfo( + statement, + promiseImpl, + ); + resolve(wrappedStatement); + } + }); + }); + } + + changeUser(options) { + const c = this.connection; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + c.changeUser(options, (err) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(); + } + }); + }); + } + + get config() { + return this.connection.config; + } + + get threadId() { + return this.connection.threadId; + } +} +// patching PromiseConnection +// create facade functions for prototype functions on "Connection" that are not yet +// implemented with PromiseConnection + +// proxy synchronous functions only +(function (functionsToWrap) { + for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { + const func = functionsToWrap[i]; + + if ( + typeof BaseConnection.prototype[func] === 'function' && + PromiseConnection.prototype[func] === undefined + ) { + PromiseConnection.prototype[func] = (function factory(funcName) { + return function () { + return BaseConnection.prototype[funcName].apply( + this.connection, + arguments, + ); + }; + })(func); + } + } +})([ + // synchronous functions + 'close', + 'createBinlogStream', + 'destroy', + 'escape', + 'escapeId', + 'format', + 'pause', + 'pipe', + 'resume', + 'unprepare', +]); + +module.exports = PromiseConnection; diff --git a/lib/promise/inherit_events.js b/lib/promise/inherit_events.js new file mode 100644 index 0000000000..8e59d84fe5 --- /dev/null +++ b/lib/promise/inherit_events.js @@ -0,0 +1,27 @@ +'use strict'; + +function inheritEvents(source, target, events) { + const listeners = {}; + target + .on('newListener', (eventName) => { + if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { + source.on( + eventName, + (listeners[eventName] = function () { + const args = [].slice.call(arguments); + args.unshift(eventName); + + target.emit.apply(target, args); + }), + ); + } + }) + .on('removeListener', (eventName) => { + if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { + source.removeListener(eventName, listeners[eventName]); + delete listeners[eventName]; + } + }); +} + +module.exports = inheritEvents; diff --git a/lib/promise/make_done_cb.js b/lib/promise/make_done_cb.js new file mode 100644 index 0000000000..124303f256 --- /dev/null +++ b/lib/promise/make_done_cb.js @@ -0,0 +1,19 @@ +'use strict'; + +function makeDoneCb(resolve, reject, localErr) { + return function (err, rows, fields) { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sql = err.sql; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve([rows, fields]); + } + }; +} + +module.exports = makeDoneCb; diff --git a/lib/promise/pool.js b/lib/promise/pool.js new file mode 100644 index 0000000000..672caf03a9 --- /dev/null +++ b/lib/promise/pool.js @@ -0,0 +1,112 @@ +'use strict'; + +const EventEmitter = require('events').EventEmitter; +const makeDoneCb = require('./make_done_cb.js'); +const PromisePoolConnection = require('./pool_connection.js'); +const inheritEvents = require('./inherit_events.js'); +const BasePool = require('../base/pool.js'); + +class PromisePool extends EventEmitter { + constructor(pool, thePromise) { + super(); + this.pool = pool; + this.Promise = thePromise || Promise; + inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']); + } + + getConnection() { + const corePool = this.pool; + return new this.Promise((resolve, reject) => { + corePool.getConnection((err, coreConnection) => { + if (err) { + reject(err); + } else { + resolve(new PromisePoolConnection(coreConnection, this.Promise)); + } + }); + }); + } + + releaseConnection(connection) { + if (connection instanceof PromisePoolConnection) connection.release(); + } + + query(sql, args) { + const corePool = this.pool; + const localErr = new Error(); + if (typeof args === 'function') { + throw new Error( + 'Callback function is not available with promise clients.', + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (args !== undefined) { + corePool.query(sql, args, done); + } else { + corePool.query(sql, done); + } + }); + } + + execute(sql, args) { + const corePool = this.pool; + const localErr = new Error(); + if (typeof args === 'function') { + throw new Error( + 'Callback function is not available with promise clients.', + ); + } + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (args) { + corePool.execute(sql, args, done); + } else { + corePool.execute(sql, done); + } + }); + } + + end() { + const corePool = this.pool; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + corePool.end((err) => { + if (err) { + localErr.message = err.message; + localErr.code = err.code; + localErr.errno = err.errno; + localErr.sqlState = err.sqlState; + localErr.sqlMessage = err.sqlMessage; + reject(localErr); + } else { + resolve(); + } + }); + }); + } +} + +(function (functionsToWrap) { + for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { + const func = functionsToWrap[i]; + + if ( + typeof BasePool.prototype[func] === 'function' && + PromisePool.prototype[func] === undefined + ) { + PromisePool.prototype[func] = (function factory(funcName) { + return function () { + return BasePool.prototype[funcName].apply(this.pool, arguments); + }; + })(func); + } + } +})([ + // synchronous functions + 'escape', + 'escapeId', + 'format', +]); + +module.exports = PromisePool; diff --git a/lib/promise/pool_connection.js b/lib/promise/pool_connection.js new file mode 100644 index 0000000000..1da0113c06 --- /dev/null +++ b/lib/promise/pool_connection.js @@ -0,0 +1,19 @@ +'use strict'; + +const PromiseConnection = require('./connection.js'); +const BasePoolConnection = require('../base/pool_connection.js'); + +class PromisePoolConnection extends PromiseConnection { + constructor(connection, promiseImpl) { + super(connection, promiseImpl); + } + + destroy() { + return BasePoolConnection.prototype.destroy.apply( + this.connection, + arguments, + ); + } +} + +module.exports = PromisePoolConnection; diff --git a/lib/promise/prepared_statement_info.js b/lib/promise/prepared_statement_info.js new file mode 100644 index 0000000000..47b9bedf1e --- /dev/null +++ b/lib/promise/prepared_statement_info.js @@ -0,0 +1,32 @@ +'use strict'; + +const makeDoneCb = require('./make_done_cb.js'); + +class PromisePreparedStatementInfo { + constructor(statement, promiseImpl) { + this.statement = statement; + this.Promise = promiseImpl; + } + + execute(parameters) { + const s = this.statement; + const localErr = new Error(); + return new this.Promise((resolve, reject) => { + const done = makeDoneCb(resolve, reject, localErr); + if (parameters) { + s.execute(parameters, done); + } else { + s.execute(done); + } + }); + } + + close() { + return new this.Promise((resolve) => { + this.statement.close(); + resolve(); + }); + } +} + +module.exports = PromisePreparedStatementInfo; diff --git a/promise.js b/promise.js index 5d80e21e72..a0216f5660 100644 --- a/promise.js +++ b/promise.js @@ -1,269 +1,34 @@ 'use strict'; -const core = require('./index.js'); +const SqlString = require('sqlstring'); const EventEmitter = require('events').EventEmitter; const parserCache = require('./lib/parsers/parser_cache.js'); - -function makeDoneCb(resolve, reject, localErr) { - return function (err, rows, fields) { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sql = err.sql; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve([rows, fields]); - } - }; -} - -function inheritEvents(source, target, events) { - const listeners = {}; - target - .on('newListener', eventName => { - if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { - source.on( - eventName, - (listeners[eventName] = function () { - const args = [].slice.call(arguments); - args.unshift(eventName); - - target.emit.apply(target, args); - }) - ); - } - }) - .on('removeListener', eventName => { - if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) { - source.removeListener(eventName, listeners[eventName]); - delete listeners[eventName]; - } - }); -} - -class PromisePreparedStatementInfo { - constructor(statement, promiseImpl) { - this.statement = statement; - this.Promise = promiseImpl; - } - - execute(parameters) { - const s = this.statement; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (parameters) { - s.execute(parameters, done); - } else { - s.execute(done); - } - }); - } - - close() { - return new this.Promise(resolve => { - this.statement.close(); - resolve(); - }); - } -} - -class PromiseConnection extends EventEmitter { - constructor(connection, promiseImpl) { - super(); - this.connection = connection; - this.Promise = promiseImpl || Promise; - inheritEvents(connection, this, [ - 'error', - 'drain', - 'connect', - 'end', - 'enqueue' - ]); - } - - release() { - this.connection.release(); - } - - query(query, params) { - const c = this.connection; - const localErr = new Error(); - if (typeof params === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (params !== undefined) { - c.query(query, params, done); - } else { - c.query(query, done); - } - }); - } - - execute(query, params) { - const c = this.connection; - const localErr = new Error(); - if (typeof params === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (params !== undefined) { - c.execute(query, params, done); - } else { - c.execute(query, done); - } - }); - } - - end() { - return new this.Promise(resolve => { - this.connection.end(resolve); - }); - } - - beginTransaction() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.beginTransaction(done); - }); - } - - commit() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.commit(done); - }); - } - - rollback() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - c.rollback(done); - }); - } - - ping() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.ping(err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(true); - } - }); - }); - } - - connect() { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.connect((err, param) => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(param); - } - }); - }); - } - - prepare(options) { - const c = this.connection; - const promiseImpl = this.Promise; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.prepare(options, (err, statement) => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - const wrappedStatement = new PromisePreparedStatementInfo( - statement, - promiseImpl - ); - resolve(wrappedStatement); - } - }); - }); - } - - changeUser(options) { - const c = this.connection; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - c.changeUser(options, err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(); - } - }); - }); - } - - get config() { - return this.connection.config; - } - - get threadId() { - return this.connection.threadId; - } -} - -function createConnection(opts) { - const coreConnection = core.createConnection(opts); +const PoolCluster = require('./lib/pool_cluster.js'); +const createConnection = require('./lib/create_connection.js'); +const createPool = require('./lib/create_pool.js'); +const createPoolCluster = require('./lib/create_pool_cluster.js'); +const PromiseConnection = require('./lib/promise/connection.js'); +const PromisePool = require('./lib/promise/pool.js'); +const makeDoneCb = require('./lib/promise/make_done_cb.js'); +const PromisePoolConnection = require('./lib/promise/pool_connection.js'); +const inheritEvents = require('./lib/promise/inherit_events.js'); + +function createConnectionPromise(opts) { + const coreConnection = createConnection(opts); const createConnectionErr = new Error(); const thePromise = opts.Promise || Promise; if (!thePromise) { throw new Error( 'no Promise implementation available.' + - 'Use promise-enabled node version or pass userland Promise' + - " implementation as parameter, for example: { Promise: require('bluebird') }" + 'Use promise-enabled node version or pass userland Promise' + + " implementation as parameter, for example: { Promise: require('bluebird') }", ); } return new thePromise((resolve, reject) => { coreConnection.once('connect', () => { resolve(new PromiseConnection(coreConnection, thePromise)); }); - coreConnection.once('error', err => { + coreConnection.once('error', (err) => { createConnectionErr.message = err.message; createConnectionErr.code = err.code; createConnectionErr.errno = err.errno; @@ -276,173 +41,20 @@ function createConnection(opts) { // note: the callback of "changeUser" is not called on success // hence there is no possibility to call "resolve" -// patching PromiseConnection -// create facade functions for prototype functions on "Connection" that are not yet -// implemented with PromiseConnection - -// proxy synchronous functions only -(function (functionsToWrap) { - for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { - const func = functionsToWrap[i]; - - if ( - typeof core.Connection.prototype[func] === 'function' && - PromiseConnection.prototype[func] === undefined - ) { - PromiseConnection.prototype[func] = (function factory(funcName) { - return function () { - return core.Connection.prototype[funcName].apply( - this.connection, - arguments - ); - }; - })(func); - } - } -})([ - // synchronous functions - 'close', - 'createBinlogStream', - 'destroy', - 'escape', - 'escapeId', - 'format', - 'pause', - 'pipe', - 'resume', - 'unprepare' -]); - -class PromisePoolConnection extends PromiseConnection { - constructor(connection, promiseImpl) { - super(connection, promiseImpl); - } - - destroy() { - return core.PoolConnection.prototype.destroy.apply( - this.connection, - arguments - ); - } -} - -class PromisePool extends EventEmitter { - constructor(pool, thePromise) { - super(); - this.pool = pool; - this.Promise = thePromise || Promise; - inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']); - } - - getConnection() { - const corePool = this.pool; - return new this.Promise((resolve, reject) => { - corePool.getConnection((err, coreConnection) => { - if (err) { - reject(err); - } else { - resolve(new PromisePoolConnection(coreConnection, this.Promise)); - } - }); - }); - } - - releaseConnection(connection) { - if (connection instanceof PromisePoolConnection) connection.release(); - } - - query(sql, args) { - const corePool = this.pool; - const localErr = new Error(); - if (typeof args === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (args !== undefined) { - corePool.query(sql, args, done); - } else { - corePool.query(sql, done); - } - }); - } - - execute(sql, args) { - const corePool = this.pool; - const localErr = new Error(); - if (typeof args === 'function') { - throw new Error( - 'Callback function is not available with promise clients.' - ); - } - return new this.Promise((resolve, reject) => { - const done = makeDoneCb(resolve, reject, localErr); - if (args) { - corePool.execute(sql, args, done); - } else { - corePool.execute(sql, done); - } - }); - } - - end() { - const corePool = this.pool; - const localErr = new Error(); - return new this.Promise((resolve, reject) => { - corePool.end(err => { - if (err) { - localErr.message = err.message; - localErr.code = err.code; - localErr.errno = err.errno; - localErr.sqlState = err.sqlState; - localErr.sqlMessage = err.sqlMessage; - reject(localErr); - } else { - resolve(); - } - }); - }); - } -} - -function createPool(opts) { - const corePool = core.createPool(opts); +function createPromisePool(opts) { + const corePool = createPool(opts); const thePromise = opts.Promise || Promise; if (!thePromise) { throw new Error( 'no Promise implementation available.' + - 'Use promise-enabled node version or pass userland Promise' + - " implementation as parameter, for example: { Promise: require('bluebird') }" + 'Use promise-enabled node version or pass userland Promise' + + " implementation as parameter, for example: { Promise: require('bluebird') }", ); } return new PromisePool(corePool, thePromise); } -(function (functionsToWrap) { - for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) { - const func = functionsToWrap[i]; - - if ( - typeof core.Pool.prototype[func] === 'function' && - PromisePool.prototype[func] === undefined - ) { - PromisePool.prototype[func] = (function factory(funcName) { - return function () { - return core.Pool.prototype[funcName].apply(this.pool, arguments); - }; - })(func); - } - } -})([ - // synchronous functions - 'escape', - 'escapeId', - 'format' -]); - class PromisePoolCluster extends EventEmitter { constructor(poolCluster, thePromise) { super(); @@ -454,13 +66,17 @@ class PromisePoolCluster extends EventEmitter { getConnection(pattern, selector) { const corePoolCluster = this.poolCluster; return new this.Promise((resolve, reject) => { - corePoolCluster.getConnection(pattern, selector, (err, coreConnection) => { - if (err) { - reject(err); - } else { - resolve(new PromisePoolConnection(coreConnection, this.Promise)); - } - }); + corePoolCluster.getConnection( + pattern, + selector, + (err, coreConnection) => { + if (err) { + reject(err); + } else { + resolve(new PromisePoolConnection(coreConnection, this.Promise)); + } + }, + ); }); } @@ -469,7 +85,7 @@ class PromisePoolCluster extends EventEmitter { const localErr = new Error(); if (typeof args === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -483,7 +99,7 @@ class PromisePoolCluster extends EventEmitter { const localErr = new Error(); if (typeof args === 'function') { throw new Error( - 'Callback function is not available with promise clients.' + 'Callback function is not available with promise clients.', ); } return new this.Promise((resolve, reject) => { @@ -495,7 +111,7 @@ class PromisePoolCluster extends EventEmitter { of(pattern, selector) { return new PromisePoolCluster( this.poolCluster.of(pattern, selector), - this.Promise + this.Promise, ); } @@ -503,7 +119,7 @@ class PromisePoolCluster extends EventEmitter { const corePoolCluster = this.poolCluster; const localErr = new Error(); return new this.Promise((resolve, reject) => { - corePoolCluster.end(err => { + corePoolCluster.end((err) => { if (err) { localErr.message = err.message; localErr.code = err.code; @@ -527,40 +143,41 @@ class PromisePoolCluster extends EventEmitter { const func = functionsToWrap[i]; if ( - typeof core.PoolCluster.prototype[func] === 'function' && + typeof PoolCluster.prototype[func] === 'function' && PromisePoolCluster.prototype[func] === undefined ) { PromisePoolCluster.prototype[func] = (function factory(funcName) { return function () { - return core.PoolCluster.prototype[funcName].apply(this.poolCluster, arguments); + return PoolCluster.prototype[funcName].apply( + this.poolCluster, + arguments, + ); }; })(func); } } -})([ - 'add' -]); +})(['add']); -function createPoolCluster(opts) { - const corePoolCluster = core.createPoolCluster(opts); +function createPromisePoolCluster(opts) { + const corePoolCluster = createPoolCluster(opts); const thePromise = (opts && opts.Promise) || Promise; if (!thePromise) { throw new Error( 'no Promise implementation available.' + - 'Use promise-enabled node version or pass userland Promise' + - " implementation as parameter, for example: { Promise: require('bluebird') }" + 'Use promise-enabled node version or pass userland Promise' + + " implementation as parameter, for example: { Promise: require('bluebird') }", ); } return new PromisePoolCluster(corePoolCluster, thePromise); } -exports.createConnection = createConnection; -exports.createPool = createPool; -exports.createPoolCluster = createPoolCluster; -exports.escape = core.escape; -exports.escapeId = core.escapeId; -exports.format = core.format; -exports.raw = core.raw; +exports.createConnection = createConnectionPromise; +exports.createPool = createPromisePool; +exports.createPoolCluster = createPromisePoolCluster; +exports.escape = SqlString.escape; +exports.escapeId = SqlString.escapeId; +exports.format = SqlString.format; +exports.raw = SqlString.raw; exports.PromisePool = PromisePool; exports.PromiseConnection = PromiseConnection; exports.PromisePoolConnection = PromisePoolConnection; @@ -568,17 +185,17 @@ exports.PromisePoolConnection = PromisePoolConnection; exports.__defineGetter__('Types', () => require('./lib/constants/types.js')); exports.__defineGetter__('Charsets', () => - require('./lib/constants/charsets.js') + require('./lib/constants/charsets.js'), ); exports.__defineGetter__('CharsetToEncoding', () => - require('./lib/constants/charset_encodings.js') + require('./lib/constants/charset_encodings.js'), ); -exports.setMaxParserCache = function(max) { +exports.setMaxParserCache = function (max) { parserCache.setMaxCache(max); }; -exports.clearParserCache = function() { +exports.clearParserCache = function () { parserCache.clearCache(); };