Skip to content

Commit

Permalink
feat(mongo): add readPreference support
Browse files Browse the repository at this point in the history
Option `readPreference` is now correctly passed to all commands.
  • Loading branch information
marcj committed Feb 13, 2024
1 parent d030310 commit 6275c37
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 79 deletions.
3 changes: 3 additions & 0 deletions packages/mongo/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ export * from './src/query.model.js';
export * from './src/query.resolver.js';
export * from './src/query.js';

export * from './src/client/host.js';
export * from './src/client/dns.js';
export * from './src/client/connection.js';
export * from './src/client/client.js';
export * from './src/client/config.js';
export * from './src/client/error.js';
Expand Down
25 changes: 12 additions & 13 deletions packages/mongo/src/client/command/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,20 @@
*/

import { toFastProperties } from '@deepkit/core';
import { BaseResponse, Command } from './command.js';
import { getTypeJitContainer, InlineRuntimeType, isType, ReflectionClass, Type, typeOf, UUID } from '@deepkit/type';
import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js';
import { getTypeJitContainer, InlineRuntimeType, isType, ReflectionClass, Type, typeOf } from '@deepkit/type';
import { MongoError } from '../error.js';
import { GetMoreMessage } from './getMore.js';
import { MongoClientConfig } from '../config.js';

interface AggregateMessage {
type AggregateMessage = {
aggregate: string;
$db: string;
pipeline: any[],
cursor: {
batchSize: number,
},
lsid?: { id: UUID },
txnNumber?: number,
startTransaction?: boolean,
autocommit?: boolean,
}
} & TransactionalMessage & ReadPreferenceMessage;

