diff --git a/packages/client/lib/tests/test-scenario/test-command-runner.ts b/packages/client/lib/tests/test-scenario/test-command-runner.ts index d6aeba0902..9e1acc3a8a 100644 --- a/packages/client/lib/tests/test-scenario/test-command-runner.ts +++ b/packages/client/lib/tests/test-scenario/test-command-runner.ts @@ -22,12 +22,11 @@ type FireCommandsUntilStopSignalOptions = { ) => Array<() => Promise>; }; +/** + * Utility class for running test commands until a stop signal is received + */ export class TestCommandRunner { - constructor( - private client: ReturnType> - ) {} - - private defaultOptions: FireCommandsUntilStopSignalOptions = { + private static readonly defaultOptions: FireCommandsUntilStopSignalOptions = { batchSize: 60, timeoutMs: 10, createCommands: ( @@ -38,7 +37,7 @@ export class TestCommandRunner { ], }; - #toSettled(p: Promise) { + static #toSettled(p: Promise) { return p .then((value) => ({ status: "fulfilled" as const, value, error: null })) .catch((reason) => ({ @@ -48,7 +47,7 @@ export class TestCommandRunner { })); } - async #racePromises({ + static async #racePromises({ timeout, stopper, }: { @@ -56,26 +55,31 @@ export class TestCommandRunner { stopper: Promise; }) { return Promise.race([ - this.#toSettled(timeout).then((result) => ({ + TestCommandRunner.#toSettled(timeout).then((result) => ({ ...result, stop: false, })), - this.#toSettled(stopper).then((result) => ({ ...result, stop: true })), + TestCommandRunner.#toSettled(stopper).then((result) => ({ + ...result, + stop: true, + })), ]); } /** - * Fires commands until a stop signal is received. - * @param stopSignalPromise Promise that resolves when the command execution should stop - * @param options Options for the command execution - * @returns Promise that resolves when the stop signal is received + * Fires a batch of test commands until a stop signal is received + * @param client - The Redis client to use + * @param stopSignalPromise - Promise that resolves when the execution should stop + * @param options - Options for the command execution + * @returns An object containing the promises of all executed commands and the result of the stop signal */ - async fireCommandsUntilStopSignal( + static async fireCommandsUntilStopSignal( + client: ReturnType>, stopSignalPromise: Promise, options?: Partial ) { const executeOptions = { - ...this.defaultOptions, + ...TestCommandRunner.defaultOptions, ...options, }; @@ -83,12 +87,12 @@ export class TestCommandRunner { while (true) { for (let i = 0; i < executeOptions.batchSize; i++) { - for (const command of executeOptions.createCommands(this.client)) { - commandPromises.push(this.#toSettled(command())); + for (const command of executeOptions.createCommands(client)) { + commandPromises.push(TestCommandRunner.#toSettled(command())); } } - const result = await this.#racePromises({ + const result = await TestCommandRunner.#racePromises({ timeout: setTimeout(executeOptions.timeoutMs), stopper: stopSignalPromise, }); diff --git a/packages/client/lib/tests/test-scenario/test-scenario.util.ts b/packages/client/lib/tests/test-scenario/test-scenario.util.ts index 82333be8f0..b130cdc538 100644 --- a/packages/client/lib/tests/test-scenario/test-scenario.util.ts +++ b/packages/client/lib/tests/test-scenario/test-scenario.util.ts @@ -1,4 +1,6 @@ import { readFileSync } from "fs"; +import { createClient, RedisClientOptions } from "../../.."; +import { stub } from "sinon"; type DatabaseEndpoint = { addr: string[]; @@ -108,3 +110,88 @@ export function getDatabaseConfig( }; } +// TODO this should be moved in the tests utils package +export async function blockSetImmediate(fn: () => Promise) { + let setImmediateStub: any; + + try { + setImmediateStub = stub(global, "setImmediate"); + setImmediateStub.callsFake(() => { + //Dont call the callback, effectively blocking execution + }); + await fn(); + } finally { + if (setImmediateStub) { + setImmediateStub.restore(); + } + } +} + +/** + * Factory class for creating and managing Redis clients + */ +export class ClientFactory { + private readonly clients = new Map< + string, + ReturnType> + >(); + + constructor(private readonly config: RedisConnectionConfig) {} + + /** + * Creates a new client with the specified options and connects it to the database + * @param key - The key to store the client under + * @param options - Optional client options + * @returns The created and connected client + */ + async create(key: string, options: Partial = {}) { + const client = createClient({ + socket: { + host: this.config.host, + port: this.config.port, + ...(this.config.tls === true ? { tls: true } : {}), + }, + password: this.config.password, + username: this.config.username, + RESP: 3, + maintPushNotifications: "auto", + maintMovingEndpointType: "auto", + ...options, + }); + + client.on("error", (err: Error) => { + throw new Error(`Client error: ${err.message}`); + }); + + await client.connect(); + + this.clients.set(key, client); + + return client; + } + + /** + * Gets an existing client by key or the first one if no key is provided + * @param key - The key of the client to retrieve + * @returns The client if found, undefined otherwise + */ + get(key?: string) { + if (key) { + return this.clients.get(key); + } + + // Get the first one if no key is provided + return this.clients.values().next().value; + } + + /** + * Destroys all created clients + */ + destroyAll() { + this.clients.forEach((client) => { + if (client && client.isOpen) { + client.destroy(); + } + }); + } +} diff --git a/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts index fbb63aa9fc..7bdf23fcb1 100644 --- a/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts +++ b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts @@ -1,11 +1,13 @@ import assert from "node:assert"; -import { setTimeout } from "node:timers/promises"; + import { FaultInjectorClient } from "./fault-injector-client"; import { + ClientFactory, getDatabaseConfig, getDatabaseConfigFromEnv, getEnvConfig, RedisConnectionConfig, + blockSetImmediate } from "./test-scenario.util"; import { createClient } from "../../.."; import { before } from "mocha"; @@ -13,9 +15,9 @@ import { TestCommandRunner } from "./test-command-runner"; describe("Timeout Handling During Notifications", () => { let clientConfig: RedisConnectionConfig; - let client: ReturnType>; + let clientFactory: ClientFactory; let faultInjectorClient: FaultInjectorClient; - let commandRunner: TestCommandRunner; + let defaultClient: ReturnType>; before(() => { const envConfig = getEnvConfig(); @@ -23,41 +25,27 @@ describe("Timeout Handling During Notifications", () => { envConfig.redisEndpointsConfigPath ); - faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); clientConfig = getDatabaseConfig(redisConfig); + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientFactory = new ClientFactory(clientConfig); }); beforeEach(async () => { - client = createClient({ - socket: { - host: clientConfig.host, - port: clientConfig.port, - ...(clientConfig.tls === true ? { tls: true } : {}), - }, - password: clientConfig.password, - username: clientConfig.username, - RESP: 3, - maintPushNotifications: "auto", - maintMovingEndpointType: "auto", - }); - - client.on("error", (err: Error) => { - throw new Error(`Client error: ${err.message}`); - }); - - commandRunner = new TestCommandRunner(client); + defaultClient = await clientFactory.create("default"); - await client.connect(); + await defaultClient.flushAll(); }); - afterEach(() => { - client.destroy(); + afterEach(async () => { + clientFactory.destroyAll(); }); it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => { // PART 1 // Set very low timeout to trigger errors - client.options!.maintRelaxedCommandTimeout = 50; + const lowTimeoutClient = await clientFactory.create("lowTimeout", { + maintRelaxedCommandTimeout: 50, + }); const { action_id: lowTimeoutBindAndMigrateActionId } = await faultInjectorClient.migrateAndBindAction({ @@ -70,7 +58,10 @@ describe("Timeout Handling During Notifications", () => { ); const lowTimeoutCommandPromises = - await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise); + await TestCommandRunner.fireCommandsUntilStopSignal( + lowTimeoutClient, + lowTimeoutWaitPromise + ); const lowTimeoutRejectedCommands = ( await Promise.all(lowTimeoutCommandPromises.commandPromises) @@ -90,7 +81,9 @@ describe("Timeout Handling During Notifications", () => { // PART 2 // Set high timeout to avoid errors - client.options!.maintRelaxedCommandTimeout = 10000; + const highTimeoutClient = await clientFactory.create("highTimeout", { + maintRelaxedCommandTimeout: 10000, + }); const { action_id: highTimeoutBindAndMigrateActionId } = await faultInjectorClient.migrateAndBindAction({ @@ -103,7 +96,10 @@ describe("Timeout Handling During Notifications", () => { ); const highTimeoutCommandPromises = - await commandRunner.fireCommandsUntilStopSignal(highTimeoutWaitPromise); + await TestCommandRunner.fireCommandsUntilStopSignal( + highTimeoutClient, + highTimeoutWaitPromise + ); const highTimeoutRejectedCommands = ( await Promise.all(highTimeoutCommandPromises.commandPromises) @@ -112,13 +108,15 @@ describe("Timeout Handling During Notifications", () => { assert.strictEqual(highTimeoutRejectedCommands.length, 0); }); - // TODO this is WIP - it.skip("should unrelax command timeout after MAINTENANCE", async () => { - client.options!.maintRelaxedCommandTimeout = 10000; - client.options!.commandOptions = { - ...client.options!.commandOptions, - timeout: 1, // Set very low timeout to trigger errors - }; + it("should unrelax command timeout after MAINTENANCE", async () => { + const clientWithCommandTimeout = await clientFactory.create( + "clientWithCommandTimeout", + { + commandOptions: { + timeout: 100, + }, + } + ); const { action_id: bindAndMigrateActionId } = await faultInjectorClient.migrateAndBindAction({ @@ -131,25 +129,31 @@ describe("Timeout Handling During Notifications", () => { ); const relaxedTimeoutCommandPromises = - await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise); + await TestCommandRunner.fireCommandsUntilStopSignal( + clientWithCommandTimeout, + lowTimeoutWaitPromise + ); const relaxedTimeoutRejectedCommands = ( await Promise.all(relaxedTimeoutCommandPromises.commandPromises) ).filter((result) => result.status === "rejected"); - console.log( - "relaxedTimeoutRejectedCommands", - relaxedTimeoutRejectedCommands - ); assert.ok(relaxedTimeoutRejectedCommands.length === 0); - const unrelaxedCommandPromises = - await commandRunner.fireCommandsUntilStopSignal(setTimeout(1 * 1000)); + const start = performance.now(); - const unrelaxedRejectedCommands = ( - await Promise.all(unrelaxedCommandPromises.commandPromises) - ).filter((result) => result.status === "rejected"); + let error: any; + await blockSetImmediate(async () => { + try { + await clientWithCommandTimeout.set("key", "value"); + } catch (err: any) { + error = err; + } + }); - assert.ok(unrelaxedRejectedCommands.length > 0); + // Make sure it took less than 1sec to fail + assert.ok(performance.now() - start < 1000); + assert.ok(error instanceof Error); + assert.ok(error.constructor.name === "TimeoutError"); }); });