Skip to content

Commit

Permalink
feature(orm): custom typed database.raw() support
Browse files Browse the repository at this point in the history
this makes it possible to write custom SQL or custom pipelines (for Mongo) and get the correct type deserialised.

fix(mongo): handle getMore() correctly when batchSize is too big. Also allow to set batchSize in a query manually using `Query.withBatchSize(32)`. fixes #486
  • Loading branch information
marcj committed Oct 1, 2023
1 parent c21009a commit 8a45194
Show file tree
Hide file tree
Showing 23 changed files with 303 additions and 53 deletions.
58 changes: 55 additions & 3 deletions packages/mongo/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
* You should have received a copy of the MIT License along with this program.
*/

import { DatabaseAdapter, DatabaseAdapterQueryFactory, DatabaseEntityRegistry, DatabaseSession, OrmEntity } from '@deepkit/orm';
import { AbstractClassType, ClassType } from '@deepkit/core';
import { DatabaseAdapter, DatabaseAdapterQueryFactory, DatabaseEntityRegistry, DatabaseSession, FindQuery, ItemNotFound, OrmEntity, RawFactory } from '@deepkit/orm';
import { AbstractClassType, ClassType, isArray } from '@deepkit/core';
import { MongoDatabaseQuery } from './query.js';
import { MongoPersistence } from './persistence.js';
import { MongoClient } from './client/client.js';
Expand All @@ -19,7 +19,9 @@ import { MongoDatabaseTransaction } from './client/connection.js';
import { CreateIndex, CreateIndexesCommand } from './client/command/createIndexes.js';
import { DropIndexesCommand } from './client/command/dropIndexes.js';
import { CreateCollectionCommand } from './client/command/createCollection.js';
import { entity, ReceiveType, ReflectionClass } from '@deepkit/type';
import { entity, ReceiveType, ReflectionClass, resolveReceiveType } from '@deepkit/type';
import { Command } from './client/command/command.js';
import { AggregateCommand } from './client/command/aggregate.js';

