diff --git a/packages/client/lib/tests/test-scenario/fault-injector-client.ts b/packages/client/lib/tests/test-scenario/fault-injector-client.ts index 7db75e3081..d6635ac42e 100644 --- a/packages/client/lib/tests/test-scenario/fault-injector-client.ts +++ b/packages/client/lib/tests/test-scenario/fault-injector-client.ts @@ -108,6 +108,41 @@ export class FaultInjectorClient { throw new Error(`Timeout waiting for action ${actionId}`); } + async migrateAndBindAction({ + bdbId, + clusterIndex, + }: { + bdbId: string | number; + clusterIndex: string | number; + }) { + const bdbIdStr = bdbId.toString(); + const clusterIndexStr = clusterIndex.toString(); + + return this.triggerAction<{ + action_id: string; + }>({ + type: "sequence_of_actions", + parameters: { + bdbId: bdbIdStr, + actions: [ + { + type: "migrate", + params: { + cluster_index: clusterIndexStr, + }, + }, + { + type: "bind", + params: { + cluster_index: clusterIndexStr, + bdb_id: bdbIdStr, + }, + }, + ], + }, + }); + } + async #request( method: string, path: string, diff --git a/packages/client/lib/tests/test-scenario/test-command-runner.ts b/packages/client/lib/tests/test-scenario/test-command-runner.ts new file mode 100644 index 0000000000..d6aeba0902 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/test-command-runner.ts @@ -0,0 +1,104 @@ +import { randomUUID } from "node:crypto"; +import { setTimeout } from "node:timers/promises"; +import { createClient } from "../../.."; + +/** + * Options for the `fireCommandsUntilStopSignal` method + */ +type FireCommandsUntilStopSignalOptions = { + /** + * Number of commands to fire in each batch + */ + batchSize: number; + /** + * Timeout between batches in milliseconds + */ + timeoutMs: number; + /** + * Function that creates the commands to be executed + */ + createCommands: ( + client: ReturnType> + ) => Array<() => Promise>; +}; + +export class TestCommandRunner { + constructor( + private client: ReturnType> + ) {} + + private defaultOptions: FireCommandsUntilStopSignalOptions = { + batchSize: 60, + timeoutMs: 10, + createCommands: ( + client: ReturnType> + ) => [ + () => client.set(randomUUID(), Date.now()), + () => client.get(randomUUID()), + ], + }; + + #toSettled(p: Promise) { + return p + .then((value) => ({ status: "fulfilled" as const, value, error: null })) + .catch((reason) => ({ + status: "rejected" as const, + value: null, + error: reason, + })); + } + + async #racePromises({ + timeout, + stopper, + }: { + timeout: Promise; + stopper: Promise; + }) { + return Promise.race([ + this.#toSettled(timeout).then((result) => ({ + ...result, + stop: false, + })), + this.#toSettled(stopper).then((result) => ({ ...result, stop: true })), + ]); + } + + /** + * Fires commands until a stop signal is received. + * @param stopSignalPromise Promise that resolves when the command execution should stop + * @param options Options for the command execution + * @returns Promise that resolves when the stop signal is received + */ + async fireCommandsUntilStopSignal( + stopSignalPromise: Promise, + options?: Partial + ) { + const executeOptions = { + ...this.defaultOptions, + ...options, + }; + + const commandPromises = []; + + while (true) { + for (let i = 0; i < executeOptions.batchSize; i++) { + for (const command of executeOptions.createCommands(this.client)) { + commandPromises.push(this.#toSettled(command())); + } + } + + const result = await this.#racePromises({ + timeout: setTimeout(executeOptions.timeoutMs), + stopper: stopSignalPromise, + }); + + if (result.stop) { + return { + commandPromises, + stopResult: result, + }; + } + } + } +} diff --git a/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts new file mode 100644 index 0000000000..fbb63aa9fc --- /dev/null +++ b/packages/client/lib/tests/test-scenario/timeout-during-notifications.e2e.ts @@ -0,0 +1,155 @@ +import assert from "node:assert"; +import { setTimeout } from "node:timers/promises"; +import { FaultInjectorClient } from "./fault-injector-client"; +import { + getDatabaseConfig, + getDatabaseConfigFromEnv, + getEnvConfig, + RedisConnectionConfig, +} from "./test-scenario.util"; +import { createClient } from "../../.."; +import { before } from "mocha"; +import { TestCommandRunner } from "./test-command-runner"; + +describe("Timeout Handling During Notifications", () => { + let clientConfig: RedisConnectionConfig; + let client: ReturnType>; + let faultInjectorClient: FaultInjectorClient; + let commandRunner: TestCommandRunner; + + before(() => { + const envConfig = getEnvConfig(); + const redisConfig = getDatabaseConfigFromEnv( + envConfig.redisEndpointsConfigPath + ); + + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientConfig = getDatabaseConfig(redisConfig); + }); + + beforeEach(async () => { + client = createClient({ + socket: { + host: clientConfig.host, + port: clientConfig.port, + ...(clientConfig.tls === true ? { tls: true } : {}), + }, + password: clientConfig.password, + username: clientConfig.username, + RESP: 3, + maintPushNotifications: "auto", + maintMovingEndpointType: "auto", + }); + + client.on("error", (err: Error) => { + throw new Error(`Client error: ${err.message}`); + }); + + commandRunner = new TestCommandRunner(client); + + await client.connect(); + }); + + afterEach(() => { + client.destroy(); + }); + + it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => { + // PART 1 + // Set very low timeout to trigger errors + client.options!.maintRelaxedCommandTimeout = 50; + + const { action_id: lowTimeoutBindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( + lowTimeoutBindAndMigrateActionId + ); + + const lowTimeoutCommandPromises = + await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise); + + const lowTimeoutRejectedCommands = ( + await Promise.all(lowTimeoutCommandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + + assert.ok(lowTimeoutRejectedCommands.length > 0); + assert.strictEqual( + lowTimeoutRejectedCommands.filter((rejected) => { + return ( + // TODO instanceof doesn't work for some reason + rejected.error.constructor.name === + "CommandTimeoutDuringMaintananceError" + ); + }).length, + lowTimeoutRejectedCommands.length + ); + + // PART 2 + // Set high timeout to avoid errors + client.options!.maintRelaxedCommandTimeout = 10000; + + const { action_id: highTimeoutBindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const highTimeoutWaitPromise = faultInjectorClient.waitForAction( + highTimeoutBindAndMigrateActionId + ); + + const highTimeoutCommandPromises = + await commandRunner.fireCommandsUntilStopSignal(highTimeoutWaitPromise); + + const highTimeoutRejectedCommands = ( + await Promise.all(highTimeoutCommandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + + assert.strictEqual(highTimeoutRejectedCommands.length, 0); + }); + + // TODO this is WIP + it.skip("should unrelax command timeout after MAINTENANCE", async () => { + client.options!.maintRelaxedCommandTimeout = 10000; + client.options!.commandOptions = { + ...client.options!.commandOptions, + timeout: 1, // Set very low timeout to trigger errors + }; + + const { action_id: bindAndMigrateActionId } = + await faultInjectorClient.migrateAndBindAction({ + bdbId: clientConfig.bdbId, + clusterIndex: 0, + }); + + const lowTimeoutWaitPromise = faultInjectorClient.waitForAction( + bindAndMigrateActionId + ); + + const relaxedTimeoutCommandPromises = + await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise); + + const relaxedTimeoutRejectedCommands = ( + await Promise.all(relaxedTimeoutCommandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + console.log( + "relaxedTimeoutRejectedCommands", + relaxedTimeoutRejectedCommands + ); + + assert.ok(relaxedTimeoutRejectedCommands.length === 0); + + const unrelaxedCommandPromises = + await commandRunner.fireCommandsUntilStopSignal(setTimeout(1 * 1000)); + + const unrelaxedRejectedCommands = ( + await Promise.all(unrelaxedCommandPromises.commandPromises) + ).filter((result) => result.status === "rejected"); + + assert.ok(unrelaxedRejectedCommands.length > 0); + }); +});