Skip to content

Commit 59767bc

Browse files
committed
feat: connection pooling
1 parent b8c4561 commit 59767bc

File tree

5 files changed

+190
-0
lines changed

5 files changed

+190
-0
lines changed

Diff for: connection.ts

+6
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ export interface SendCommandOptions {
3434
}
3535

3636
export interface Connection extends TypedEventTarget<ConnectionEventMap> {
37+
/** @deprecated */
3738
name: string | null;
3839
isClosed: boolean;
3940
isConnected: boolean;
4041
close(): void;
42+
[Symbol.dispose](): void;
4143
connect(): Promise<void>;
4244
reconnect(): Promise<void>;
4345
sendCommand(
@@ -309,6 +311,10 @@ export class RedisConnection
309311
this.#close(false);
310312
}
311313

314+
[Symbol.dispose](): void {
315+
return this.close();
316+
}
317+
312318
#close(canReconnect = false) {
313319
const isClosedAlready = this._isClosed;
314320

Diff for: executor.ts

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ import type { Connection, SendCommandOptions } from "./connection.ts";
22
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
33

44
export interface CommandExecutor {
5+
/**
6+
* @deprecated
7+
*/
58
readonly connection: Connection;
69
/**
710
* @deprecated

Diff for: pool/default_pool.ts

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import type { Pool } from "./pool.ts";
2+
3+
const kDefaultTimeout = 5_000;
4+
class DefaultPool<T extends Disposable> implements Pool<T> {
5+
readonly #idle: Array<T> = [];
6+
readonly #connections: Array<T> = [];
7+
#connectionCount: number = 0;
8+
readonly #deferredQueue: Array<PromiseWithResolvers<T>> = [];
9+
readonly #options: Required<PoolOptions<T>>;
10+
11+
constructor(
12+
{
13+
maxConnections = 8,
14+
acquire,
15+
}: PoolOptions<T>,
16+
) {
17+
this.#options = {
18+
acquire,
19+
maxConnections,
20+
};
21+
}
22+
23+
async acquire(signal?: AbortSignal): Promise<T> {
24+
signal ||= AbortSignal.timeout(kDefaultTimeout);
25+
signal.throwIfAborted();
26+
if (this.#idle.length > 0) {
27+
const conn = this.#idle.shift()!;
28+
return Promise.resolve(conn);
29+
}
30+
31+
if (this.#connectionCount < this.#options.maxConnections) {
32+
this.#connectionCount++;
33+
try {
34+
const connection = await this.#options.acquire();
35+
this.#connections.push(connection);
36+
return connection;
37+
} catch (error) {
38+
this.#connectionCount--;
39+
throw error;
40+
}
41+
}
42+
43+
const deferred = Promise.withResolvers<T>();
44+
this.#deferredQueue.push(deferred);
45+
const { promise, reject } = deferred;
46+
const onAbort = () => {
47+
const i = this.#deferredQueue.indexOf(deferred);
48+
if (i === -1) return;
49+
this.#deferredQueue.splice(i, 1);
50+
reject(signal.reason);
51+
};
52+
signal.addEventListener("abort", onAbort, { once: true });
53+
return promise;
54+
}
55+
56+
release(conn: T): void {
57+
if (!this.#connections.includes(conn)) {
58+
throw new Error(
59+
"This connection has already been removed from the pool.",
60+
);
61+
} else if (this.#deferredQueue.length > 0) {
62+
const i = this.#deferredQueue.shift()!;
63+
i.resolve(conn);
64+
} else {
65+
this.#idle.push(conn);
66+
}
67+
}
68+
69+
close() {
70+
const errors: Array<unknown> = [];
71+
for (const x of this.#connections) {
72+
try {
73+
x[Symbol.dispose]();
74+
} catch (error) {
75+
errors.push(error);
76+
}
77+
}
78+
this.#connections.length = 0;
79+
this.#idle.length = 0;
80+
if (errors.length > 0) {
81+
throw new AggregateError(errors);
82+
}
83+
}
84+
}
85+
86+
export interface PoolOptions<T extends Disposable> {
87+
maxConnections?: number;
88+
acquire(): Promise<T>;
89+
}
90+
91+
export function createDefaultPool<T extends Disposable>(
92+
options: PoolOptions<T>,
93+
): Pool<T> {
94+
return new DefaultPool<T>(options);
95+
}

Diff for: pool/default_pool_test.ts

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { assert, assertEquals, assertRejects } from "../deps/std/assert.ts";
2+
import { createDefaultPool } from "./default_pool.ts";
3+
4+
class FakeConnection implements Disposable {
5+
#isClosed = false;
6+
isClosed() {
7+
return this.#isClosed;
8+
}
9+
[Symbol.dispose]() {
10+
if (this.#isClosed) {
11+
throw new Error("Already closed");
12+
}
13+
this.#isClosed = true;
14+
}
15+
}
16+
17+
Deno.test("DefaultPool", async () => {
18+
const openConnections: Array<FakeConnection> = [];
19+
const pool = createDefaultPool({
20+
acquire: () => {
21+
const connection = new FakeConnection();
22+
openConnections.push(connection);
23+
return Promise.resolve(connection);
24+
},
25+
maxConnections: 2,
26+
});
27+
assertEquals(openConnections, []);
28+
29+
const signal = AbortSignal.timeout(200);
30+
31+
const conn1 = await pool.acquire(signal);
32+
assertEquals(openConnections, [conn1]);
33+
assert(openConnections.every((x) => !x.isClosed()));
34+
assert(!signal.aborted);
35+
36+
const conn2 = await pool.acquire(signal);
37+
assertEquals(openConnections, [conn1, conn2]);
38+
assert(!conn2.isClosed());
39+
assert(openConnections.every((x) => !x.isClosed()));
40+
assert(!signal.aborted);
41+
42+
{
43+
// Tests timeout handling
44+
await assertRejects(
45+
() => pool.acquire(signal),
46+
"Intentionally aborted",
47+
);
48+
assert(signal.aborted);
49+
assertEquals(openConnections, [conn1, conn2]);
50+
assert(openConnections.every((x) => !x.isClosed()));
51+
}
52+
53+
{
54+
// Tests `release()`
55+
pool.release(conn2);
56+
assertEquals(openConnections, [conn1, conn2]);
57+
58+
const conn = await pool.acquire(new AbortController().signal);
59+
assert(conn === conn2, "A new connection should not be created");
60+
assertEquals(openConnections, [conn1, conn2]);
61+
}
62+
63+
{
64+
// `Pool#acquire` should wait for an active connection to be released.
65+
const signal = AbortSignal.timeout(3_000);
66+
const promise = pool.acquire(signal);
67+
setTimeout(() => {
68+
pool.release(conn1);
69+
}, 50);
70+
const conn = await promise;
71+
assert(conn === conn1, "A new connection should not be created");
72+
assertEquals(openConnections, [conn1, conn2]);
73+
assert(!signal.aborted);
74+
}
75+
76+
{
77+
// `Pool#close` closes all connections
78+
pool.close();
79+
assert(openConnections.every((x) => x.isClosed()));
80+
}
81+
});

Diff for: pool/pool.ts

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export interface Pool<T extends Disposable> {
2+
acquire(signal?: AbortSignal): Promise<T>;
3+
release(conn: T): void;
4+
close(): void;
5+
}

0 commit comments

Comments
 (0)