From b29bf4bd0455058a7fef9ddabd13d56fa127c1e5 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Mon, 25 Aug 2025 17:58:19 +0300 Subject: [PATCH 1/2] test: add E2E test infrastructure for Redis maintenance scenarios --- .../client/enterprise-maintenance-manager.ts | 30 ++++ .../test-scenario/fault-injector-client.ts | 152 ++++++++++++++++++ .../test-scenario/push-notification.e2e.ts | 94 +++++++++++ .../tests/test-scenario/test-scenario.util.ts | 110 +++++++++++++ 4 files changed, 386 insertions(+) create mode 100644 packages/client/lib/tests/test-scenario/fault-injector-client.ts create mode 100644 packages/client/lib/tests/test-scenario/push-notification.e2e.ts create mode 100644 packages/client/lib/tests/test-scenario/test-scenario.util.ts diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index 969d560adb..a9482d5335 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -6,6 +6,7 @@ import { lookup } from "dns/promises"; import assert from "node:assert"; import { setTimeout } from "node:timers/promises"; import RedisSocket from "./socket"; +import diagnostics_channel from "node:diagnostics_channel"; export const MAINTENANCE_EVENTS = { PAUSE_WRITING: "pause-writing", @@ -21,11 +22,24 @@ const PN = { FAILED_OVER: "FAILED_OVER", }; +export type DiagnosticsEvent = { + type: string; + timestamp: number; + data?: Object; +}; + export const dbgMaintenance = (...args: any[]) => { if (!process.env.DEBUG_MAINTENANCE) return; return console.log("[MNT]", ...args); }; +export const emitDiagnostics = (event: DiagnosticsEvent) => { + if (!process.env.EMIT_DIAGNOSTICS) return; + + const channel = diagnostics_channel.channel("redis.maintenance"); + channel.publish(event); +}; + export interface MaintenanceUpdate { relaxedCommandTimeout?: number; relaxedSocketTimeout?: number; @@ -113,18 +127,34 @@ export default class EnterpriseMaintenanceManager { const afterSeconds = push[2]; const url: string | null = push[3] ? String(push[3]) : null; dbgMaintenance("Received MOVING:", afterSeconds, url); + emitDiagnostics({ + type: PN.MOVING, + timestamp: Date.now(), + data: { + afterSeconds, + url, + }, + }); this.#onMoving(afterSeconds, url); return true; } case PN.MIGRATING: case PN.FAILING_OVER: { dbgMaintenance("Received MIGRATING|FAILING_OVER"); + emitDiagnostics({ + type: PN.MIGRATING, + timestamp: Date.now(), + }); this.#onMigrating(); return true; } case PN.MIGRATED: case PN.FAILED_OVER: { dbgMaintenance("Received MIGRATED|FAILED_OVER"); + emitDiagnostics({ + type: PN.MIGRATED, + timestamp: Date.now(), + }); this.#onMigrated(); return true; } diff --git a/packages/client/lib/tests/test-scenario/fault-injector-client.ts b/packages/client/lib/tests/test-scenario/fault-injector-client.ts new file mode 100644 index 0000000000..7db75e3081 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/fault-injector-client.ts @@ -0,0 +1,152 @@ +import { setTimeout } from "node:timers/promises"; + +export type ActionType = + | "dmc_restart" + | "failover" + | "reshard" + | "sequence_of_actions" + | "network_failure" + | "execute_rlutil_command" + | "execute_rladmin_command" + | "migrate" + | "bind"; + +export interface ActionRequest { + type: ActionType; + parameters?: { + bdb_id?: string; + [key: string]: unknown; + }; +} + +export interface ActionStatus { + status: string; + error: unknown; + output: string; +} + +export class FaultInjectorClient { + private baseUrl: string; + #fetch: typeof fetch; + + constructor(baseUrl: string, fetchImpl: typeof fetch = fetch) { + this.baseUrl = baseUrl.replace(/\/+$/, ""); // trim trailing slash + this.#fetch = fetchImpl; + } + + /** + * Lists all available actions. + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public listActions(): Promise { + return this.#request("GET", "/action"); + } + + /** + * Triggers a specific action. + * @param action The action request to trigger + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public triggerAction(action: ActionRequest): Promise { + return this.#request("POST", "/action", action); + } + + /** + * Gets the status of a specific action. + * @param actionId The ID of the action to check + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public getActionStatus(actionId: string): Promise { + return this.#request("GET", `/action/${actionId}`); + } + + /** + * Executes an rladmin command. + * @param command The rladmin command to execute + * @param bdbId Optional database ID to target + * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON + */ + public executeRladminCommand( + command: string, + bdbId?: string + ): Promise { + const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`; + return this.#request("POST", "/rladmin", cmd); + } + + /** + * Waits for an action to complete. + * @param actionId The ID of the action to wait for + * @param options Optional timeout and max wait time + * @throws {Error} When the action does not complete within the max wait time + */ + public async waitForAction( + actionId: string, + { + timeoutMs, + maxWaitTimeMs, + }: { + timeoutMs?: number; + maxWaitTimeMs?: number; + } = {} + ): Promise { + const timeout = timeoutMs || 1000; + const maxWaitTime = maxWaitTimeMs || 60000; + + const startTime = Date.now(); + + while (Date.now() - startTime < maxWaitTime) { + const action = await this.getActionStatus(actionId); + + if (["finished", "failed", "success"].includes(action.status)) { + return action; + } + + await setTimeout(timeout); + } + + throw new Error(`Timeout waiting for action ${actionId}`); + } + + async #request( + method: string, + path: string, + body?: Object | string + ): Promise { + const url = `${this.baseUrl}${path}`; + const headers: Record = { + "Content-Type": "application/json", + }; + + let payload: string | undefined; + + if (body) { + if (typeof body === "string") { + headers["Content-Type"] = "text/plain"; + payload = body; + } else { + headers["Content-Type"] = "application/json"; + payload = JSON.stringify(body); + } + } + + const response = await this.#fetch(url, { method, headers, body: payload }); + + if (!response.ok) { + try { + const text = await response.text(); + throw new Error(`HTTP ${response.status} - ${text}`); + } catch { + throw new Error(`HTTP ${response.status}`); + } + } + + try { + return (await response.json()) as T; + } catch { + throw new Error( + `HTTP ${response.status} - Unable to parse response as JSON` + ); + } + } +} diff --git a/packages/client/lib/tests/test-scenario/push-notification.e2e.ts b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts new file mode 100644 index 0000000000..3408931728 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/push-notification.e2e.ts @@ -0,0 +1,94 @@ +import assert from "node:assert"; +import diagnostics_channel from "node:diagnostics_channel"; +import { FaultInjectorClient } from "./fault-injector-client"; +import { + getDatabaseConfig, + getDatabaseConfigFromEnv, + getEnvConfig, + RedisConnectionConfig, +} from "./test-scenario.util"; +import { createClient } from "../../.."; +import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager"; +import { before } from "mocha"; + +describe("Push Notifications", () => { + const diagnosticsLog: DiagnosticsEvent[] = []; + + const onMessageHandler = (message: unknown) => { + diagnosticsLog.push(message as DiagnosticsEvent); + }; + + let clientConfig: RedisConnectionConfig; + let client: ReturnType>; + let faultInjectorClient: FaultInjectorClient; + + before(() => { + const envConfig = getEnvConfig(); + const redisConfig = getDatabaseConfigFromEnv( + envConfig.redisEndpointsConfigPath + ); + + faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl); + clientConfig = getDatabaseConfig(redisConfig); + }); + + beforeEach(async () => { + diagnosticsLog.length = 0; + diagnostics_channel.subscribe("redis.maintenance", onMessageHandler); + + client = createClient({ + socket: { + host: clientConfig.host, + port: clientConfig.port, + ...(clientConfig.tls === true ? { tls: true } : {}), + }, + password: clientConfig.password, + username: clientConfig.username, + RESP: 3, + maintPushNotifications: "auto", + maintMovingEndpointType: "external-ip", + maintRelaxedCommandTimeout: 10000, + maintRelaxedSocketTimeout: 10000, + }); + + client.on("error", (err: Error) => { + throw new Error(`Client error: ${err.message}`); + }); + + await client.connect(); + }); + + afterEach(() => { + diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler); + client.destroy(); + }); + + it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => { + const { action_id: migrateActionId } = + await faultInjectorClient.triggerAction<{ action_id: string }>({ + type: "migrate", + parameters: { + cluster_index: "0", + }, + }); + + await faultInjectorClient.waitForAction(migrateActionId); + + const { action_id: bindActionId } = + await faultInjectorClient.triggerAction<{ action_id: string }>({ + type: "bind", + parameters: { + cluster_index: "0", + bdb_id: `${clientConfig.bdbId}`, + }, + }); + + await faultInjectorClient.waitForAction(bindActionId); + + const pushNotificationLogs = diagnosticsLog.filter((log) => { + return ["MOVING", "MIGRATING", "MIGRATED"].includes(log?.type); + }); + + assert.strictEqual(pushNotificationLogs.length, 3); + }); +}); diff --git a/packages/client/lib/tests/test-scenario/test-scenario.util.ts b/packages/client/lib/tests/test-scenario/test-scenario.util.ts new file mode 100644 index 0000000000..82333be8f0 --- /dev/null +++ b/packages/client/lib/tests/test-scenario/test-scenario.util.ts @@ -0,0 +1,110 @@ +import { readFileSync } from "fs"; + +type DatabaseEndpoint = { + addr: string[]; + addr_type: string; + dns_name: string; + oss_cluster_api_preferred_endpoint_type: string; + oss_cluster_api_preferred_ip_type: string; + port: number; + proxy_policy: string; + uid: string; +}; + +type DatabaseConfig = { + bdb_id: number; + username: string; + password: string; + tls: boolean; + raw_endpoints: DatabaseEndpoint[]; + endpoints: string[]; +}; + +type DatabasesConfig = { + [databaseName: string]: DatabaseConfig; +}; + +type EnvConfig = { + redisEndpointsConfigPath: string; + faultInjectorUrl: string; +}; + +/** + * Reads environment variables required for the test scenario + * @returns Environment configuration object + * @throws Error if required environment variables are not set + */ +export function getEnvConfig(): EnvConfig { + if (!process.env.REDIS_ENDPOINTS_CONFIG_PATH) { + throw new Error( + "REDIS_ENDPOINTS_CONFIG_PATH environment variable must be set" + ); + } + + if (!process.env.FAULT_INJECTION_API_URL) { + throw new Error("FAULT_INJECTION_API_URL environment variable must be set"); + } + + return { + redisEndpointsConfigPath: process.env.REDIS_ENDPOINTS_CONFIG_PATH, + faultInjectorUrl: process.env.FAULT_INJECTION_API_URL, + }; +} + +/** + * Reads database configuration from a file + * @param filePath - The path to the database configuration file + * @returns Parsed database configuration object + * @throws Error if file doesn't exist or JSON is invalid + */ +export function getDatabaseConfigFromEnv(filePath: string): DatabasesConfig { + try { + const fileContent = readFileSync(filePath, "utf8"); + return JSON.parse(fileContent) as DatabasesConfig; + } catch (error) { + throw new Error(`Failed to read or parse database config from ${filePath}`); + } +} + +export interface RedisConnectionConfig { + host: string; + port: number; + username: string; + password: string; + tls: boolean; + bdbId: number; +} + +/** + * Gets Redis connection parameters for a specific database + * @param databasesConfig - The parsed database configuration object + * @param databaseName - Optional name of the database to retrieve (defaults to the first one) + * @returns Redis connection configuration with host, port, username, password, and tls + * @throws Error if the specified database is not found in the configuration + */ +export function getDatabaseConfig( + databasesConfig: DatabasesConfig, + databaseName?: string +): RedisConnectionConfig { + const dbConfig = databaseName + ? databasesConfig[databaseName] + : Object.values(databasesConfig)[0]; + + if (!dbConfig) { + throw new Error( + `Database ${databaseName ? databaseName : ""} not found in configuration` + ); + } + + const endpoint = dbConfig.raw_endpoints[0]; // Use the first endpoint + + return { + host: endpoint.dns_name, + port: endpoint.port, + username: dbConfig.username, + password: dbConfig.password, + tls: dbConfig.tls, + bdbId: dbConfig.bdb_id, + }; +} + From 053aa9c760787421dfdebf2b425d8c16b9e9a90b Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Tue, 26 Aug 2025 13:23:13 +0300 Subject: [PATCH 2/2] refactor: improve enterprise manager push notification handling --- .../client/enterprise-maintenance-manager.ts | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/packages/client/lib/client/enterprise-maintenance-manager.ts b/packages/client/lib/client/enterprise-maintenance-manager.ts index a9482d5335..98a95ccb1c 100644 --- a/packages/client/lib/client/enterprise-maintenance-manager.ts +++ b/packages/client/lib/client/enterprise-maintenance-manager.ts @@ -120,41 +120,39 @@ export default class EnterpriseMaintenanceManager { #onPush = (push: Array): boolean => { dbgMaintenance("ONPUSH:", push.map(String)); - switch (push[0].toString()) { + + if (!Array.isArray(push) || !["MOVING", "MIGRATING", "MIGRATED", "FAILING_OVER", "FAILED_OVER"].includes(String(push[0]))) { + return false; + } + + const type = String(push[0]); + + emitDiagnostics({ + type, + timestamp: Date.now(), + data: { + push: push.map(String), + }, + }); + switch (type) { case PN.MOVING: { // [ 'MOVING', '17', '15', '54.78.247.156:12075' ] // ^seq ^after ^new ip const afterSeconds = push[2]; const url: string | null = push[3] ? String(push[3]) : null; dbgMaintenance("Received MOVING:", afterSeconds, url); - emitDiagnostics({ - type: PN.MOVING, - timestamp: Date.now(), - data: { - afterSeconds, - url, - }, - }); this.#onMoving(afterSeconds, url); return true; } case PN.MIGRATING: case PN.FAILING_OVER: { dbgMaintenance("Received MIGRATING|FAILING_OVER"); - emitDiagnostics({ - type: PN.MIGRATING, - timestamp: Date.now(), - }); this.#onMigrating(); return true; } case PN.MIGRATED: case PN.FAILED_OVER: { dbgMaintenance("Received MIGRATED|FAILED_OVER"); - emitDiagnostics({ - type: PN.MIGRATED, - timestamp: Date.now(), - }); this.#onMigrated(); return true; }