Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions packages/client/lib/tests/test-scenario/test-command-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ type FireCommandsUntilStopSignalOptions = {
) => Array<() => Promise<unknown>>;
};

/**
* Utility class for running test commands until a stop signal is received
*/
export class TestCommandRunner {
constructor(
private client: ReturnType<typeof createClient<any, any, any, any>>
) {}

private defaultOptions: FireCommandsUntilStopSignalOptions = {
private static readonly defaultOptions: FireCommandsUntilStopSignalOptions = {
batchSize: 60,
timeoutMs: 10,
createCommands: (
Expand All @@ -38,7 +37,7 @@ export class TestCommandRunner {
],
};

#toSettled<T>(p: Promise<T>) {
static #toSettled<T>(p: Promise<T>) {
return p
.then((value) => ({ status: "fulfilled" as const, value, error: null }))
.catch((reason) => ({
Expand All @@ -48,47 +47,52 @@ export class TestCommandRunner {
}));
}

async #racePromises<S, T>({
static async #racePromises<S, T>({
timeout,
stopper,
}: {
timeout: Promise<S>;
stopper: Promise<T>;
}) {
return Promise.race([
this.#toSettled<S>(timeout).then((result) => ({
TestCommandRunner.#toSettled<S>(timeout).then((result) => ({
...result,
stop: false,
})),
this.#toSettled<T>(stopper).then((result) => ({ ...result, stop: true })),
TestCommandRunner.#toSettled<T>(stopper).then((result) => ({
...result,
stop: true,
})),
]);
}

