Skip to content

Commit

Permalink
fix(mongo): fix off by one error in pool
Browse files Browse the repository at this point in the history
also add stress test to see that pools works under load.

Add client.stats to see some stats
  • Loading branch information
marcj committed Oct 18, 2023
1 parent 1584749 commit 666ba86
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 8 deletions.
5 changes: 3 additions & 2 deletions packages/mongo/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* You should have received a copy of the MIT License along with this program.
*/

import { ConnectionRequest, MongoConnection, MongoConnectionPool, MongoDatabaseTransaction } from './connection.js';
import { ConnectionRequest, MongoConnection, MongoConnectionPool, MongoDatabaseTransaction, MongoStats } from './connection.js';
import { isErrorRetryableRead, isErrorRetryableWrite, MongoError } from './error.js';
import { sleep } from '@deepkit/core';
import { Command } from './command/command.js';
Expand All @@ -23,14 +23,15 @@ export class MongoClient {

public readonly config: MongoClientConfig;
public connectionPool: MongoConnectionPool;
public stats: MongoStats = new MongoStats;

protected serializer: BSONBinarySerializer = mongoBinarySerializer;

constructor(
connectionString: string
) {
this.config = new MongoClientConfig(connectionString);
this.connectionPool = new MongoConnectionPool(this.config, this.serializer);
this.connectionPool = new MongoConnectionPool(this.config, this.serializer, this.stats);
}

public resolveCollectionName(schema: ReflectionClass<any>): string {
Expand Down
32 changes: 28 additions & 4 deletions packages/mongo/src/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ export interface ConnectionRequest {
nearest?: boolean;
}

export class MongoStats {
/**
* How many connections have been created.
*/
connectionsCreated: number = 0;

/**
* How many connections have been reused.
*/
connectionsReused: number = 0;

/**
* How many connection requests were queued because pool was full.
*/
connectionsQueued: number = 0;
}

export class MongoConnectionPool {
protected connectionId: number = 0;
/**
Expand All @@ -48,8 +65,11 @@ export class MongoConnectionPool {

protected lastError?: Error;

constructor(protected config: MongoClientConfig,
protected serializer: BSONBinarySerializer) {
constructor(
protected config: MongoClientConfig,
protected serializer: BSONBinarySerializer,
protected stats: MongoStats,
) {
}

protected async waitForAllConnectionsToConnect(throws: boolean = false): Promise<void> {
Expand Down Expand Up @@ -123,6 +143,7 @@ export class MongoConnectionPool {
}

protected newConnection(host: Host): MongoConnection {
this.stats.connectionsCreated++;
const connection = new MongoConnection(this.connectionId++, host, this.config, this.serializer, (connection) => {
arrayRemoveItem(host.connections, connection);
arrayRemoveItem(this.connections, connection);
Expand All @@ -139,6 +160,7 @@ export class MongoConnectionPool {
if (this.queue.length) {
const waiter = this.queue.shift();
if (waiter) {
this.stats.connectionsReused++;
waiter(connection);
return;
}
Expand Down Expand Up @@ -173,6 +195,7 @@ export class MongoConnectionPool {
if (!connection.host.isReadable()) continue;
}

this.stats.connectionsReused++;
connection.reserved = true;
if (connection.cleanupTimeout) {
clearTimeout(connection.cleanupTimeout);
Expand All @@ -182,13 +205,14 @@ export class MongoConnectionPool {
return connection;
}

if (this.connections.length <= this.config.options.maxPoolSize) {
const connection = await this.createAdditionalConnectionForRequest(request);
if (this.connections.length < this.config.options.maxPoolSize) {
const connection = this.createAdditionalConnectionForRequest(request);
connection.reserved = true;
return connection;
}

return asyncOperation((resolve) => {
this.stats.connectionsQueued++;
this.queue.push(resolve);
});
}
Expand Down
60 changes: 58 additions & 2 deletions packages/mongo/tests/client/client.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect, test } from '@jest/globals';
import { expect, test, jest } from '@jest/globals';
import { MongoClient } from '../../src/client/client.js';
import { HostType } from '../../src/client/host.js';
import { IsMasterCommand } from '../../src/client/command/ismaster.js';
Expand All @@ -7,6 +7,8 @@ import { ConnectionOptions } from '../../src/client/options.js';
import { cast, validatedDeserialize } from '@deepkit/type';
import { createConnection } from 'net';

jest.setTimeout(60000);

test('ConnectionOptions', async () => {
{
const options = cast<ConnectionOptions>({});
Expand Down Expand Up @@ -129,8 +131,62 @@ test('connect isMaster command', async () => {
//
// });

test('connection pool 1', async () => {
const client = new MongoClient('mongodb://127.0.0.1?maxPoolSize=1');

//spawn 10 promises, each requesting a connection and releasing it a few ms later
const promises: Promise<any>[] = [];

async function test() {
const c = await client.connectionPool.getConnection();
await sleep(0.1 * Math.random());
c.release();
}

for (let i = 0; i < 10; i++) {
promises.push(test());
}

await Promise.all(promises);

expect(client.stats.connectionsCreated).toBe(1);
expect(client.stats.connectionsReused).toBe(10);
expect(client.stats.connectionsQueued).toBe(9);

client.close();
});

test('connection pool stress test', async () => {
const client = new MongoClient('mongodb://127.0.0.1?maxPoolSize=2');

//spawn many promises, each requesting a connection and releasing it a few ms later
const promises: Promise<any>[] = [];

async function test() {
const c = await client.connectionPool.getConnection();
await sleep(0.001 * Math.random());
c.release();
}

const batch = 500;
for (let i = 0; i < 5_000; i++) {
promises.push(test());
if (i % batch === 0) {
await Promise.all(promises);
promises.length = 0;
console.log('batch', i);
}
}

await Promise.all(promises);

expect(client.stats.connectionsCreated).toBe(2);
expect(client.stats.connectionsReused).toBe(4999);

client.close();
});

test('connection pool', async () => {
test('connection pool 10', async () => {
const client = new MongoClient('mongodb://127.0.0.1?maxPoolSize=10');

{
Expand Down

0 comments on commit 666ba86

Please sign in to comment.