Skip to content

Commit

Permalink
fix(mongo): also check connection request for queued requests.
Browse files Browse the repository at this point in the history
fix also some wrong `needsWritableHost` values in various commands.
  • Loading branch information
marcj committed Oct 18, 2023
1 parent 666ba86 commit 6c6ca94
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 30 deletions.
4 changes: 2 additions & 2 deletions packages/mongo/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class MongoClient {
/**
* Returns an existing or new connection, that needs to be released once done using it.
*/
async getConnection(request: ConnectionRequest = {}, transaction?: MongoDatabaseTransaction): Promise<MongoConnection> {
async getConnection(request: Partial<ConnectionRequest> = {}, transaction?: MongoDatabaseTransaction): Promise<MongoConnection> {
if (transaction && transaction.connection) return transaction.connection;
const connection = await this.connectionPool.getConnection(request);
if (transaction) {
Expand All @@ -73,7 +73,7 @@ export class MongoClient {

public async execute<T extends Command>(command: T): Promise<ReturnType<T['execute']>> {
const maxRetries = 10;
const request = { writable: command.needsWritableHost() };
const request = { readonly: !command.needsWritableHost() };

for (let i = 1; i <= maxRetries; i++) {
const connection = await this.connectionPool.getConnection(request);
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/createCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ export class CreateCollectionCommand<T extends ReflectionClass<any>> extends Com
}

needsWritableHost(): boolean {
return false;
return true;
}
}
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/createIndexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ export class CreateIndexesCommand<T extends ReflectionClass<any>> extends Comman
}

needsWritableHost(): boolean {
return false;
return true;
}
}
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/findAndModify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ export class FindAndModifyCommand<T extends ReflectionClass<any>> extends Comman
}

needsWritableHost(): boolean {
return false;
return true;
}
}
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ export class UpdateCommand<T extends ReflectionClass<any>> extends Command {
}

needsWritableHost(): boolean {
return false;
return true;
}
}
56 changes: 34 additions & 22 deletions packages/mongo/src/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ export enum MongoConnectionStatus {
}

export interface ConnectionRequest {
writable?: boolean;
nearest?: boolean;
readonly: boolean;
nearest: boolean;
}

export class MongoStats {
Expand All @@ -59,7 +59,7 @@ export class MongoConnectionPool {
*/
public connections: MongoConnection[] = [];

protected queue: ((connection: MongoConnection) => void)[] = [];
protected queue: {resolve: (connection: MongoConnection) => void, request: ConnectionRequest}[] = [];

protected nextConnectionClose: Promise<boolean> = Promise.resolve(true);

Expand Down Expand Up @@ -128,11 +128,11 @@ export class MongoConnectionPool {
protected findHostForRequest(hosts: Host[], request: ConnectionRequest): Host {
//todo, handle request.nearest
for (const host of hosts) {
if (request.writable && host.isWritable()) return host;
if (!request.writable && host.isReadable()) return host;
if (!request.readonly && host.isWritable()) return host;
if (request.readonly && host.isReadable()) return host;
}

throw new MongoError(`Could not find host for connection request. (writable=${request.writable}, hosts=${hosts.length}). Last Error: ${this.lastError}`);
throw new MongoError(`Could not find host for connection request. (readonly=${request.readonly}, hosts=${hosts.length}). Last Error: ${this.lastError}`);
}

protected createAdditionalConnectionForRequest(request: ConnectionRequest): MongoConnection {
Expand All @@ -157,13 +157,17 @@ export class MongoConnectionPool {
}

protected release(connection: MongoConnection) {
if (this.queue.length) {
const waiter = this.queue.shift();
if (waiter) {
this.stats.connectionsReused++;
waiter(connection);
return;
}
for (let i = 0; i < this.queue.length; i++) {
const waiter = this.queue[i];
if (!this.matchRequest(connection, waiter.request)) continue;

this.stats.connectionsReused++;
this.queue.splice(i, 1);
waiter.resolve(connection);
//we don't set reserved/set cleanupTimeout,
//since the connection is already reserved and the timeout
//is only set when the connection actually starting idling.
return;
}

connection.reserved = false;
Expand All @@ -176,10 +180,23 @@ export class MongoConnectionPool {
}, this.config.options.maxIdleTimeMS);
}

protected matchRequest(connection: MongoConnection, request: ConnectionRequest): boolean {
if (!request.readonly && !connection.host.isWritable()) return false;

if (!request.readonly) {
if (connection.host.isSecondary() && !this.config.options.secondaryReadAllowed) return false;
if (!connection.host.isReadable()) return false;
}

return true;
}

/**
* Returns an existing or new connection, that needs to be released once done using it.
*/
async getConnection(request: ConnectionRequest = {}): Promise<MongoConnection> {
async getConnection(request: Partial<ConnectionRequest> = {}): Promise<MongoConnection> {
const r = Object.assign({ readonly: false, nearest: false }, request) as ConnectionRequest;

await this.ensureHostsConnected(true);

for (const connection of this.connections) {
Expand All @@ -188,12 +205,7 @@ export class MongoConnectionPool {

if (request.nearest) throw new Error('Nearest not implemented yet');

if (request.writable && !connection.host.isWritable()) continue;

if (!request.writable) {
if (connection.host.isSecondary() && !this.config.options.secondaryReadAllowed) continue;
if (!connection.host.isReadable()) continue;
}
if (!this.matchRequest(connection, r)) continue;

this.stats.connectionsReused++;
connection.reserved = true;
Expand All @@ -206,14 +218,14 @@ export class MongoConnectionPool {
}

if (this.connections.length < this.config.options.maxPoolSize) {
const connection = this.createAdditionalConnectionForRequest(request);
const connection = this.createAdditionalConnectionForRequest(r);
connection.reserved = true;
return connection;
}

return asyncOperation((resolve) => {
this.stats.connectionsQueued++;
this.queue.push(resolve);
this.queue.push({resolve, request: r});
});
}
}
Expand Down
7 changes: 5 additions & 2 deletions packages/mongo/tests/client/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ test('connection pool 10', async () => {

const c3 = await client.connectionPool.getConnection();
expect(c3 === c1).toBe(true);
c3.release();
}

{
Expand All @@ -217,12 +218,12 @@ test('connection pool 10', async () => {
let c11: any;
client.connectionPool.getConnection().then((c) => {
c11 = c;
expect(c11.id).toBe(1);
expect(c11.id).toBe(0);
});
let c12: any;
client.connectionPool.getConnection().then((c) => {
c12 = c;
expect(c12.id).toBe(2);
expect(c12.id).toBe(1);
});
await sleep(0.01);
expect(c11).toBe(undefined);
Expand All @@ -236,4 +237,6 @@ test('connection pool 10', async () => {
await sleep(0.01);
expect(c12).toBe(c2);
}

client.close();
});

0 comments on commit 6c6ca94

Please sign in to comment.