Skip to content

Commit bc69bfd

Browse files
authored
feat: basic infrastructure for connection pooling (#489)
Closes #417
1 parent b8c4561 commit bc69bfd

11 files changed

+343
-10
lines changed

connection.ts

+16-2
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ export interface SendCommandOptions {
3434
}
3535

3636
export interface Connection extends TypedEventTarget<ConnectionEventMap> {
37+
/** @deprecated */
3738
name: string | null;
3839
isClosed: boolean;
3940
isConnected: boolean;
4041
close(): void;
42+
[Symbol.dispose](): void;
4143
connect(): Promise<void>;
4244
reconnect(): Promise<void>;
4345
sendCommand(
@@ -95,7 +97,15 @@ interface PendingCommand {
9597
reject: (error: unknown) => void;
9698
}
9799

98-
export class RedisConnection
100+
export function createRedisConnection(
101+
hostname: string,
102+
port: number | string | undefined,
103+
options: RedisConnectionOptions,
104+
): Connection {
105+
return new RedisConnection(hostname, port ?? 6379, options);
106+
}
107+
108+
class RedisConnection
99109
implements Connection, TypedEventTarget<ConnectionEventMap> {
100110
name: string | null = null;
101111
private maxRetryCount = 10;
@@ -306,7 +316,11 @@ export class RedisConnection
306316
}
307317

308318
close() {
309-
this.#close(false);
319+
return this[Symbol.dispose]();
320+
}
321+
322+
[Symbol.dispose](): void {
323+
return this.#close(false);
310324
}
311325

312326
#close(canReconnect = false) {

deno.json

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"version": "0.37.1",
44
"exports": {
55
".": "./mod.ts",
6+
"./experimental/pool": "./experimental/pool/mod.ts",
67
"./experimental/cluster": "./experimental/cluster/mod.ts",
78
"./experimental/web-streams-connection": "./experimental/web_streams_connection/mod.ts"
89
},

executor.ts

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ import type { Connection, SendCommandOptions } from "./connection.ts";
22
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
33

44
export interface CommandExecutor {
5+
/**
6+
* @deprecated
7+
*/
58
readonly connection: Connection;
69
/**
710
* @deprecated

experimental/pool/mod.ts

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "../../pool/mod.ts";

pool/default_pool.ts

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import type { Pool } from "./pool.ts";
2+
3+
class AlreadyRemovedFromPoolError extends Error {
4+
constructor() {
5+
super("This connection has already been removed from the pool.");
6+
}
7+
}
8+
9+
const kDefaultTimeout = 5_000;
10+
class DefaultPool<T extends Disposable> implements Pool<T> {
11+
readonly #idle: Array<T> = [];
12+
readonly #connections: Array<T> = [];
13+
#connectionCount: number = 0;
14+
readonly #deferredQueue: Array<PromiseWithResolvers<T>> = [];
15+
readonly #options: Required<PoolOptions<T>>;
16+
17+
constructor(
18+
{
19+
maxConnections = 8,
20+
acquire,
21+
}: PoolOptions<T>,
22+
) {
23+
this.#options = {
24+
acquire,
25+
maxConnections,
26+
};
27+
}
28+
29+
async acquire(signal?: AbortSignal): Promise<T> {
30+
signal ||= AbortSignal.timeout(kDefaultTimeout);
31+
signal.throwIfAborted();
32+
if (this.#idle.length > 0) {
33+
const conn = this.#idle.shift()!;
34+
return Promise.resolve(conn);
35+
}
36+
37+
if (this.#connectionCount < this.#options.maxConnections) {
38+
this.#connectionCount++;
39+
try {
40+
const connection = await this.#options.acquire();
41+
this.#connections.push(connection);
42+
return connection;
43+
} catch (error) {
44+
this.#connectionCount--;
45+
throw error;
46+
}
47+
}
48+
49+
const deferred = Promise.withResolvers<T>();
50+
this.#deferredQueue.push(deferred);
51+
const { promise, reject } = deferred;
52+
const onAbort = () => {
53+
const i = this.#deferredQueue.indexOf(deferred);
54+
if (i === -1) return;
55+
this.#deferredQueue.splice(i, 1);
56+
reject(signal.reason);
57+
};
58+
signal.addEventListener("abort", onAbort, { once: true });
59+
return promise;
60+
}
61+
62+
#has(conn: T): boolean {
63+
return this.#connections.includes(conn);
64+
}
65+
66+
release(conn: T): void {
67+
if (!this.#has(conn)) {
68+
throw new AlreadyRemovedFromPoolError();
69+
} else if (this.#deferredQueue.length > 0) {
70+
const i = this.#deferredQueue.shift()!;
71+
i.resolve(conn);
72+
} else {
73+
this.#idle.push(conn);
74+
}
75+
}
76+
77+
close() {
78+
const errors: Array<unknown> = [];
79+
for (const x of [...this.#connections]) {
80+
try {
81+
x[Symbol.dispose]();
82+
} catch (error) {
83+
errors.push(error);
84+
}
85+
}
86+
this.#connections.length = 0;
87+
this.#idle.length = 0;
88+
if (errors.length > 0) {
89+
throw new AggregateError(errors);
90+
}
91+
}
92+
}
93+
94+
export interface PoolOptions<T extends Disposable> {
95+
maxConnections?: number;
96+
acquire(): Promise<T>;
97+
}
98+
99+
export function createDefaultPool<T extends Disposable>(
100+
options: PoolOptions<T>,
101+
): Pool<T> {
102+
return new DefaultPool<T>(options);
103+
}

pool/default_pool_test.ts

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { assert, assertEquals, assertRejects } from "../deps/std/assert.ts";
2+
import { createDefaultPool } from "./default_pool.ts";
3+
4+
class FakeConnection implements Disposable {
5+
#isClosed = false;
6+
isClosed() {
7+
return this.#isClosed;
8+
}
9+
[Symbol.dispose]() {
10+
if (this.#isClosed) {
11+
throw new Error("Already closed");
12+
}
13+
this.#isClosed = true;
14+
}
15+
}
16+
17+
Deno.test({
18+
name: "DefaultPool",
19+
permissions: "none",
20+
fn: async () => {
21+
const openConnections: Array<FakeConnection> = [];
22+
const pool = createDefaultPool({
23+
acquire: () => {
24+
const connection = new FakeConnection();
25+
openConnections.push(connection);
26+
return Promise.resolve(connection);
27+
},
28+
maxConnections: 2,
29+
});
30+
assertEquals(openConnections, []);
31+
32+
const signal = AbortSignal.timeout(200);
33+
34+
const conn1 = await pool.acquire(signal);
35+
assertEquals(openConnections, [conn1]);
36+
assert(openConnections.every((x) => !x.isClosed()));
37+
assert(!signal.aborted);
38+
39+
const conn2 = await pool.acquire(signal);
40+
assertEquals(openConnections, [conn1, conn2]);
41+
assert(!conn2.isClosed());
42+
assert(openConnections.every((x) => !x.isClosed()));
43+
assert(!signal.aborted);
44+
45+
{
46+
// Tests timeout handling
47+
await assertRejects(
48+
() => pool.acquire(signal),
49+
"Intentionally aborted",
50+
);
51+
assert(signal.aborted);
52+
assertEquals(openConnections, [conn1, conn2]);
53+
assert(openConnections.every((x) => !x.isClosed()));
54+
}
55+
56+
{
57+
// Tests `release()`
58+
pool.release(conn2);
59+
assertEquals(openConnections, [conn1, conn2]);
60+
61+
const conn = await pool.acquire(new AbortController().signal);
62+
assert(conn === conn2, "A new connection should not be created");
63+
assertEquals(openConnections, [conn1, conn2]);
64+
}
65+
66+
{
67+
// `Pool#acquire` should wait for an active connection to be released.
68+
const signal = AbortSignal.timeout(3_000);
69+
const promise = pool.acquire(signal);
70+
setTimeout(() => {
71+
pool.release(conn1);
72+
}, 50);
73+
const conn = await promise;
74+
assert(conn === conn1, "A new connection should not be created");
75+
assertEquals(openConnections, [conn1, conn2]);
76+
assert(!signal.aborted);
77+
}
78+
79+
{
80+
// `Pool#close` closes all connections
81+
pool.close();
82+
assert(openConnections.every((x) => x.isClosed()));
83+
}
84+
},
85+
});

pool/executor.ts

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import type { Connection, SendCommandOptions } from "../connection.ts";
2+
import type { Pool } from "./pool.ts";
3+
import type { CommandExecutor } from "../executor.ts";
4+
import { DefaultExecutor } from "../executor.ts";
5+
import type { RedisReply, RedisValue } from "../protocol/shared/types.ts";
6+
7+
export function createPooledExecutor(pool: Pool<Connection>): CommandExecutor {
8+
return new PooledExecutor(pool);
9+
}
10+
11+
class PooledExecutor implements CommandExecutor {
12+
readonly #pool: Pool<Connection>;
13+
constructor(pool: Pool<Connection>) {
14+
this.#pool = pool;
15+
}
16+
17+
get connection(): Connection {
18+
throw new Error("PooledExecutor.connection is not supported");
19+
}
20+
21+
async exec(
22+
command: string,
23+
...args: RedisValue[]
24+
): Promise<RedisReply> {
25+
const connection = await this.#pool.acquire();
26+
try {
27+
const executor = new DefaultExecutor(connection);
28+
return await executor.exec(command, ...args);
29+
} finally {
30+
this.#pool.release(connection);
31+
}
32+
}
33+
34+
async sendCommand(
35+
command: string,
36+
args?: RedisValue[],
37+
options?: SendCommandOptions,
38+
): Promise<RedisReply> {
39+
const connection = await this.#pool.acquire();
40+
try {
41+
const executor = new DefaultExecutor(connection);
42+
return await executor.sendCommand(command, args, options);
43+
} finally {
44+
this.#pool.release(connection);
45+
}
46+
}
47+
48+
close(): void {
49+
return this.#pool.close();
50+
}
51+
}

pool/mod.ts

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import type { Redis, RedisConnectOptions } from "../redis.ts";
2+
import { create } from "../redis.ts";
3+
import type { Connection } from "../connection.ts";
4+
import { createRedisConnection } from "../connection.ts";
5+
import { createDefaultPool } from "./default_pool.ts";
6+
import { createPooledExecutor } from "./executor.ts";
7+
8+
export interface CreatePoolClientOptions {
9+
connection: RedisConnectOptions;
10+
maxConnections?: number;
11+
}
12+
13+
export function createPoolClient(
14+
options: CreatePoolClientOptions,
15+
): Promise<Redis> {
16+
const pool = createDefaultPool<Connection>({
17+
acquire,
18+
maxConnections: options.maxConnections ?? 8,
19+
});
20+
const executor = createPooledExecutor(pool);
21+
const client = create(executor);
22+
return Promise.resolve(client);
23+
24+
async function acquire(): Promise<Connection> {
25+
const { hostname, port, ...connectionOptions } = options.connection;
26+
const connection = createRedisConnection(hostname, port, connectionOptions);
27+
await connection.connect();
28+
return connection;
29+
}
30+
}

pool/pool.ts

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export interface Pool<T extends Disposable> {
2+
acquire(signal?: AbortSignal): Promise<T>;
3+
release(item: T): void;
4+
close(): void;
5+
}

redis.ts

+5-8
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import type {
4343
ZScanOpts,
4444
ZUnionstoreOpts,
4545
} from "./command.ts";
46-
import { RedisConnection } from "./connection.ts";
46+
import { createRedisConnection } from "./connection.ts";
4747
import type { Connection, SendCommandOptions } from "./connection.ts";
4848
import type { RedisConnectionOptions } from "./connection.ts";
4949
import type { CommandExecutor } from "./executor.ts";
@@ -2452,7 +2452,8 @@ export interface RedisConnectOptions extends RedisConnectionOptions {
24522452
* ```
24532453
*/
24542454
export async function connect(options: RedisConnectOptions): Promise<Redis> {
2455-
const connection = createRedisConnection(options);
2455+
const { hostname, port, ...connectionOptions } = options;
2456+
const connection = createRedisConnection(hostname, port, connectionOptions);
24562457
await connection.connect();
24572458
const executor = new DefaultExecutor(connection);
24582459
return create(executor);
@@ -2471,7 +2472,8 @@ export async function connect(options: RedisConnectOptions): Promise<Redis> {
24712472
* ```
24722473
*/
24732474
export function createLazyClient(options: RedisConnectOptions): Redis {
2474-
const connection = createRedisConnection(options);
2475+
const { hostname, port, ...connectionOptions } = options;
2476+
const connection = createRedisConnection(hostname, port, connectionOptions);
24752477
const executor = createLazyExecutor(connection);
24762478
return create(executor);
24772479
}
@@ -2519,11 +2521,6 @@ export function parseURL(url: string): RedisConnectOptions {
25192521
};
25202522
}
25212523

2522-
function createRedisConnection(options: RedisConnectOptions): Connection {
2523-
const { hostname, port = 6379, ...opts } = options;
2524-
return new RedisConnection(hostname, port, opts);
2525-
}
2526-
25272524
function createLazyExecutor(connection: Connection): CommandExecutor {
25282525
let executor: CommandExecutor | null = null;
25292526
return {

0 commit comments

Comments
 (0)