From 6275c377a557c5eda3d50b35f2b8ab5e868861dc Mon Sep 17 00:00:00 2001 From: "Marc J. Schmidt" Date: Tue, 13 Feb 2024 11:43:20 +0100 Subject: [PATCH] feat(mongo): add readPreference support Option `readPreference` is now correctly passed to all commands. --- packages/mongo/index.ts | 3 + .../mongo/src/client/command/aggregate.ts | 25 +++--- packages/mongo/src/client/command/command.ts | 84 ++++++++++++++++--- packages/mongo/src/client/command/count.ts | 17 ++-- packages/mongo/src/client/command/find.ts | 20 ++--- .../mongo/src/client/command/findAndModify.ts | 15 ++-- packages/mongo/src/client/command/getMore.ts | 11 +-- packages/mongo/src/client/config.ts | 23 ++++- packages/mongo/src/client/connection.ts | 31 +++++-- packages/mongo/src/client/options.ts | 20 ++++- packages/mongo/tests/client/client.spec.ts | 7 +- 11 files changed, 177 insertions(+), 79 deletions(-) diff --git a/packages/mongo/index.ts b/packages/mongo/index.ts index dbbab1e9d..78dda7d96 100644 --- a/packages/mongo/index.ts +++ b/packages/mongo/index.ts @@ -15,6 +15,9 @@ export * from './src/query.model.js'; export * from './src/query.resolver.js'; export * from './src/query.js'; +export * from './src/client/host.js'; +export * from './src/client/dns.js'; +export * from './src/client/connection.js'; export * from './src/client/client.js'; export * from './src/client/config.js'; export * from './src/client/error.js'; diff --git a/packages/mongo/src/client/command/aggregate.ts b/packages/mongo/src/client/command/aggregate.ts index ad5df8200..8364ad9da 100644 --- a/packages/mongo/src/client/command/aggregate.ts +++ b/packages/mongo/src/client/command/aggregate.ts @@ -9,23 +9,20 @@ */ import { toFastProperties } from '@deepkit/core'; -import { BaseResponse, Command } from './command.js'; -import { getTypeJitContainer, InlineRuntimeType, isType, ReflectionClass, Type, typeOf, UUID } from '@deepkit/type'; +import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js'; +import { getTypeJitContainer, InlineRuntimeType, isType, ReflectionClass, Type, typeOf } from '@deepkit/type'; import { MongoError } from '../error.js'; import { GetMoreMessage } from './getMore.js'; +import { MongoClientConfig } from '../config.js'; -interface AggregateMessage { +type AggregateMessage = { aggregate: string; $db: string; pipeline: any[], cursor: { batchSize: number, }, - lsid?: { id: UUID }, - txnNumber?: number, - startTransaction?: boolean, - autocommit?: boolean, -} +} & TransactionalMessage & ReadPreferenceMessage; export class AggregateCommand extends Command { partial: boolean = false; @@ -39,17 +36,18 @@ export class AggregateCommand extends Command { super(); } - async execute(config, host, transaction): Promise { - const cmd = { + async execute(config: MongoClientConfig, host, transaction): Promise { + const cmd: AggregateMessage = { aggregate: this.schema.getCollectionName() || 'unknown', $db: this.schema.databaseSchemaName || config.defaultDb || 'admin', pipeline: this.pipeline, cursor: { - batchSize: this.batchSize - } + batchSize: this.batchSize, + }, }; if (transaction) transaction.applyTransaction(cmd); + config.applyReadPreference(cmd); let resultSchema = this.resultSchema || this.schema; if (resultSchema && !isType(resultSchema)) resultSchema = resultSchema.type; @@ -95,13 +93,14 @@ export class AggregateCommand extends Command { let cursorId = res.cursor.id; while (cursorId) { - const nextCommand = { + const nextCommand: GetMoreMessage = { getMore: cursorId, $db: cmd.$db, collection: cmd.aggregate, batchSize: cmd.cursor.batchSize, }; if (transaction) transaction.applyTransaction(nextCommand); + config.applyReadPreference(nextCommand); const next = await this.sendAndWait(nextCommand, undefined, specialisedResponse); if (next.cursor.nextBatch) { diff --git a/packages/mongo/src/client/command/command.ts b/packages/mongo/src/client/command/command.ts index e86bbf6bd..283f43939 100644 --- a/packages/mongo/src/client/command/command.ts +++ b/packages/mongo/src/client/command/command.ts @@ -13,7 +13,19 @@ import { handleErrorResponse, MongoDatabaseError, MongoError } from '../error.js import { MongoClientConfig } from '../config.js'; import { Host } from '../host.js'; import type { MongoDatabaseTransaction } from '../connection.js'; -import { ReceiveType, resolveReceiveType, SerializationError, stringifyType, Type, typeOf, typeSettings, UnpopulatedCheck, ValidationError } from '@deepkit/type'; +import { + InlineRuntimeType, + ReceiveType, + resolveReceiveType, + SerializationError, + stringifyType, + Type, + typeOf, + typeSettings, + UnpopulatedCheck, + UUID, + ValidationError, +} from '@deepkit/type'; import { BSONDeserializer, deserializeBSONWithoutOptimiser, getBSONDeserializer } from '@deepkit/bson'; import { mongoBinarySerializer } from '../../mongo-serializer.js'; @@ -25,6 +37,25 @@ export interface BaseResponse { writeErrors?: Array<{ index: number, code: number, errmsg: string }>; } +export interface TransactionalMessage { + lsid?: { id: UUID }; + txnNumber?: bigint; + startTransaction?: boolean; + autocommit?: boolean; + + abortTransaction?: 1; + commitTransaction?: 1; +} + +export interface ReadPreferenceMessage { + $readPreference?: { + mode: string; + tags?: { [name: string]: string }[]; + maxStalenessSeconds?: number; + hedge?: { enabled: boolean } + }; +} + export abstract class Command { protected current?: { responseType?: Type, resolve: Function, reject: Function }; @@ -37,13 +68,19 @@ export abstract class Command { this.sender(resolveReceiveType(messageType), message); return asyncOperation((resolve, reject) => { - this.current = { resolve, reject, responseType: responseType ? resolveReceiveType(responseType) : typeOf() }; + this.current = { + resolve, + reject, + responseType: responseType ? resolveReceiveType(responseType) : typeOf(), + }; }); } abstract execute(config: MongoClientConfig, host: Host, transaction?: MongoDatabaseTransaction): Promise; - abstract needsWritableHost(): boolean; + needsWritableHost(): boolean { + return false; + } handleResponse(response: Uint8Array): void { if (!this.current) throw new Error('Got handleResponse without active command'); @@ -87,19 +124,46 @@ export abstract class Command { } } -export function createCommand( - request: Request, - options: Partial<{ needsWritableHost: boolean }> = {}, +interface CommandOptions { + // default false + needsWritableHost: boolean; + + // default true + transactional: boolean; + + // default true + readPreference: boolean; +} + +export function createCommand( + request: Request | ((config: MongoClientConfig) => Request), + optionsIn: Partial = {}, typeRequest?: ReceiveType, typeResponse?: ReceiveType, -): Command { +): Command { + const options: CommandOptions = Object.assign( + { needsWritableHost: false, transactional: true, readPreference: true }, + optionsIn, + ); + + typeRequest = resolveReceiveType(typeRequest); + type FullTypeRequest = InlineRuntimeType & TransactionalMessage & ReadPreferenceMessage; + typeRequest = typeOf(); + + typeResponse = resolveReceiveType(typeResponse); + type FullTypeResponse = InlineRuntimeType & BaseResponse; + typeResponse = typeOf(); + class DynamicCommand extends Command { - async execute(): Promise { - return this.sendAndWait(request, typeRequest, typeResponse); + async execute(config: MongoClientConfig, host, transaction?): Promise { + const cmd = 'function' === typeof request ? request(config) : request; + if (options.transactional && transaction) transaction.applyTransaction(cmd); + if (options.readPreference) config.applyReadPreference(cmd as any); + return await this.sendAndWait(cmd, typeRequest, typeResponse as Type) as any; } needsWritableHost(): boolean { - return options.needsWritableHost || false; + return options.needsWritableHost; } } diff --git a/packages/mongo/src/client/command/count.ts b/packages/mongo/src/client/command/count.ts index a0ee1d2da..9a05a51d0 100644 --- a/packages/mongo/src/client/command/count.ts +++ b/packages/mongo/src/client/command/count.ts @@ -8,24 +8,20 @@ * You should have received a copy of the MIT License along with this program. */ -import { BaseResponse, Command } from './command.js'; -import { ReflectionClass, UUID } from '@deepkit/type'; +import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js'; +import { ReflectionClass } from '@deepkit/type'; interface CountResponse extends BaseResponse { n: number; } -interface CountSchema { +type CountSchema = { count: string; $db: string; limit?: number; query: any; skip?: number; - lsid?: { id: UUID }; - txnNumber?: number; - startTransaction?: boolean; - autocommit?: boolean; -} +} & TransactionalMessage & ReadPreferenceMessage; export class CountCommand> extends Command { constructor( @@ -47,12 +43,9 @@ export class CountCommand> extends Command(cmd); return res.n; } - - needsWritableHost(): boolean { - return false; - } } diff --git a/packages/mongo/src/client/command/find.ts b/packages/mongo/src/client/command/find.ts index 10b00fb3c..2bbd02c96 100644 --- a/packages/mongo/src/client/command/find.ts +++ b/packages/mongo/src/client/command/find.ts @@ -8,14 +8,14 @@ * You should have received a copy of the MIT License along with this program. */ -import { BaseResponse, Command } from './command.js'; +import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js'; import { toFastProperties } from '@deepkit/core'; import { DEEP_SORT } from '../../query.model.js'; -import { InlineRuntimeType, ReflectionClass, ReflectionKind, typeOf, TypeUnion, UUID } from '@deepkit/type'; +import { InlineRuntimeType, ReflectionClass, ReflectionKind, typeOf, TypeUnion } from '@deepkit/type'; import { MongoError } from '../error.js'; import { GetMoreMessage } from './getMore.js'; -interface FindSchema { +type FindSchema = { find: string; $db: string; batchSize: number; @@ -24,11 +24,7 @@ interface FindSchema { filter: any; projection?: any; sort?: any; - lsid?: { id: UUID }, - txnNumber?: number, - startTransaction?: boolean, - autocommit?: boolean, -} +} & TransactionalMessage & ReadPreferenceMessage export class FindCommand extends Command { batchSize: number = 1_000_000; @@ -55,6 +51,7 @@ export class FindCommand extends Command { }; if (transaction) transaction.applyTransaction(cmd); + config.applyReadPreference(cmd); if (this.projection) cmd.projection = this.projection; if (this.sort) cmd.sort = this.sort; @@ -132,13 +129,14 @@ export class FindCommand extends Command { let cursorId = res.cursor.id; while (cursorId) { - const nextCommand = { + const nextCommand: GetMoreMessage = { getMore: cursorId, $db: cmd.$db, collection: cmd.find, batchSize: cmd.batchSize, }; if (transaction) transaction.applyTransaction(nextCommand); + config.applyReadPreference(nextCommand); const next = await this.sendAndWait(nextCommand, undefined, specialisedResponse); if (next.cursor.nextBatch) { @@ -149,8 +147,4 @@ export class FindCommand extends Command { return result; } - - needsWritableHost(): boolean { - return false; - } } diff --git a/packages/mongo/src/client/command/findAndModify.ts b/packages/mongo/src/client/command/findAndModify.ts index 1a4830236..e08eefa0e 100644 --- a/packages/mongo/src/client/command/findAndModify.ts +++ b/packages/mongo/src/client/command/findAndModify.ts @@ -8,14 +8,14 @@ * You should have received a copy of the MIT License along with this program. */ -import { BaseResponse, Command } from './command.js'; -import { ReflectionClass, UUID } from '@deepkit/type'; +import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js'; +import { ReflectionClass } from '@deepkit/type'; interface FindAndModifyResponse extends BaseResponse { value: any; } -interface findAndModifySchema { +type findAndModifySchema = { findAndModify: string; $db: string; query: any; @@ -23,11 +23,7 @@ interface findAndModifySchema { new: boolean; upsert: boolean; fields: Record; - lsid?: { id: UUID }; - txnNumber?: number; - autocommit?: boolean; - startTransaction?: boolean; -} +} & TransactionalMessage & ReadPreferenceMessage; export class FindAndModifyCommand> extends Command { public upsert = false; @@ -46,7 +42,7 @@ export class FindAndModifyCommand> extends Comman const fields = {}; for (const name of this.fields) fields[name] = 1; - const cmd: any = { + const cmd: findAndModifySchema = { findAndModify: this.schema.getCollectionName() || 'unknown', $db: this.schema.databaseSchemaName || config.defaultDb || 'admin', query: this.query, @@ -57,6 +53,7 @@ export class FindAndModifyCommand> extends Comman }; if (transaction) transaction.applyTransaction(cmd); + config.applyReadPreference(cmd); return await this.sendAndWait(cmd); } diff --git a/packages/mongo/src/client/command/getMore.ts b/packages/mongo/src/client/command/getMore.ts index 150345d66..01b2e77ee 100644 --- a/packages/mongo/src/client/command/getMore.ts +++ b/packages/mongo/src/client/command/getMore.ts @@ -1,15 +1,10 @@ -import { UUID } from '@deepkit/type'; +import { ReadPreferenceMessage, TransactionalMessage } from './command.js'; -export interface GetMoreMessage { +export type GetMoreMessage = { getMore: bigint; $db: string; collection: string; batchSize?: number; maxTimeMS?: number; comment?: string; - - lsid?: { id: UUID }, - txnNumber?: number, - startTransaction?: boolean, - autocommit?: boolean, -} +} & TransactionalMessage & ReadPreferenceMessage; diff --git a/packages/mongo/src/client/config.ts b/packages/mongo/src/client/config.ts index 316eafc11..961d1444b 100644 --- a/packages/mongo/src/client/config.ts +++ b/packages/mongo/src/client/config.ts @@ -15,7 +15,8 @@ import { parse as parseQueryString } from 'querystring'; import { MongoError } from './error.js'; import { arrayRemoveItem, eachPair, singleStack } from '@deepkit/core'; import { resolveSrvHosts } from './dns.js'; -import { ReflectionClass, validatedDeserialize } from '@deepkit/type'; +import { cast, ReflectionClass } from '@deepkit/type'; +import { ReadPreferenceMessage } from './command/command.js'; /** * Default URL: @@ -54,6 +55,21 @@ export class MongoClientConfig { this.parseConnectionString(connectionString); } + applyReadPreference(cmd: ReadPreferenceMessage) { + if (this.options.readPreference) { + cmd.$readPreference = { + mode: this.options.readPreference, + }; + if (this.options.getReadPreferenceTags().length) { + cmd.$readPreference.tags = this.options.getReadPreferenceTags(); + } + if (this.options.maxStalenessSeconds) { + cmd.$readPreference.maxStalenessSeconds = this.options.maxStalenessSeconds; + } + if (this.options.hedge) cmd.$readPreference.hedge = { enabled: true }; + } + } + protected parseConnectionString(url: string) { //we replace only first `,` with `/,` so we get additional host names in parsed.path url = url.replace(',', '/,'); @@ -89,6 +105,7 @@ export class MongoClientConfig { } this.defaultDb = defaultDb; + if (parsed.auth) { const firstColon = parsed.auth.indexOf(':'); if (firstColon === -1) { @@ -100,7 +117,7 @@ export class MongoClientConfig { } const options = parsed.query ? parseQueryString(parsed.query) : {}; - this.options = validatedDeserialize(options); + this.options = cast(options); if (url.startsWith('mongodb+srv://')) { this.isSrv = true; @@ -154,7 +171,7 @@ export class MongoClientConfig { const hostsData = await this.resolveSrvHosts(); const options = { ...hostsData.options ? parseQueryString(hostsData.options) : {} }; - const partialOptions = validatedDeserialize(options) as {}; + const partialOptions = cast(options) as {}; for (const [k, v] of eachPair(partialOptions)) { this.options[k] = v; } diff --git a/packages/mongo/src/client/connection.ts b/packages/mongo/src/client/connection.ts index e2e03e47d..5a5b15b26 100644 --- a/packages/mongo/src/client/connection.ts +++ b/packages/mongo/src/client/connection.ts @@ -12,7 +12,7 @@ import { arrayRemoveItem, asyncOperation, formatError } from '@deepkit/core'; import { Host } from './host.js'; import { createConnection, Socket } from 'net'; import { connect as createTLSConnection, TLSSocket } from 'tls'; -import { Command } from './command/command.js'; +import { Command, TransactionalMessage } from './command/command.js'; import { stringifyType, Type, uuid } from '@deepkit/type'; import { BSONBinarySerializer, getBSONSerializer, getBSONSizer, Writer } from '@deepkit/bson'; import { HandshakeCommand } from './command/handshake.js'; @@ -49,6 +49,9 @@ export class MongoStats { * How many connection requests were queued because pool was full. */ connectionsQueued: number = 0; + + bytesReceived: number = 0; + bytesSent: number = 0; } export class MongoConnectionPool { @@ -149,6 +152,10 @@ export class MongoConnectionPool { //onClose does not automatically reconnect. Only new commands re-establish connections. }, (connection) => { this.release(connection); + }, (bytesSent) => { + this.stats.bytesSent += bytesSent; + }, (bytesReceived) =>{ + this.stats.bytesReceived += bytesReceived; }); host.connections.push(connection); this.connections.push(connection); @@ -233,7 +240,6 @@ export function readUint32LE(buffer: Uint8Array | ArrayBuffer, offset: number = return buffer[offset] + (buffer[offset + 1] * 2 ** 8) + (buffer[offset + 2] * 2 ** 16) + (buffer[offset + 3] * 2 ** 24); } - export class MongoDatabaseTransaction extends DatabaseTransaction { static txnNumber: bigint = 0n; @@ -242,7 +248,7 @@ export class MongoDatabaseTransaction extends DatabaseTransaction { txnNumber: bigint = 0n; started: boolean = false; - applyTransaction(cmd: any) { + applyTransaction(cmd: TransactionalMessage) { if (!this.lsid) return; cmd.lsid = this.lsid; cmd.txnNumber = this.txnNumber; @@ -303,6 +309,9 @@ export class MongoConnection { responseParser: ResponseParser; error?: Error; + bytesReceived: number = 0; + bytesSent: number = 0; + protected boundSendMessage = this.sendMessage.bind(this); constructor( @@ -312,6 +321,8 @@ export class MongoConnection { protected serializer: BSONBinarySerializer, protected onClose: (connection: MongoConnection) => void, protected onRelease: (connection: MongoConnection) => void, + protected onSent: (bytes: number) => void, + protected onReceived: (bytes: number) => void, ) { this.responseParser = new ResponseParser(this.onResponse.bind(this)); @@ -337,7 +348,11 @@ export class MongoConnection { } this.socket = createTLSConnection(options); - this.socket.on('data', (data) => this.responseParser.feed(data)); + this.socket.on('data', (data) => { + this.bytesReceived += data.byteLength; + this.onReceived(data.byteLength); + this.responseParser.feed(data); + }); } else { this.socket = createConnection({ host: host.hostname, @@ -345,7 +360,11 @@ export class MongoConnection { timeout: config.options.connectTimeoutMS, }); - this.socket.on('data', (data) => this.responseParser.feed(data)); + this.socket.on('data', (data) => { + this.bytesReceived += data.byteLength; + this.onReceived(data.byteLength); + this.responseParser.feed(data); + }); // const socket = this.socket = turbo.connect(host.port, host.hostname); // // this.socket.setNoDelay(true); @@ -478,6 +497,8 @@ export class MongoConnection { writer.writeInt32(messageLength); //detect backPressure + this.bytesSent += buffer.byteLength; + this.onSent(buffer.byteLength); this.socket.write(buffer); } catch (error) { console.log('failed sending message', message, 'for type', stringifyType(type)); diff --git a/packages/mongo/src/client/options.ts b/packages/mongo/src/client/options.ts index f39374263..b372a98f4 100644 --- a/packages/mongo/src/client/options.ts +++ b/packages/mongo/src/client/options.ts @@ -21,6 +21,7 @@ export class ConnectionOptions { connectTimeoutMS: number = 10000; socketTimeoutMS: number = 36000; + w?: string; wtimeoutMS?: number; journal?: string; @@ -35,7 +36,8 @@ export class ConnectionOptions { readPreference: 'primary' | 'primaryPreferred' | 'secondary' | 'secondaryPreferred' | 'nearest' | 'unknown' = 'primary'; maxStalenessSeconds?: number; - readPreferenceTags?: string; + readPreferenceTags?: string; //e.g. "dc:ny,rack:1" + hedge?: boolean; compressors?: 'snappy' | 'zlib' | 'zstd'; zlibCompressionLevel?: number; @@ -45,6 +47,7 @@ export class ConnectionOptions { authMechanismProperties?: string; gssapiServiceName?: string; + ssl?: boolean; tlsCertificateFile?: string; tlsCertificateKeyFile?: string; @@ -60,6 +63,21 @@ export class ConnectionOptions { maxIdleTimeMS: number = 100; waitQueueTimeoutMS: number = 0; + protected parsedReadPreferenceTags?: { [name: string]: string }[]; + + getReadPreferenceTags(): { [name: string]: string }[] { + if (!this.parsedReadPreferenceTags) { + this.parsedReadPreferenceTags = []; + if (this.readPreferenceTags) { + for (const tag of this.readPreferenceTags.split(',')) { + const [name, value] = tag.split(':'); + this.parsedReadPreferenceTags.push({ [name]: value }); + } + } + } + return this.parsedReadPreferenceTags; + } + getAuthMechanismProperties(): AuthMechanismProperties { const properties: AuthMechanismProperties = {}; if (!this.authMechanismProperties) return properties; diff --git a/packages/mongo/tests/client/client.spec.ts b/packages/mongo/tests/client/client.spec.ts index 268b9651c..ab29c4fb9 100644 --- a/packages/mongo/tests/client/client.spec.ts +++ b/packages/mongo/tests/client/client.spec.ts @@ -8,7 +8,7 @@ import { cast, validatedDeserialize } from '@deepkit/type'; import { createConnection } from 'net'; import { fail } from 'assert'; import { MongoConnectionError } from '../../src/client/error.js'; -import { BaseResponse, createCommand } from '../../index.js'; +import { createCommand } from '../../index.js'; jest.setTimeout(60000); @@ -70,10 +70,7 @@ test('custom command', async () => { $db: string; } - interface Response extends BaseResponse { - ismaster: boolean; - } - const command = createCommand({isMaster: 1, $db: 'deepkit'}); + const command = createCommand({isMaster: 1, $db: 'deepkit'}); const client = new MongoClient('mongodb://127.0.0.1/'); const res = await client.execute(command);