export class MongoDatabaseQueryFactory extends DatabaseAdapterQueryFactory {
constructor(
Expand All @@ -35,6 +37,52 @@ export class MongoDatabaseQueryFactory extends DatabaseAdapterQueryFactory {
}
}

class MongoRawCommandQuery<T> implements FindQuery<T> {
constructor(
protected session: DatabaseSession<MongoDatabaseAdapter>,
protected client: MongoClient,
protected command: Command,
) {
}

async find(): Promise<T[]> {
const res = await this.client.execute(this.command);
return res as any;
}

async findOneOrUndefined(): Promise<T> {
const res = await this.client.execute(this.command);
if (isArray(res)) return res[0];
return res;
}

async findOne(): Promise<T> {
const item = await this.findOneOrUndefined();
if (!item) throw new ItemNotFound('Could not find item');
return item;
}
}

export class MongoRawFactory implements RawFactory<[Command]> {
constructor(
protected session: DatabaseSession<MongoDatabaseAdapter>,
protected client: MongoClient,
) {
}

create<Entity = any, ResultSchema = Entity>(
commandOrPipeline: Command | any[],
type?: ReceiveType<Entity>,
resultType?: ReceiveType<ResultSchema>,
): MongoRawCommandQuery<ResultSchema> {
type = resolveReceiveType(type);
const resultSchema = resultType ? resolveReceiveType(resultType) : undefined;

const command = isArray(commandOrPipeline) ? new AggregateCommand(ReflectionClass.from(type), commandOrPipeline, resultSchema) : commandOrPipeline;
return new MongoRawCommandQuery<ResultSchema>(this.session, this.client, command);
}
}

export class MongoDatabaseAdapter extends DatabaseAdapter {
public readonly client: MongoClient;

Expand All @@ -55,6 +103,10 @@ export class MongoDatabaseAdapter extends DatabaseAdapter {
this.ormSequences = ReflectionClass.from(OrmSequence);
}

rawFactory(session: DatabaseSession<this>): MongoRawFactory {
return new MongoRawFactory(session, this.client);
}

getName(): string {
return 'mongo';
}
Expand Down
39 changes: 30 additions & 9 deletions packages/mongo/src/client/command/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

import { toFastProperties } from '@deepkit/core';
import { BaseResponse, Command } from './command.js';
import { InlineRuntimeType, ReflectionClass, Type, typeOf, UUID } from '@deepkit/type';
import { getTypeJitContainer, InlineRuntimeType, isType, ReflectionClass, Type, typeOf, UUID } from '@deepkit/type';
import { MongoError } from '../error.js';
import { GetMoreMessage } from './getMore.js';

interface AggregateMessage {
aggregate: string;
Expand All @@ -28,29 +29,31 @@ interface AggregateMessage {

export class AggregateCommand<T, R = BaseResponse> extends Command {
partial: boolean = false;
batchSize: number = 1_000_000;

constructor(
public schema: ReflectionClass<T>,
public pipeline: any[] = [],
public resultSchema?: ReflectionClass<R>,
public resultSchema?: ReflectionClass<R> | Type,
) {
super();
}

async execute(config, host, transaction): Promise<R[]> {
const cmd = {
aggregate: this.schema.collectionName || this.schema.name || 'unknown',
aggregate: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
pipeline: this.pipeline,
cursor: {
batchSize: 1_000_000, //todo make configurable
batchSize: this.batchSize
}
};

if (transaction) transaction.applyTransaction(cmd);
const resultSchema = this.resultSchema || this.schema;
let resultSchema = this.resultSchema || this.schema;
if (resultSchema && !isType(resultSchema)) resultSchema = resultSchema.type;

const jit = resultSchema.getJitContainer();
const jit = getTypeJitContainer(resultSchema);
let specialisedResponse: Type | undefined = this.partial ? jit.mdbAggregatePartial : jit.mdbAggregate;

if (!specialisedResponse) {
Expand Down Expand Up @@ -82,14 +85,32 @@ export class AggregateCommand<T, R = BaseResponse> extends Command {
}

interface Response extends BaseResponse {
cursor: { id: BigInt, firstBatch?: any[], nextBatch?: any[] };
cursor: { id: bigint, firstBatch?: any[], nextBatch?: any[] };
}

const res = await this.sendAndWait<AggregateMessage, Response>(cmd, undefined, specialisedResponse);
if (!res.cursor.firstBatch) throw new MongoError(`No firstBatch received`);

//todo: implement fetchMore and decrease batchSize
return res.cursor.firstBatch;
const result: R[] = res.cursor.firstBatch;

let cursorId = res.cursor.id;
while (cursorId) {
const nextCommand = {
getMore: cursorId,
$db: cmd.$db,
collection: cmd.aggregate,
batchSize: cmd.cursor.batchSize,
};
if (transaction) transaction.applyTransaction(nextCommand);
const next = await this.sendAndWait<GetMoreMessage, Response>(nextCommand, undefined, specialisedResponse);

if (next.cursor.nextBatch) {
result.push(...next.cursor.nextBatch);
}
cursorId = next.cursor.id;
}

return result;
}

needsWritableHost(): boolean {
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class CountCommand<T extends ReflectionClass<any>> extends Command {

async execute(config, host, transaction): Promise<number> {
const cmd: any = {
count: this.schema.collectionName || this.schema.name || 'unknown',
count: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
query: this.query,
limit: this.limit,
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/createCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class CreateCollectionCommand<T extends ReflectionClass<any>> extends Com

async execute(config, host, transaction): Promise<BaseResponse> {
const cmd: any = {
create: this.schema.collectionName || this.schema.name || 'unknown',
create: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
};

Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/createIndexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class CreateIndexesCommand<T extends ReflectionClass<any>> extends Comman

async execute(config, host, transaction): Promise<BaseResponse> {
const cmd: any = {
createIndexes: this.schema.collectionName || this.schema.name || 'unknown',
createIndexes: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
indexes: this.indexes,
};
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class DeleteCommand<T extends ReflectionClass<any>> extends Command {

async execute(config, host, transaction): Promise<number> {
const cmd = {
delete: this.schema.collectionName || this.schema.name || 'unknown',
delete: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
deletes: [
{
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/dropIndexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class DropIndexesCommand<T extends ReflectionClass<any>> extends Command

async execute(config, host, transaction): Promise<BaseResponse> {
const cmd: any = {
dropIndexes: this.schema.collectionName || this.schema.name || 'unknown',
dropIndexes: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
index: this.names
};
Expand Down
31 changes: 26 additions & 5 deletions packages/mongo/src/client/command/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { toFastProperties } from '@deepkit/core';
import { DEEP_SORT } from '../../query.model.js';
import { InlineRuntimeType, ReflectionClass, ReflectionKind, typeOf, TypeUnion, UUID } from '@deepkit/type';
import { MongoError } from '../error.js';
import { GetMoreMessage } from './getMore.js';

interface FindSchema {
find: string;
Expand All @@ -30,6 +31,8 @@ interface FindSchema {
}

export class FindCommand<T> extends Command {
batchSize: number = 1_000_000;

constructor(
public schema: ReflectionClass<T>,
public filter: { [name: string]: any } = {},
Expand All @@ -43,12 +46,12 @@ export class FindCommand<T> extends Command {

async execute(config, host, transaction): Promise<T[]> {
const cmd: FindSchema = {
find: this.schema.collectionName || this.schema.name || 'unknown',
find: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
filter: this.filter,
limit: this.limit,
skip: this.skip,
batchSize: 1_000_000, //todo make configurable
batchSize: this.batchSize,
};

if (transaction) transaction.applyTransaction(cmd);
Expand Down Expand Up @@ -119,14 +122,32 @@ export class FindCommand<T> extends Command {
}

interface Response extends BaseResponse {
cursor: { id: BigInt, firstBatch?: any[], nextBatch?: any[] };
cursor: { id: bigint, firstBatch?: any[], nextBatch?: any[] };
}

const res = await this.sendAndWait<FindSchema, Response>(cmd, undefined, specialisedResponse);
if (!res.cursor.firstBatch) throw new MongoError(`No firstBatch received`);

//todo: implement fetchMore and decrease batchSize
return res.cursor.firstBatch;
const result: T[] = res.cursor.firstBatch;

let cursorId = res.cursor.id;
while (cursorId) {
const nextCommand = {
getMore: cursorId,
$db: cmd.$db,
collection: cmd.find,
batchSize: cmd.batchSize,
};
if (transaction) transaction.applyTransaction(nextCommand);
const next = await this.sendAndWait<GetMoreMessage, Response>(nextCommand, undefined, specialisedResponse);

if (next.cursor.nextBatch) {
result.push(...next.cursor.nextBatch);
}
cursorId = next.cursor.id;
}

return result;
}

needsWritableHost(): boolean {
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/findAndModify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class FindAndModifyCommand<T extends ReflectionClass<any>> extends Comman
for (const name of this.fields) fields[name] = 1;

const cmd: any = {
findAndModify: this.schema.collectionName || this.schema.name || 'unknown',
findAndModify: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
query: this.query,
update: this.update,
Expand Down
15 changes: 15 additions & 0 deletions packages/mongo/src/client/command/getMore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { UUID } from '@deepkit/type';

export interface GetMoreMessage {
getMore: bigint;
$db: string;
collection: string;
batchSize?: number;
maxTimeMS?: number;
comment?: string;

lsid?: { id: UUID },
txnNumber?: number,
startTransaction?: boolean,
autocommit?: boolean,
}
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/insert.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class InsertCommand<T> extends Command {

async execute(config, host, transaction): Promise<number> {
const cmd: any = {
insert: this.schema.collectionName || this.schema.name || 'unknown',
insert: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
documents: this.documents,
};
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/command/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class UpdateCommand<T extends ReflectionClass<any>> extends Command {

async execute(config, host, transaction): Promise<number> {
const cmd = {
update: this.schema.collectionName || this.schema.name || 'unknown',
update: this.schema.getCollectionName() || 'unknown',
$db: this.schema.databaseSchemaName || config.defaultDb || 'admin',
updates: this.updates
};
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ export class MongoClientConfig {
}

resolveCollectionName(schema: ReflectionClass<any>): string {
return schema.collectionName || schema.name || 'unknown';
return schema.getCollectionName() || 'unknown';
}

@singleStack()
Expand Down
1 change: 1 addition & 0 deletions packages/mongo/src/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ export class MongoDatabaseTransaction extends DatabaseTransaction {

async begin() {
if (!this.connection) return;
// see https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst
this.lsid = { id: uuid() };
this.txnNumber = MongoDatabaseTransaction.txnNumber++;
// const res = await this.connection.execute(new StartSessionCommand());
Expand Down
7 changes: 5 additions & 2 deletions packages/mongo/src/query.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ export class MongoQueryResolver<T extends OrmEntity> extends GenericQueryResolve
const pipeline = this.buildAggregationPipeline(model);
const resultsSchema = model.isAggregate() ? this.getCachedAggregationSchema(model) : this.getSchemaWithJoins();
const command = new AggregateCommand(this.classSchema, pipeline, resultsSchema);
if (model.batchSize) command.batchSize = model.batchSize;
command.partial = model.isPartial();
const items = await connection.execute(command);
if (model.isAggregate()) {
Expand All @@ -318,14 +319,16 @@ export class MongoQueryResolver<T extends OrmEntity> extends GenericQueryResolve

return items.map(v => formatter.hydrate(model, v));
} else {
const items = await connection.execute(new FindCommand(
const command = new FindCommand(
this.classSchema,
getMongoFilter(this.classSchema, model),
this.getProjection(this.classSchema, model.select),
this.getSortFromModel(model.sort),
model.limit,
model.skip,
));
);
if (model.batchSize) command.batchSize = model.batchSize;
const items = await connection.execute(command);
return items.map(v => formatter.hydrate(model, v));
}
} finally {
Expand Down
Loading

0 comments on commit 8a45194

Please sign in to comment.