Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/test-utils/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,10 @@ export default class TestUtils {
//@ts-ignore
targetPort: socketOptions.port,
//@ts-ignore
targetHost: socketOptions.host,
targetHost: socketOptions.host ?? '127.0.0.1',
enableLogging: true
});


await proxy.start();
const proxyClient = client.duplicate({
socket: {
Expand Down
60 changes: 58 additions & 2 deletions packages/test-utils/lib/redis-proxy-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { strict as assert } from 'node:assert';
import { Buffer } from 'node:buffer';
import { testUtils, GLOBAL } from './test-utils';
import { RedisProxy } from './redis-proxy';
import { InterceptorFunction, RedisProxy } from './redis-proxy';
import type { RedisClientType } from '@redis/client/lib/client/index.js';

describe('RedisSocketProxy', function () {
Expand Down Expand Up @@ -107,5 +107,61 @@ describe('RedisSocketProxy', function () {
const pingResult = await proxiedClient.ping();
assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy');

}, GLOBAL.SERVERS.OPEN_RESP_3)
}, GLOBAL.SERVERS.OPEN_RESP_3);

describe("Middleware", () => {
testUtils.testWithProxiedClient(
"Modify request/response via middleware",
async (
proxiedClient: RedisClientType<any, any, any, any, any>,
proxy: RedisProxy,
) => {

// Intercept PING commands and modify the response
const pingInterceptor: InterceptorFunction = async (data, next) => {
if (data.includes('PING')) {
return Buffer.from("+PINGINTERCEPTED\r\n");
}
return next(data);
};

// Only intercept GET responses and double numeric values
// Does not modify other commands or non-numeric GET responses
const doubleNumberGetInterceptor: InterceptorFunction = async (data, next) => {
const response = await next(data);

// Not a GET command, return original response
if (!data.includes("GET")) return response;

const value = (response.toString().split("\r\n"))[1];
const number = Number(value);
// Not a number, return original response
if(isNaN(number)) return response;

const doubled = String(number * 2);
return Buffer.from(`$${doubled.length}\r\n${doubled}\r\n`);
};

proxy.setInterceptors([ pingInterceptor, doubleNumberGetInterceptor ])

const pingResponse = await proxiedClient.ping();
assert.equal(pingResponse, 'PINGINTERCEPTED', 'Response should be modified by middleware');

await proxiedClient.set('foo', 1);
const getResponse1 = await proxiedClient.get('foo');
assert.equal(getResponse1, '2', 'GET response should be doubled for numbers by middleware');

await proxiedClient.set('bar', 'Hi');
const getResponse2 = await proxiedClient.get('bar');
assert.equal(getResponse2, 'Hi', 'GET response should not be modified for strings by middleware');

await proxiedClient.hSet('baz', 'foo', 'dictvalue');
const hgetResponse = await proxiedClient.hGet('baz', 'foo');
assert.equal(hgetResponse, 'dictvalue', 'HGET response should not be modified by middleware');

},
GLOBAL.SERVERS.OPEN_RESP_3,
);
});

});
53 changes: 48 additions & 5 deletions packages/test-utils/lib/redis-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ interface ConnectionInfo {
interface ActiveConnection extends ConnectionInfo {
readonly clientSocket: net.Socket;
readonly serverSocket: net.Socket;
inflightRequestsCount: number
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about borrowing from node-redis's cache stats approach ? We could start by tracking inflightRequestsCount initially and add more metrics later. This design integrates well if you want to hook in traces, metrics, or logs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only to distinguish between request/response and push for now. it will go away in nest iteration

}

type SendResult =
Expand Down Expand Up @@ -49,11 +50,16 @@ interface ProxyEvents {
'close': () => void;
}

export type Interceptor = (data: Buffer) => Promise<Buffer>;
export type InterceptorFunction = (data: Buffer, next: Interceptor) => Promise<Buffer>;
type InterceptorInitializer = (init: Interceptor) => Interceptor;

export class RedisProxy extends EventEmitter {
private readonly server: net.Server;
public readonly config: Required<ProxyConfig>;
private readonly connections: Map<string, ActiveConnection>;
private isRunning: boolean;
private interceptorInitializer?: InterceptorInitializer;

constructor(config: ProxyConfig) {
super();
Expand Down Expand Up @@ -113,6 +119,13 @@ export class RedisProxy extends EventEmitter {
});
}

public setInterceptors(interceptors: Array<InterceptorFunction>) {
this.interceptorInitializer = (init) => interceptors.reduceRight<Interceptor>(
(next, mw) => (data) => mw(data, next),
init
);
}

public getStats(): ProxyStats {
const connections = Array.from(this.connections.values());

Expand Down Expand Up @@ -218,19 +231,22 @@ export class RedisProxy extends EventEmitter {
}

private handleClientConnection(clientSocket: net.Socket): void {
const connectionId = this.generateConnectionId();
clientSocket.pause();
const serverSocket = net.createConnection({
host: this.config.targetHost,
port: this.config.targetPort
});
serverSocket.once('connect', clientSocket.resume.bind(clientSocket));

const connectionId = this.generateConnectionId();
const connectionInfo: ActiveConnection = {
id: connectionId,
clientAddress: clientSocket.remoteAddress || 'unknown',
clientPort: clientSocket.remotePort || 0,
connectedAt: new Date(),
clientSocket,
serverSocket
serverSocket,
inflightRequestsCount: 0
};

this.connections.set(connectionId, connectionInfo);
Expand All @@ -243,12 +259,38 @@ export class RedisProxy extends EventEmitter {
this.emit('connection', connectionInfo);
});

clientSocket.on('data', (data) => {
clientSocket.on('data', async (data) => {
this.emit('data', connectionId, 'client->server', data);
serverSocket.write(data);

if(!this.interceptorInitializer) {
Copy link
Member

@bobymicroby bobymicroby Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we can eliminate the branch prediction (if(!this.interceptorInitializer)) entirely by implementing a default "socket write/noop" interceptor that simply performs the socket write when no initializer is provided.

Copy link
Collaborator Author

@nkaradzhov nkaradzhov Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we can totally do this, in fact it was like that before, but i thought this is better. i will move it back

serverSocket.write(data);
return;
}

connectionInfo.inflightRequestsCount++;

// next1 -> next2 -> ... -> last -> server
// next1 <- next2 <- ... <- last <- server
const last = (data: Buffer): Promise<Buffer> => {
return new Promise((resolve, reject) => {
serverSocket.write(data);
serverSocket.once('data', (data) => {
connectionInfo.inflightRequestsCount--;
assert(connectionInfo.inflightRequestsCount >= 0, `inflightRequestsCount for connection ${connectionId} went below zero`);
this.emit('data', connectionId, 'server->client', data);
resolve(data);
});
serverSocket.once('error', reject);
});
};

const interceptorChain = this.interceptorInitializer(last);
const response = await interceptorChain(data);
clientSocket.write(response);
});

serverSocket.on('data', (data) => {
if (connectionInfo.inflightRequestsCount > 0) return;
this.emit('data', connectionId, 'server->client', data);
clientSocket.write(data);
});
Expand All @@ -273,6 +315,7 @@ export class RedisProxy extends EventEmitter {
});

serverSocket.on('error', (error) => {
if (connectionInfo.inflightRequestsCount > 0) return;
this.log(`Server error for connection ${connectionId}: ${error.message}`);
this.emit('error', error, connectionId);
clientSocket.destroy();
Expand Down Expand Up @@ -306,6 +349,7 @@ export class RedisProxy extends EventEmitter {
}
}
import { createServer } from 'net';
import assert from 'node:assert';

export function getFreePortNumber(): Promise<number> {
return new Promise((resolve, reject) => {
Expand All @@ -326,4 +370,3 @@ export function getFreePortNumber(): Promise<number> {

export { RedisProxy as RedisTransparentProxy };
export type { ProxyConfig, ConnectionInfo, ProxyEvents, SendResult, DataDirection, ProxyStats };

Loading