Skip to content

Commit 92ec6fa

Browse files
committed
feat(proxy): implement express style middleware
1 parent d7c6544 commit 92ec6fa

File tree

3 files changed

+107
-9
lines changed

3 files changed

+107
-9
lines changed

packages/test-utils/lib/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,11 +313,10 @@ export default class TestUtils {
313313
//@ts-ignore
314314
targetPort: socketOptions.port,
315315
//@ts-ignore
316-
targetHost: socketOptions.host,
316+
targetHost: socketOptions.host ?? '127.0.0.1',
317317
enableLogging: true
318318
});
319319

320-
321320
await proxy.start();
322321
const proxyClient = client.duplicate({
323322
socket: {

packages/test-utils/lib/redis-proxy-spec.ts

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { strict as assert } from 'node:assert';
22
import { Buffer } from 'node:buffer';
33
import { testUtils, GLOBAL } from './test-utils';
4-
import { RedisProxy } from './redis-proxy';
4+
import { InterceptorFunction, RedisProxy } from './redis-proxy';
55
import type { RedisClientType } from '@redis/client/lib/client/index.js';
66

77
describe('RedisSocketProxy', function () {
@@ -107,5 +107,61 @@ describe('RedisSocketProxy', function () {
107107
const pingResult = await proxiedClient.ping();
108108
assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy');
109109

110-
}, GLOBAL.SERVERS.OPEN_RESP_3)
110+
}, GLOBAL.SERVERS.OPEN_RESP_3);
111+
112+
describe("Middleware", () => {
113+
testUtils.testWithProxiedClient(
114+
"Modify request/response via middleware",
115+
async (
116+
proxiedClient: RedisClientType<any, any, any, any, any>,
117+
proxy: RedisProxy,
118+
) => {
119+
120+
// Intercept PING commands and modify the response
121+
const pingInterceptor: InterceptorFunction = async (data, next) => {
122+
if (data.includes('PING')) {
123+
return Buffer.from("+PINGINTERCEPTED\r\n");
124+
}
125+
return next(data);
126+
};
127+
128+
// Only intercept GET responses and double numeric values
129+
// Does not modify other commands or non-numeric GET responses
130+
const doubleNumberGetInterceptor: InterceptorFunction = async (data, next) => {
131+
const response = await next(data);
132+
133+
// Not a GET command, return original response
134+
if (!data.includes("GET")) return response;
135+
136+
const value = (response.toString().split("\r\n"))[1];
137+
const number = Number(value);
138+
// Not a number, return original response
139+
if(isNaN(number)) return response;
140+
141+
const doubled = String(number * 2);
142+
return Buffer.from(`$${doubled.length}\r\n${doubled}\r\n`);
143+
};
144+
145+
proxy.setInterceptors([ pingInterceptor, doubleNumberGetInterceptor ])
146+
147+
const pingResponse = await proxiedClient.ping();
148+
assert.equal(pingResponse, 'PINGINTERCEPTED', 'Response should be modified by middleware');
149+
150+
await proxiedClient.set('foo', 1);
151+
const getResponse1 = await proxiedClient.get('foo');
152+
assert.equal(getResponse1, '2', 'GET response should be doubled for numbers by middleware');
153+
154+
await proxiedClient.set('bar', 'Hi');
155+
const getResponse2 = await proxiedClient.get('bar');
156+
assert.equal(getResponse2, 'Hi', 'GET response should not be modified for strings by middleware');
157+
158+
await proxiedClient.hSet('baz', 'foo', 'dictvalue');
159+
const hgetResponse = await proxiedClient.hGet('baz', 'foo');
160+
assert.equal(hgetResponse, 'dictvalue', 'HGET response should not be modified by middleware');
161+
162+
},
163+
GLOBAL.SERVERS.OPEN_RESP_3,
164+
);
165+
});
166+
111167
});

packages/test-utils/lib/redis-proxy.ts

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ interface ConnectionInfo {
2020
interface ActiveConnection extends ConnectionInfo {
2121
readonly clientSocket: net.Socket;
2222
readonly serverSocket: net.Socket;
23+
inflightRequestsCount: number
2324
}
2425

2526
type SendResult =
@@ -49,11 +50,16 @@ interface ProxyEvents {
4950
'close': () => void;
5051
}
5152

53+
export type Interceptor = (data: Buffer) => Promise<Buffer>;
54+
export type InterceptorFunction = (data: Buffer, next: Interceptor) => Promise<Buffer>;
55+
type InterceptorInitializer = (init: Interceptor) => Interceptor;
56+
5257
export class RedisProxy extends EventEmitter {
5358
private readonly server: net.Server;
5459
public readonly config: Required<ProxyConfig>;
5560
private readonly connections: Map<string, ActiveConnection>;
5661
private isRunning: boolean;
62+
private interceptorInitializer?: InterceptorInitializer;
5763

5864
constructor(config: ProxyConfig) {
5965
super();
@@ -113,6 +119,13 @@ export class RedisProxy extends EventEmitter {
113119
});
114120
}
115121

122+
public setInterceptors(interceptors: Array<InterceptorFunction>) {
123+
this.interceptorInitializer = (init) => interceptors.reduceRight<Interceptor>(
124+
(next, mw) => (data) => mw(data, next),
125+
init
126+
);
127+
}
128+
116129
public getStats(): ProxyStats {
117130
const connections = Array.from(this.connections.values());
118131

@@ -218,19 +231,22 @@ export class RedisProxy extends EventEmitter {
218231
}
219232

220233
private handleClientConnection(clientSocket: net.Socket): void {
221-
const connectionId = this.generateConnectionId();
234+
clientSocket.pause();
222235
const serverSocket = net.createConnection({
223236
host: this.config.targetHost,
224237
port: this.config.targetPort
225238
});
239+
serverSocket.once('connect', clientSocket.resume.bind(clientSocket));
226240

241+
const connectionId = this.generateConnectionId();
227242
const connectionInfo: ActiveConnection = {
228243
id: connectionId,
229244
clientAddress: clientSocket.remoteAddress || 'unknown',
230245
clientPort: clientSocket.remotePort || 0,
231246
connectedAt: new Date(),
232247
clientSocket,
233-
serverSocket
248+
serverSocket,
249+
inflightRequestsCount: 0
234250
};
235251

236252
this.connections.set(connectionId, connectionInfo);
@@ -243,12 +259,38 @@ export class RedisProxy extends EventEmitter {
243259
this.emit('connection', connectionInfo);
244260
});
245261

246-
clientSocket.on('data', (data) => {
262+
clientSocket.on('data', async (data) => {
247263
this.emit('data', connectionId, 'client->server', data);
248-
serverSocket.write(data);
264+
265+
if(!this.interceptorInitializer) {
266+
serverSocket.write(data);
267+
return;
268+
}
269+
270+
connectionInfo.inflightRequestsCount++;
271+
272+
// next1 -> next2 -> ... -> last -> server
273+
// next1 <- next2 <- ... <- last <- server
274+
const last = (data: Buffer): Promise<Buffer> => {
275+
return new Promise((resolve, reject) => {
276+
serverSocket.write(data);
277+
serverSocket.once('data', (data) => {
278+
connectionInfo.inflightRequestsCount--;
279+
assert(connectionInfo.inflightRequestsCount >= 0, `inflightRequestsCount for connection ${connectionId} went below zero`);
280+
this.emit('data', connectionId, 'server->client', data);
281+
resolve(data);
282+
});
283+
serverSocket.once('error', reject);
284+
});
285+
};
286+
287+
const interceptorChain = this.interceptorInitializer(last);
288+
const response = await interceptorChain(data);
289+
clientSocket.write(response);
249290
});
250291

251292
serverSocket.on('data', (data) => {
293+
if (connectionInfo.inflightRequestsCount > 0) return;
252294
this.emit('data', connectionId, 'server->client', data);
253295
clientSocket.write(data);
254296
});
@@ -273,6 +315,7 @@ export class RedisProxy extends EventEmitter {
273315
});
274316

275317
serverSocket.on('error', (error) => {
318+
if (connectionInfo.inflightRequestsCount > 0) return;
276319
this.log(`Server error for connection ${connectionId}: ${error.message}`);
277320
this.emit('error', error, connectionId);
278321
clientSocket.destroy();
@@ -306,6 +349,7 @@ export class RedisProxy extends EventEmitter {
306349
}
307350
}
308351
import { createServer } from 'net';
352+
import assert from 'node:assert';
309353

310354
export function getFreePortNumber(): Promise<number> {
311355
return new Promise((resolve, reject) => {
@@ -326,4 +370,3 @@ export function getFreePortNumber(): Promise<number> {
326370

327371
export { RedisProxy as RedisTransparentProxy };
328372
export type { ProxyConfig, ConnectionInfo, ProxyEvents, SendResult, DataDirection, ProxyStats };
329-

0 commit comments

Comments
 (0)