diff --git a/lib/DataHandler.ts b/lib/DataHandler.ts index de22b0a2..9cc1c1ed 100644 --- a/lib/DataHandler.ts +++ b/lib/DataHandler.ts @@ -10,20 +10,29 @@ const debug = Debug("dataHandler"); type ReplyData = string | Buffer | number | Array; -interface Condition { +export interface Condition { select: number; - auth: string; + auth?: string | [string, string]; subscriber: false | SubscriptionSet; } -interface DataHandledable extends EventEmitter { +export type FlushQueueOptions = { + offlineQueue?: boolean; + commandQueue?: boolean; +}; + +export interface DataHandledable extends EventEmitter { stream: NetStream; status: string; - condition: Condition; + condition: Condition | null; commandQueue: Deque; disconnect(reconnect: boolean): void; - recoverFromFatalError(commandError: Error, err: Error, options: any): void; + recoverFromFatalError( + commandError: Error, + err: Error, + options: FlushQueueOptions + ): void; handleReconnection(err: Error, item: CommandItem): void; } diff --git a/lib/Redis.ts b/lib/Redis.ts index a220fe06..cdcabb31 100644 --- a/lib/Redis.ts +++ b/lib/Redis.ts @@ -3,6 +3,7 @@ import { EventEmitter } from "events"; import asCallback from "standard-as-callback"; import Cluster from "./cluster"; import Command from "./Command"; +import { DataHandledable, FlushQueueOptions, Condition } from "./DataHandler"; import { StandaloneConnector } from "./connectors"; import AbstractConnector from "./connectors/AbstractConnector"; import SentinelConnector from "./connectors/SentinelConnector"; @@ -60,7 +61,7 @@ type RedisStatus = * } * ``` */ -class Redis extends Commander { +class Redis extends Commander implements DataHandledable { static Cluster = Cluster; static Command = Command; /** @@ -89,14 +90,18 @@ class Redis extends Commander { */ isCluster = false; + /** + * @ignore + */ + condition: Condition | null; + + /** + * @ignore + */ + commandQueue: Deque; + private connector: AbstractConnector; private reconnectTimeout: ReturnType | null = null; - private condition: { - select: number; - auth?: string | [string, string]; - subscriber: boolean; - }; - private commandQueue: Deque; private offlineQueue: Deque; private connectionEpoch = 0; private retryAttempts = 0; @@ -220,9 +225,11 @@ class Redis extends Commander { // Node ignores setKeepAlive before connect, therefore we wait for the event: // https://github.com/nodejs/node/issues/31663 - if (typeof options.keepAlive === 'number') { + if (typeof options.keepAlive === "number") { if (stream.connecting) { - stream.once(CONNECT_EVENT, () => stream.setKeepAlive(true, options.keepAlive)); + stream.once(CONNECT_EVENT, () => { + stream.setKeepAlive(true, options.keepAlive); + }); } else { stream.setKeepAlive(true, options.keepAlive); } @@ -344,10 +351,10 @@ class Redis extends Commander { * One of `"normal"`, `"subscriber"`, or `"monitor"`. When the connection is * not in `"normal"` mode, certain commands are not allowed. */ - get mode(): "normal" | "subscriber" | "monitor" { + get mode(): "normal" | "subscriber" | "monitor" { return this.options.monitor ? "monitor" - : this.condition && this.condition.subscriber + : this.condition?.subscriber ? "subscriber" : "normal"; } @@ -421,7 +428,7 @@ class Redis extends Commander { return command.promise; } if ( - this.condition.subscriber && + this.condition?.subscriber && !Command.checkFlag("VALID_IN_SUBSCRIBER_MODE", command.name) ) { command.reject( @@ -491,7 +498,7 @@ class Redis extends Commander { debug( "write command[%s]: %d -> %s(%o)", this._getDescription(), - this.condition.select, + this.condition?.select, command.name, command.args ); @@ -600,45 +607,22 @@ class Redis extends Commander { } /** - * Get description of the connection. Used for debugging. + * @ignore */ - private _getDescription() { - let description; - if ("path" in this.options && this.options.path) { - description = this.options.path; - } else if ( - this.stream && - this.stream.remoteAddress && - this.stream.remotePort - ) { - description = this.stream.remoteAddress + ":" + this.stream.remotePort; - } else if ("host" in this.options && this.options.host) { - description = this.options.host + ":" + this.options.port; - } else { - // Unexpected - description = ""; - } - if (this.options.connectionName) { - description += ` (${this.options.connectionName})`; - } - return description; - } - - private resetCommandQueue() { - this.commandQueue = new Deque(); - } - - private resetOfflineQueue() { - this.offlineQueue = new Deque(); - } - - private recoverFromFatalError(commandError, err: Error | null, options) { + recoverFromFatalError( + _commandError: Error, + err: Error, + options: FlushQueueOptions + ) { this.flushQueue(err, options); this.silentEmit("error", err); this.disconnect(true); } - private handleReconnection(err: Error, item: CommandItem) { + /** + * @ignore + */ + handleReconnection(err: Error, item: CommandItem) { let needReconnect: ReturnType = false; if (this.options.reconnectOnError) { needReconnect = this.options.reconnectOnError(err); @@ -657,7 +641,7 @@ class Redis extends Commander { this.disconnect(true); } if ( - this.condition.select !== item.select && + this.condition?.select !== item.select && item.command.name !== "select" ) { this.select(item.select); @@ -671,6 +655,39 @@ class Redis extends Commander { } } + /** + * Get description of the connection. Used for debugging. + */ + private _getDescription() { + let description; + if ("path" in this.options && this.options.path) { + description = this.options.path; + } else if ( + this.stream && + this.stream.remoteAddress && + this.stream.remotePort + ) { + description = this.stream.remoteAddress + ":" + this.stream.remotePort; + } else if ("host" in this.options && this.options.host) { + description = this.options.host + ":" + this.options.port; + } else { + // Unexpected + description = ""; + } + if (this.options.connectionName) { + description += ` (${this.options.connectionName})`; + } + return description; + } + + private resetCommandQueue() { + this.commandQueue = new Deque(); + } + + private resetOfflineQueue() { + this.offlineQueue = new Deque(); + } + private parseOptions(...args: unknown[]) { const options: Record = {}; let isTls = false; @@ -744,7 +761,7 @@ class Redis extends Commander { * @param error The error object to send to the commands * @param options options */ - private flushQueue(error: Error, options?: RedisOptions) { + private flushQueue(error: Error, options?: FlushQueueOptions) { options = defaults({}, options, { offlineQueue: true, commandQueue: true, diff --git a/lib/ScanStream.ts b/lib/ScanStream.ts index 293be201..e74a7a2c 100644 --- a/lib/ScanStream.ts +++ b/lib/ScanStream.ts @@ -10,7 +10,7 @@ interface Options extends ReadableOptions { } /** - * Convenient class to convert the process of scaning keys to a readable stream. + * Convenient class to convert the process of scanning keys to a readable stream. */ export default class ScanStream extends Readable { private _redisCursor = "0"; diff --git a/lib/Script.ts b/lib/Script.ts index 695097db..f0682aaa 100644 --- a/lib/Script.ts +++ b/lib/Script.ts @@ -8,7 +8,7 @@ export default class Script { constructor( private lua: string, - private numberOfKeys: number = null, + private numberOfKeys: number | null = null, private keyPrefix: string = "", private readOnly: boolean = false ) { diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index c383d652..8419b1a3 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -564,7 +564,7 @@ class Cluster extends Commander { redis = nodes[0]; } } else { - let key; + let key: string; if (to === "all") { key = sample(nodeKeys); } else if (to === "slave" && nodeKeys.length > 1) { diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 468a56bb..82470e80 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -116,7 +116,7 @@ function abortError(command: Respondable) { function abortIncompletePipelines(commandQueue: Deque) { let expectedIndex = 0; for (let i = 0; i < commandQueue.length; ) { - const command = commandQueue.peekAt(i).command as Command; + const command = commandQueue.peekAt(i)?.command as Command; const pipelineIndex = command.pipelineIndex; if (pipelineIndex === undefined || pipelineIndex === 0) { expectedIndex = 0; @@ -135,7 +135,7 @@ function abortIncompletePipelines(commandQueue: Deque) { // offline queue function abortTransactionFragments(commandQueue: Deque) { for (let i = 0; i < commandQueue.length; ) { - const command = commandQueue.peekAt(i).command as Command; + const command = commandQueue.peekAt(i)?.command as Command; if (command.name === "multi") { break; } diff --git a/lib/utils/index.ts b/lib/utils/index.ts index 9cd6da3e..46bf2161 100644 --- a/lib/utils/index.ts +++ b/lib/utils/index.ts @@ -43,13 +43,13 @@ export function convertBufferToString(value: any, encoding?: BufferEncoding) { * expect(output).to.eql([[null, 'a'], [null, 'b'], [new Error('c')], [null, 'd']) * ``` */ -export function wrapMultiResult(arr: unknown[] | null): unknown[][] { +export function wrapMultiResult(arr: unknown[] | null): unknown[][] | null { // When using WATCH/EXEC transactions, the EXEC will return // a null instead of an array if (!arr) { return null; } - const result = []; + const result: unknown[][] = []; const length = arr.length; for (let i = 0; i < length; ++i) { const item = arr[i]; @@ -133,7 +133,7 @@ export function timeout( export function convertObjectToArray( obj: Record ): (string | T)[] { - const result = []; + const result: (string | T)[] = []; const keys = Object.keys(obj); // Object.entries requires node 7+ for (let i = 0, l = keys.length; i < l; i++) { @@ -185,7 +185,7 @@ export function optimizeErrorStack( ) { const stacks = friendlyStack.split("\n"); let lines = ""; - let i; + let i: number; for (i = 1; i < stacks.length; ++i) { if (stacks[i].indexOf(filterPath) === -1) { break; @@ -194,8 +194,10 @@ export function optimizeErrorStack( for (let j = i; j < stacks.length; ++j) { lines += "\n" + stacks[j]; } - const pos = error.stack.indexOf("\n"); - error.stack = error.stack.slice(0, pos) + lines; + if (error.stack) { + const pos = error.stack.indexOf("\n"); + error.stack = error.stack.slice(0, pos) + lines; + } return error; } @@ -278,7 +280,7 @@ export function resolveTLSProfile(options: TLSOptions): TLSOptions { export function sample(array: T[], from = 0): T { const length = array.length; if (from >= length) { - return; + return null; } return array[from + Math.floor(Math.random() * (length - from))]; }