/**
* Fires commands until a stop signal is received.
* @param stopSignalPromise Promise that resolves when the command execution should stop
* @param options Options for the command execution
* @returns Promise that resolves when the stop signal is received
* Fires a batch of test commands until a stop signal is received
* @param client - The Redis client to use
* @param stopSignalPromise - Promise that resolves when the execution should stop
* @param options - Options for the command execution
* @returns An object containing the promises of all executed commands and the result of the stop signal
*/
async fireCommandsUntilStopSignal(
static async fireCommandsUntilStopSignal(
client: ReturnType<typeof createClient<any, any, any, any>>,
stopSignalPromise: Promise<unknown>,
options?: Partial<FireCommandsUntilStopSignalOptions>
) {
const executeOptions = {
...this.defaultOptions,
...TestCommandRunner.defaultOptions,
...options,
};

const commandPromises = [];

while (true) {
for (let i = 0; i < executeOptions.batchSize; i++) {
for (const command of executeOptions.createCommands(this.client)) {
commandPromises.push(this.#toSettled(command()));
for (const command of executeOptions.createCommands(client)) {
commandPromises.push(TestCommandRunner.#toSettled(command()));
}
}

const result = await this.#racePromises({
const result = await TestCommandRunner.#racePromises({
timeout: setTimeout(executeOptions.timeoutMs),
stopper: stopSignalPromise,
});
Expand Down
87 changes: 87 additions & 0 deletions packages/client/lib/tests/test-scenario/test-scenario.util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { readFileSync } from "fs";
import { createClient, RedisClientOptions } from "../../..";
import { stub } from "sinon";

type DatabaseEndpoint = {
addr: string[];
Expand Down Expand Up @@ -108,3 +110,88 @@ export function getDatabaseConfig(
};
}

// TODO this should be moved in the tests utils package
export async function blockSetImmediate(fn: () => Promise<unknown>) {
let setImmediateStub: any;

try {
setImmediateStub = stub(global, "setImmediate");
setImmediateStub.callsFake(() => {
//Dont call the callback, effectively blocking execution
});
await fn();
} finally {
if (setImmediateStub) {
setImmediateStub.restore();
}
}
}

/**
* Factory class for creating and managing Redis clients
*/
export class ClientFactory {
private readonly clients = new Map<
string,
ReturnType<typeof createClient<any, any, any, any>>
>();

constructor(private readonly config: RedisConnectionConfig) {}

/**
* Creates a new client with the specified options and connects it to the database
* @param key - The key to store the client under
* @param options - Optional client options
* @returns The created and connected client
*/
async create(key: string, options: Partial<RedisClientOptions> = {}) {
const client = createClient({
socket: {
host: this.config.host,
port: this.config.port,
...(this.config.tls === true ? { tls: true } : {}),
},
password: this.config.password,
username: this.config.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "auto",
...options,
});

client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});

await client.connect();

this.clients.set(key, client);

return client;
}

/**
* Gets an existing client by key or the first one if no key is provided
* @param key - The key of the client to retrieve
* @returns The client if found, undefined otherwise
*/
get(key?: string) {
if (key) {
return this.clients.get(key);
}

// Get the first one if no key is provided
return this.clients.values().next().value;
}

/**
* Destroys all created clients
*/
destroyAll() {
this.clients.forEach((client) => {
if (client && client.isOpen) {
client.destroy();
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,63 +1,51 @@
import assert from "node:assert";
import { setTimeout } from "node:timers/promises";

import { FaultInjectorClient } from "./fault-injector-client";
import {
ClientFactory,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
RedisConnectionConfig,
blockSetImmediate
} from "./test-scenario.util";
import { createClient } from "../../..";
import { before } from "mocha";
import { TestCommandRunner } from "./test-command-runner";

describe("Timeout Handling During Notifications", () => {
let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, 3>>;
let clientFactory: ClientFactory;
let faultInjectorClient: FaultInjectorClient;
let commandRunner: TestCommandRunner;
let defaultClient: ReturnType<typeof createClient<any, any, any, any>>;

before(() => {
const envConfig = getEnvConfig();
const redisConfig = getDatabaseConfigFromEnv(
envConfig.redisEndpointsConfigPath
);

faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientConfig = getDatabaseConfig(redisConfig);
faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientFactory = new ClientFactory(clientConfig);
});

beforeEach(async () => {
client = createClient({
socket: {
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
},
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "auto",
});

client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});

commandRunner = new TestCommandRunner(client);
defaultClient = await clientFactory.create("default");

await client.connect();
await defaultClient.flushAll();
});

afterEach(() => {
client.destroy();
afterEach(async () => {
clientFactory.destroyAll();
});

it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => {
// PART 1
// Set very low timeout to trigger errors
client.options!.maintRelaxedCommandTimeout = 50;
const lowTimeoutClient = await clientFactory.create("lowTimeout", {
maintRelaxedCommandTimeout: 50,
});

const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
Expand All @@ -70,7 +58,10 @@ describe("Timeout Handling During Notifications", () => {
);

const lowTimeoutCommandPromises =
await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise);
await TestCommandRunner.fireCommandsUntilStopSignal(
lowTimeoutClient,
lowTimeoutWaitPromise
);

const lowTimeoutRejectedCommands = (
await Promise.all(lowTimeoutCommandPromises.commandPromises)
Expand All @@ -90,7 +81,9 @@ describe("Timeout Handling During Notifications", () => {

// PART 2
// Set high timeout to avoid errors
client.options!.maintRelaxedCommandTimeout = 10000;
const highTimeoutClient = await clientFactory.create("highTimeout", {
maintRelaxedCommandTimeout: 10000,
});

const { action_id: highTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
Expand All @@ -103,7 +96,10 @@ describe("Timeout Handling During Notifications", () => {
);

const highTimeoutCommandPromises =
await commandRunner.fireCommandsUntilStopSignal(highTimeoutWaitPromise);
await TestCommandRunner.fireCommandsUntilStopSignal(
highTimeoutClient,
highTimeoutWaitPromise
);

const highTimeoutRejectedCommands = (
await Promise.all(highTimeoutCommandPromises.commandPromises)
Expand All @@ -112,13 +108,15 @@ describe("Timeout Handling During Notifications", () => {
assert.strictEqual(highTimeoutRejectedCommands.length, 0);
});

// TODO this is WIP
it.skip("should unrelax command timeout after MAINTENANCE", async () => {
client.options!.maintRelaxedCommandTimeout = 10000;
client.options!.commandOptions = {
...client.options!.commandOptions,
timeout: 1, // Set very low timeout to trigger errors
};
it("should unrelax command timeout after MAINTENANCE", async () => {
const clientWithCommandTimeout = await clientFactory.create(
"clientWithCommandTimeout",
{
commandOptions: {
timeout: 100,
},
}
);

const { action_id: bindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
Expand All @@ -131,25 +129,31 @@ describe("Timeout Handling During Notifications", () => {
);

const relaxedTimeoutCommandPromises =
await commandRunner.fireCommandsUntilStopSignal(lowTimeoutWaitPromise);
await TestCommandRunner.fireCommandsUntilStopSignal(
clientWithCommandTimeout,
lowTimeoutWaitPromise
);

const relaxedTimeoutRejectedCommands = (
await Promise.all(relaxedTimeoutCommandPromises.commandPromises)
).filter((result) => result.status === "rejected");
console.log(
"relaxedTimeoutRejectedCommands",
relaxedTimeoutRejectedCommands
);

assert.ok(relaxedTimeoutRejectedCommands.length === 0);

const unrelaxedCommandPromises =
await commandRunner.fireCommandsUntilStopSignal(setTimeout(1 * 1000));
const start = performance.now();

const unrelaxedRejectedCommands = (
await Promise.all(unrelaxedCommandPromises.commandPromises)
).filter((result) => result.status === "rejected");
let error: any;
await blockSetImmediate(async () => {
try {
await clientWithCommandTimeout.set("key", "value");
} catch (err: any) {
error = err;
}
});

assert.ok(unrelaxedRejectedCommands.length > 0);
// Make sure it took less than 1sec to fail
assert.ok(performance.now() - start < 1000);
assert.ok(error instanceof Error);
assert.ok(error.constructor.name === "TimeoutError");
});
});