export class AggregateCommand<T, R = BaseResponse> extends Command<R[]> {
partial: boolean = false;
Expand All @@ -39,17 +36,18 @@ export class AggregateCommand<T, R = BaseResponse> extends Command<R[]> {
super();
}

async execute(config, host, transaction): Promise<R[]> {
const cmd = {
async execute(config: MongoClientConfig, host, transaction): Promise<R[]> {
const cmd: AggregateMessage = {
aggregate: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
pipeline: this.pipeline,
cursor: {
batchSize: this.batchSize
}
batchSize: this.batchSize,
},
};

if (transaction) transaction.applyTransaction(cmd);
config.applyReadPreference(cmd);
let resultSchema = this.resultSchema || this.schema;
if (resultSchema && !isType(resultSchema)) resultSchema = resultSchema.type;

Expand Down Expand Up @@ -95,13 +93,14 @@ export class AggregateCommand<T, R = BaseResponse> extends Command<R[]> {

let cursorId = res.cursor.id;
while (cursorId) {
const nextCommand = {
const nextCommand: GetMoreMessage = {
getMore: cursorId,
$db: cmd.$db,
collection: cmd.aggregate,
batchSize: cmd.cursor.batchSize,
};
if (transaction) transaction.applyTransaction(nextCommand);
config.applyReadPreference(nextCommand);
const next = await this.sendAndWait<GetMoreMessage, Response>(nextCommand, undefined, specialisedResponse);

if (next.cursor.nextBatch) {
Expand Down
84 changes: 74 additions & 10 deletions packages/mongo/src/client/command/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,19 @@ import { handleErrorResponse, MongoDatabaseError, MongoError } from '../error.js
import { MongoClientConfig } from '../config.js';
import { Host } from '../host.js';
import type { MongoDatabaseTransaction } from '../connection.js';
import { ReceiveType, resolveReceiveType, SerializationError, stringifyType, Type, typeOf, typeSettings, UnpopulatedCheck, ValidationError } from '@deepkit/type';
import {
InlineRuntimeType,
ReceiveType,
resolveReceiveType,
SerializationError,
stringifyType,
Type,
typeOf,
typeSettings,
UnpopulatedCheck,
UUID,
ValidationError,
} from '@deepkit/type';
import { BSONDeserializer, deserializeBSONWithoutOptimiser, getBSONDeserializer } from '@deepkit/bson';
import { mongoBinarySerializer } from '../../mongo-serializer.js';

Expand All @@ -25,6 +37,25 @@ export interface BaseResponse {
writeErrors?: Array<{ index: number, code: number, errmsg: string }>;
}

export interface TransactionalMessage {
lsid?: { id: UUID };
txnNumber?: bigint;
startTransaction?: boolean;
autocommit?: boolean;

abortTransaction?: 1;
commitTransaction?: 1;
}

export interface ReadPreferenceMessage {
$readPreference?: {
mode: string;
tags?: { [name: string]: string }[];
maxStalenessSeconds?: number;
hedge?: { enabled: boolean }
};
}

export abstract class Command<T> {
protected current?: { responseType?: Type, resolve: Function, reject: Function };

Expand All @@ -37,13 +68,19 @@ export abstract class Command<T> {
this.sender(resolveReceiveType(messageType), message);

return asyncOperation((resolve, reject) => {
this.current = { resolve, reject, responseType: responseType ? resolveReceiveType(responseType) : typeOf<BaseResponse>() };
this.current = {
resolve,
reject,
responseType: responseType ? resolveReceiveType(responseType) : typeOf<BaseResponse>(),
};
});
}

abstract execute(config: MongoClientConfig, host: Host, transaction?: MongoDatabaseTransaction): Promise<T>;

abstract needsWritableHost(): boolean;
needsWritableHost(): boolean {
return false;
}

handleResponse(response: Uint8Array): void {
if (!this.current) throw new Error('Got handleResponse without active command');
Expand Down Expand Up @@ -87,19 +124,46 @@ export abstract class Command<T> {
}
}

export function createCommand<Request extends {}, Response extends BaseResponse>(
request: Request,
options: Partial<{ needsWritableHost: boolean }> = {},
interface CommandOptions {
// default false
needsWritableHost: boolean;

// default true
transactional: boolean;

// default true
readPreference: boolean;
}

export function createCommand<Request extends {[name: string]: any}, Response>(
request: Request | ((config: MongoClientConfig) => Request),
optionsIn: Partial<CommandOptions> = {},
typeRequest?: ReceiveType<Request>,
typeResponse?: ReceiveType<Response>,
): Command<Response> {
): Command<Response & BaseResponse> {
const options: CommandOptions = Object.assign(
{ needsWritableHost: false, transactional: true, readPreference: true },
optionsIn,
);

typeRequest = resolveReceiveType(typeRequest);
type FullTypeRequest = InlineRuntimeType<typeof typeRequest> & TransactionalMessage & ReadPreferenceMessage;
typeRequest = typeOf<FullTypeRequest>();

typeResponse = resolveReceiveType(typeResponse);
type FullTypeResponse = InlineRuntimeType<typeof typeResponse> & BaseResponse;
typeResponse = typeOf<FullTypeResponse>();

class DynamicCommand extends Command<Response> {
async execute(): Promise<Response> {
return this.sendAndWait(request, typeRequest, typeResponse);
async execute(config: MongoClientConfig, host, transaction?): Promise<Response & BaseResponse> {
const cmd = 'function' === typeof request ? request(config) : request;
if (options.transactional && transaction) transaction.applyTransaction(cmd);
if (options.readPreference) config.applyReadPreference(cmd as any);
return await this.sendAndWait(cmd, typeRequest, typeResponse as Type) as any;
}

needsWritableHost(): boolean {
return options.needsWritableHost || false;
return options.needsWritableHost;
}
}

Expand Down
17 changes: 5 additions & 12 deletions packages/mongo/src/client/command/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,20 @@
* You should have received a copy of the MIT License along with this program.
*/

import { BaseResponse, Command } from './command.js';
import { ReflectionClass, UUID } from '@deepkit/type';
import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js';
import { ReflectionClass } from '@deepkit/type';

interface CountResponse extends BaseResponse {
n: number;
}

interface CountSchema {
type CountSchema = {
count: string;
$db: string;
limit?: number;
query: any;
skip?: number;
lsid?: { id: UUID };
txnNumber?: number;
startTransaction?: boolean;
autocommit?: boolean;
}
} & TransactionalMessage & ReadPreferenceMessage;

export class CountCommand<T extends ReflectionClass<any>> extends Command<number> {
constructor(
Expand All @@ -47,12 +43,9 @@ export class CountCommand<T extends ReflectionClass<any>> extends Command<number
};

if (transaction) transaction.applyTransaction(cmd);
config.applyReadPreference(cmd);

const res = await this.sendAndWait<CountSchema, CountResponse>(cmd);
return res.n;
}

needsWritableHost(): boolean {
return false;
}
}
20 changes: 7 additions & 13 deletions packages/mongo/src/client/command/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
* You should have received a copy of the MIT License along with this program.
*/

import { BaseResponse, Command } from './command.js';
import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js';
import { toFastProperties } from '@deepkit/core';
import { DEEP_SORT } from '../../query.model.js';
import { InlineRuntimeType, ReflectionClass, ReflectionKind, typeOf, TypeUnion, UUID } from '@deepkit/type';
import { InlineRuntimeType, ReflectionClass, ReflectionKind, typeOf, TypeUnion } from '@deepkit/type';
import { MongoError } from '../error.js';
import { GetMoreMessage } from './getMore.js';

interface FindSchema {
type FindSchema = {
find: string;
$db: string;
batchSize: number;
Expand All @@ -24,11 +24,7 @@ interface FindSchema {
filter: any;
projection?: any;
sort?: any;
lsid?: { id: UUID },
txnNumber?: number,
startTransaction?: boolean,
autocommit?: boolean,
}
} & TransactionalMessage & ReadPreferenceMessage

export class FindCommand<T> extends Command<T[]> {
batchSize: number = 1_000_000;
Expand All @@ -55,6 +51,7 @@ export class FindCommand<T> extends Command<T[]> {
};

if (transaction) transaction.applyTransaction(cmd);
config.applyReadPreference(cmd);

if (this.projection) cmd.projection = this.projection;
if (this.sort) cmd.sort = this.sort;
Expand Down Expand Up @@ -132,13 +129,14 @@ export class FindCommand<T> extends Command<T[]> {

let cursorId = res.cursor.id;
while (cursorId) {
const nextCommand = {
const nextCommand: GetMoreMessage = {
getMore: cursorId,
$db: cmd.$db,
collection: cmd.find,
batchSize: cmd.batchSize,
};
if (transaction) transaction.applyTransaction(nextCommand);
config.applyReadPreference(nextCommand);
const next = await this.sendAndWait<GetMoreMessage, Response>(nextCommand, undefined, specialisedResponse);

if (next.cursor.nextBatch) {
Expand All @@ -149,8 +147,4 @@ export class FindCommand<T> extends Command<T[]> {

return result;
}

needsWritableHost(): boolean {
return false;
}
}
15 changes: 6 additions & 9 deletions packages/mongo/src/client/command/findAndModify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,22 @@
* You should have received a copy of the MIT License along with this program.
*/

import { BaseResponse, Command } from './command.js';
import { ReflectionClass, UUID } from '@deepkit/type';
import { BaseResponse, Command, ReadPreferenceMessage, TransactionalMessage } from './command.js';
import { ReflectionClass } from '@deepkit/type';

interface FindAndModifyResponse extends BaseResponse {
value: any;
}

interface findAndModifySchema {
type findAndModifySchema = {
findAndModify: string;
$db: string;
query: any;
update: any;
new: boolean;
upsert: boolean;
fields: Record<string, number>;
lsid?: { id: UUID };
txnNumber?: number;
autocommit?: boolean;
startTransaction?: boolean;
}
} & TransactionalMessage & ReadPreferenceMessage;

export class FindAndModifyCommand<T extends ReflectionClass<any>> extends Command<FindAndModifyResponse> {
public upsert = false;
Expand All @@ -46,7 +42,7 @@ export class FindAndModifyCommand<T extends ReflectionClass<any>> extends Comman
const fields = {};
for (const name of this.fields) fields[name] = 1;

const cmd: any = {
const cmd: findAndModifySchema = {
findAndModify: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
query: this.query,
Expand All @@ -57,6 +53,7 @@ export class FindAndModifyCommand<T extends ReflectionClass<any>> extends Comman
};

if (transaction) transaction.applyTransaction(cmd);
config.applyReadPreference(cmd);

return await this.sendAndWait<findAndModifySchema, FindAndModifyResponse>(cmd);
}
Expand Down
11 changes: 3 additions & 8 deletions packages/mongo/src/client/command/getMore.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import { UUID } from '@deepkit/type';
import { ReadPreferenceMessage, TransactionalMessage } from './command.js';

export interface GetMoreMessage {
export type GetMoreMessage = {
getMore: bigint;
$db: string;
collection: string;
batchSize?: number;
maxTimeMS?: number;
comment?: string;

lsid?: { id: UUID },
txnNumber?: number,
startTransaction?: boolean,
autocommit?: boolean,
}
} & TransactionalMessage & ReadPreferenceMessage;
Loading

0 comments on commit 6275c37

Please sign in to comment.