From cdb7256b34f1d9de16145dd79b307ccf45f7c72f Mon Sep 17 00:00:00 2001 From: "Marc J. Schmidt" Date: Fri, 2 Feb 2024 12:23:48 +0100 Subject: [PATCH] feat(orm): onDatabaseError event This new event token allows to hook into various errors that are thrown when working with the `Database` abstraction and their adapter. For example, a user can now listen to connection errors, or on errors from a query, or from the unit of work. ```typescript const database = new Database(new Adapter()); database.listen(onDatabaseError, (event) => { if (event.error instanceof MongoConnectionError) { // connection could not be established } // event.query/event.classSchema is set if it originated from a query }); ``` In general event.error could contain anything, is from subclass `DatabaseError` if it was related to database operations there were correctly wrapped by the adapter. Various specialisation errors that extends DatabaseError exist in various adapters like e.g. MongoConnectionError and MongoDatabaseError. The event is of type DatabaseErrorEvent, but has specialisations for unit of work errors DatabaseErrorInsertEvent and DatabaseErrorUpdateEvent with more information available. --- packages/core/src/core.ts | 18 ++++ packages/event/src/event.ts | 2 +- packages/mongo/src/adapter.ts | 37 +++++--- packages/mongo/src/client/command/command.ts | 8 +- packages/mongo/src/client/connection.ts | 35 ++++---- packages/mongo/src/client/error.ts | 72 ++++++++-------- packages/mongo/tests/client/client.spec.ts | 15 ++++ packages/mongo/tests/database.spec.ts | 70 +++++++++++++++- packages/orm/src/database-session.ts | 27 ++++-- packages/orm/src/database.ts | 1 - packages/orm/src/event.ts | 48 +++++++++-- packages/orm/src/memory-db.ts | 88 ++++++++++---------- packages/orm/src/query-filter.ts | 2 +- packages/orm/src/query.ts | 82 +++++++++--------- packages/orm/tests/events.spec.ts | 74 ++++++++++++++++ 15 files changed, 411 insertions(+), 168 deletions(-) diff --git a/packages/core/src/core.ts b/packages/core/src/core.ts index 594fb9600..ee06bac3b 100644 --- a/packages/core/src/core.ts +++ b/packages/core/src/core.ts @@ -825,3 +825,21 @@ export function formatError(error: any): string { return String(error); } + +/** + * Asserts that the given object is an instance of the given class. + */ +export function assertInstanceOf(object: any, constructor: { new (...args: any[]): T }): asserts object is T { + if (!(object instanceof constructor)) { + throw new Error(`Object ${getClassName(object)} is not an instance of the expected class ${getClassName(constructor)}`); + } +} + +/** + * Asserts that the given value is defined (not null and not undefined). + */ +export function assertDefined(value: T): asserts value is NonNullable { + if (value === null || value === undefined) { + throw new Error(`Value is not defined`); + } +} diff --git a/packages/event/src/event.ts b/packages/event/src/event.ts index 3a44e3268..486f37086 100644 --- a/packages/event/src/event.ts +++ b/packages/event/src/event.ts @@ -19,7 +19,7 @@ import { ReflectionClass } from '@deepkit/type'; -export type EventListenerCallback = (event: T, ...args: any[]) => void | Promise; +export type EventListenerCallback = (event: T, ...args: any[]) => any | Promise; export class EventError extends CustomError { } diff --git a/packages/mongo/src/adapter.ts b/packages/mongo/src/adapter.ts index bb0feb42e..67978a8b0 100644 --- a/packages/mongo/src/adapter.ts +++ b/packages/mongo/src/adapter.ts @@ -12,12 +12,14 @@ import { DatabaseAdapter, DatabaseAdapterQueryFactory, DatabaseEntityRegistry, + DatabaseErrorEvent, DatabaseSession, FindQuery, ItemNotFound, MigrateOptions, + onDatabaseError, OrmEntity, - RawFactory + RawFactory, } from '@deepkit/orm'; import { AbstractClassType, ClassType, isArray } from '@deepkit/core'; import { MongoDatabaseQuery } from './query.js'; @@ -56,20 +58,35 @@ class MongoRawCommandQuery implements FindQuery { } async find(): Promise { - const res = await this.client.execute(this.command); - return res as any; + try { + const res = await this.client.execute(this.command); + return res as any; + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session)); + throw error; + } } async findOneOrUndefined(): Promise { - const res = await this.client.execute(this.command); - if (isArray(res)) return res[0]; - return res; + try { + const res = await this.client.execute(this.command); + if (isArray(res)) return res[0]; + return res; + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session)); + throw error; + } } async findOne(): Promise { - const item = await this.findOneOrUndefined(); - if (!item) throw new ItemNotFound('Could not find item'); - return item; + try { + const item = await this.findOneOrUndefined(); + if (!item) throw new ItemNotFound('Could not find item'); + return item; + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session)); + throw error; + } } } @@ -99,7 +116,7 @@ export class MongoDatabaseAdapter extends DatabaseAdapter { protected ormSequences: ReflectionClass; constructor( - connectionString: string + connectionString: string, ) { super(); this.client = new MongoClient(connectionString); diff --git a/packages/mongo/src/client/command/command.ts b/packages/mongo/src/client/command/command.ts index cfe98bf4c..9b1ccdc93 100644 --- a/packages/mongo/src/client/command/command.ts +++ b/packages/mongo/src/client/command/command.ts @@ -9,14 +9,13 @@ */ import { asyncOperation, getClassName } from '@deepkit/core'; -import { handleErrorResponse, MongoError } from '../error.js'; +import { handleErrorResponse, MongoDatabaseError, MongoError } from '../error.js'; import { MongoClientConfig } from '../config.js'; import { Host } from '../host.js'; import type { MongoDatabaseTransaction } from '../connection.js'; import { ReceiveType, ReflectionClass, resolveReceiveType, SerializationError, stringifyType, Type, typeOf, typeSettings, UnpopulatedCheck, ValidationError } from '@deepkit/type'; import { BSONDeserializer, deserializeBSONWithoutOptimiser, getBSONDeserializer } from '@deepkit/bson'; import { mongoBinarySerializer } from '../../mongo-serializer.js'; -import { inspect } from 'util'; export interface CommandMessageResponseCallbackResult { /** @@ -55,7 +54,7 @@ export abstract class Command { public sender?: (schema: Type, message: T) => void; public sendAndWait( - message: T, messageType?: ReceiveType, responseType?: ReceiveType + message: T, messageType?: ReceiveType, responseType?: ReceiveType, ): Promise { if (!this.sender) throw new Error(`No sender set in command ${getClassName(this)}`); this.sender(resolveReceiveType(messageType), message); @@ -84,7 +83,7 @@ export abstract class Command { } if (!message.ok) { - this.current.reject(new MongoError(message.errmsg || 'error', message.code)); + this.current.reject(Object.assign(new MongoDatabaseError(message.errmsg || 'error'), { code: message.code })); } else { this.current.resolve(message); } @@ -92,7 +91,6 @@ export abstract class Command { if (error instanceof ValidationError || error instanceof SerializationError) { if (this.current.responseType) { const raw = deserializeBSONWithoutOptimiser(response); - console.log('mongo raw response', inspect(raw, { depth: null })); if (raw.errmsg && raw.ok === 0) { const error = handleErrorResponse(raw); if (error) { diff --git a/packages/mongo/src/client/connection.ts b/packages/mongo/src/client/connection.ts index 2a1ba0989..c0148cb43 100644 --- a/packages/mongo/src/client/connection.ts +++ b/packages/mongo/src/client/connection.ts @@ -8,7 +8,7 @@ * You should have received a copy of the MIT License along with this program. */ -import { arrayRemoveItem, asyncOperation } from '@deepkit/core'; +import { arrayRemoveItem, asyncOperation, formatError } from '@deepkit/core'; import { Host } from './host.js'; import { createConnection, Socket } from 'net'; import { connect as createTLSConnection, TLSSocket } from 'tls'; @@ -17,8 +17,7 @@ import { stringifyType, Type, uuid } from '@deepkit/type'; import { BSONBinarySerializer, getBSONSerializer, getBSONSizer, Writer } from '@deepkit/bson'; import { HandshakeCommand } from './command/handshake.js'; import { MongoClientConfig } from './config.js'; -import { MongoError } from './error.js'; - +import { MongoConnectionError, MongoError } from './error.js'; import { DatabaseTransaction } from '@deepkit/orm'; import { CommitTransactionCommand } from './command/commitTransaction.js'; import { AbortTransactionCommand } from './command/abortTransaction.js'; @@ -59,7 +58,7 @@ export class MongoConnectionPool { */ public connections: MongoConnection[] = []; - protected queue: {resolve: (connection: MongoConnection) => void, request: ConnectionRequest}[] = []; + protected queue: { resolve: (connection: MongoConnection) => void, request: ConnectionRequest }[] = []; protected nextConnectionClose: Promise = Promise.resolve(true); @@ -88,7 +87,7 @@ export class MongoConnectionPool { await Promise.allSettled(promises); } } catch (error: any) { - throw new MongoError('Failed to connect: ' + error.message); + throw new MongoConnectionError(`Failed to connect: ${formatError(error)}`); } } @@ -132,7 +131,7 @@ export class MongoConnectionPool { if (request.readonly && host.isReadable()) return host; } - throw new MongoError(`Could not find host for connection request. (readonly=${request.readonly}, hosts=${hosts.length}). Last Error: ${this.lastError}`); + throw new MongoConnectionError(`Could not find host for connection request. (readonly=${request.readonly}, hosts=${hosts.length}). Last Error: ${this.lastError}`); } protected createAdditionalConnectionForRequest(request: ConnectionRequest): MongoConnection { @@ -203,7 +202,7 @@ export class MongoConnectionPool { if (!connection.isConnected()) continue; if (connection.reserved) continue; - if (request.nearest) throw new Error('Nearest not implemented yet'); + if (request.nearest) throw new MongoConnectionError('Nearest not implemented yet'); if (!this.matchRequest(connection, r)) continue; @@ -225,7 +224,7 @@ export class MongoConnectionPool { return asyncOperation((resolve) => { this.stats.connectionsQueued++; - this.queue.push({resolve, request: r}); + this.queue.push({ resolve, request: r }); }); } } @@ -265,7 +264,7 @@ export class MongoDatabaseTransaction extends DatabaseTransaction { async commit() { if (!this.connection) return; - if (this.ended) throw new Error('Transaction ended already'); + if (this.ended) throw new MongoError('Transaction ended already'); await this.connection.execute(new CommitTransactionCommand()); this.ended = true; @@ -274,7 +273,7 @@ export class MongoDatabaseTransaction extends DatabaseTransaction { async rollback() { if (!this.connection) return; - if (this.ended) throw new Error('Transaction ended already'); + if (this.ended) throw new MongoError('Transaction ended already'); if (!this.started) return; await this.connection.execute(new AbortTransactionCommand()); @@ -321,7 +320,7 @@ export class MongoConnection { host: host.hostname, port: host.port, timeout: config.options.connectTimeoutMS, - servername: host.hostname + servername: host.hostname, }; const optional = { ca: config.options.tlsCAFile, @@ -343,7 +342,7 @@ export class MongoConnection { this.socket = createConnection({ host: host.hostname, port: host.port, - timeout: config.options.connectTimeoutMS + timeout: config.options.connectTimeoutMS, }); this.socket.on('data', (data) => this.responseParser.feed(data)); @@ -410,7 +409,7 @@ export class MongoConnection { // const offset = 16 + 4 + 8 + 4 + 4; //QUERY_REPLY const message = response.slice(offset, size); - if (!this.lastCommand) throw new Error(`Got a server response without active command`); + if (!this.lastCommand) throw new MongoError(`Got a server response without active command`); this.lastCommand.command.handleResponse(message); } @@ -422,7 +421,7 @@ export class MongoConnection { */ public async execute(command: T): Promise> { if (this.status === MongoConnectionStatus.pending) await this.connect(); - if (this.status === MongoConnectionStatus.disconnected) throw new Error('Disconnected'); + if (this.status === MongoConnectionStatus.disconnected) throw new MongoError('Disconnected'); if (this.lastCommand && this.lastCommand.promise) { await this.lastCommand.promise; @@ -487,7 +486,7 @@ export class MongoConnection { } async connect(): Promise { - if (this.status === MongoConnectionStatus.disconnected) throw new Error('Connection disconnected'); + if (this.status === MongoConnectionStatus.disconnected) throw new MongoError('Connection disconnected'); if (this.status !== MongoConnectionStatus.pending) return; this.status = MongoConnectionStatus.connecting; @@ -496,7 +495,7 @@ export class MongoConnection { this.socket.on('error', (error) => { this.connectingPromise = undefined; this.status = MongoConnectionStatus.disconnected; - reject(new MongoError('Connection error: ' + error.message)); + reject(new MongoConnectionError(formatError(error.message))); }); if (this.socket.destroyed) { @@ -526,7 +525,7 @@ export class ResponseParser { protected currentMessageSize: number = 0; constructor( - protected readonly onMessage: (response: Uint8Array) => void + protected readonly onMessage: (response: Uint8Array) => void, ) { } @@ -586,7 +585,7 @@ export class ResponseParser { } const nextCurrentSize = readUint32LE(currentBuffer); - if (nextCurrentSize <= 0) throw new Error('message size wrong'); + if (nextCurrentSize <= 0) throw new MongoError('message size wrong'); currentSize = nextCurrentSize; //buffer and size has been set. consume this message in the next loop iteration } diff --git a/packages/mongo/src/client/error.ts b/packages/mongo/src/client/error.ts index bbc1b17c5..8db0cc98a 100644 --- a/packages/mongo/src/client/error.ts +++ b/packages/mongo/src/client/error.ts @@ -8,7 +8,6 @@ * You should have received a copy of the MIT License along with this program. */ -import { CustomError } from '@deepkit/core'; import { BaseResponse } from './command/command.js'; import { DatabaseError, UniqueConstraintFailure } from '@deepkit/orm'; @@ -25,16 +24,14 @@ export function handleErrorResponse(response: BaseResponse): DatabaseError | und } if (message) { - return new DatabaseError(message); + return Object.assign(new MongoDatabaseError(message), { code: response.code || 0 }); } return; } -export class MongoError extends CustomError { - constructor(message: string, public readonly code?: number) { - super(message); - } +export class MongoError extends DatabaseError { + public readonly code: number = 0; toString() { if (this.code) return `[${this.code}] ${this.message}`; @@ -42,6 +39,22 @@ export class MongoError extends CustomError { } } +/** + * When a tcp/connection issue happened. + */ +export class MongoConnectionError extends MongoError { + +} + +/** + * When the Mongo server returns an error with code, + * generally from database.raw or database.query. + */ +export class MongoDatabaseError extends MongoError { + +} + + //https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#determining-retryable-errors const retryableWrites: number[] = [ 11600, @@ -55,7 +68,7 @@ const retryableWrites: number[] = [ 6, 89, 9001, - 262 + 262, ]; export function isErrorRetryableWrite(error: any): boolean { @@ -68,19 +81,20 @@ export function isErrorRetryableWrite(error: any): boolean { // https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.rst#retryable-error const retryableReads: number[] = [ - 11600, - 11602, - 10107, - 13435, - 13436, - 189, - 91, - 7, - 6, - 89, - 9001 -] - ; + 11600, + 11602, + 10107, + 13435, + 13436, + 189, + 91, + 7, + 6, + 89, + 9001, + ] +; + export function isErrorRetryableRead(error: any): boolean { if (error instanceof MongoError && error.code) { return retryableReads.includes(error.code); @@ -88,21 +102,3 @@ export function isErrorRetryableRead(error: any): boolean { return false; } - -/** - * When a tcp/connection issue happened. - */ -export class MongoConnectionError extends MongoError { - -} - -/** - * When the error came from the server `errmsg`. - */ -export class MongoCommandError extends MongoError { - -} - -export class MongoFindConnectionTimeOut extends MongoError { - -} diff --git a/packages/mongo/tests/client/client.spec.ts b/packages/mongo/tests/client/client.spec.ts index db94135c4..52f591746 100644 --- a/packages/mongo/tests/client/client.spec.ts +++ b/packages/mongo/tests/client/client.spec.ts @@ -6,6 +6,8 @@ import { sleep } from '@deepkit/core'; import { ConnectionOptions } from '../../src/client/options.js'; import { cast, validatedDeserialize } from '@deepkit/type'; import { createConnection } from 'net'; +import { fail } from 'assert'; +import { MongoConnectionError, MongoError } from '../../src/client/error.js'; jest.setTimeout(60000); @@ -27,6 +29,19 @@ test('connect invalid', async () => { client.close(); }); +test('connect invalid 2', async () => { + const client = new MongoClient('mongodb://invalid/'); + + try { + await client.connect(); + fail('should fail'); + } catch (error) { + expect(error).toBeInstanceOf(MongoConnectionError); + } + + client.close(); +}); + test('connect valid', async () => { const client = new MongoClient('mongodb://localhost/'); diff --git a/packages/mongo/tests/database.spec.ts b/packages/mongo/tests/database.spec.ts index fc98167d3..d6e72cb3a 100644 --- a/packages/mongo/tests/database.spec.ts +++ b/packages/mongo/tests/database.spec.ts @@ -1,7 +1,9 @@ import { expect, test } from '@jest/globals'; -import { Database } from '@deepkit/orm'; +import { Database, DatabaseErrorEvent, onDatabaseError } from '@deepkit/orm'; import { MongoDatabaseAdapter } from '../src/adapter.js'; import { entity, MongoId, PrimaryKey } from '@deepkit/type'; +import { MongoConnectionError, MongoDatabaseError } from '../src/client/error.js'; +import { assertDefined, assertInstanceOf } from '@deepkit/core'; test('simple', async () => { @entity.name('asd') @@ -117,3 +119,69 @@ test('session', async () => { database.disconnect(); }); + +test('errors connect', async () => { + class Test { + _id: MongoId & PrimaryKey = ''; + + constructor(public name: string) { + } + } + + const database = new Database(new MongoDatabaseAdapter('mongodb://invalid/test')); + let called: DatabaseErrorEvent | undefined; + database.listen(onDatabaseError, (event) => { + called = event; + }); + + await expect(() => database.query(Test).findOne()).rejects.toThrow('MongoConnectionError: getaddrinfo ENOTFOUND invalid'); + await expect(() => database.query(Test).findOne()).rejects.toBeInstanceOf(MongoConnectionError); + + assertDefined(called); + assertInstanceOf(called.error, MongoConnectionError); + expect(called.error.message).toBe('Failed to connect: MongoConnectionError: getaddrinfo ENOTFOUND invalid'); +}); + +test('errors raw', async () => { + class Test { + _id: MongoId & PrimaryKey = ''; + + constructor(public name: string) { + } + } + + const database = new Database(new MongoDatabaseAdapter('mongodb://127.0.0.1/test')); + let called: DatabaseErrorEvent | undefined; + database.listen(onDatabaseError, (event) => { + called = event; + }); + + await expect(() => database.raw([{$invalid: 1}]).find()).rejects.toThrow('Unrecognized pipeline stage name'); + + assertDefined(called); + assertInstanceOf(called.error, MongoDatabaseError); + expect(called.error.code).toBe(40324); + expect(called.error.message).toContain('Unrecognized pipeline stage nam'); +}); + +test('errors query', async () => { + class Test { + _id: MongoId & PrimaryKey = ''; + + constructor(public name: string) { + } + } + + const database = new Database(new MongoDatabaseAdapter('mongodb://127.0.0.1/test')); + let called: DatabaseErrorEvent | undefined; + database.listen(onDatabaseError, (event) => { + called = event; + }); + + await expect(() => database.query().filter({$invalid: 1}).find()).rejects.toThrow('unknown top level operator: $invalid'); + + assertDefined(called); + assertInstanceOf(called.error, MongoDatabaseError); + expect(called.error.code).toBe(2); + expect(called.error.message).toContain('unknown top level operator: $invalid'); +}); diff --git a/packages/orm/src/database-session.ts b/packages/orm/src/database-session.ts index e650f2f03..e7e5888e0 100644 --- a/packages/orm/src/database-session.ts +++ b/packages/orm/src/database-session.ts @@ -21,14 +21,14 @@ import { ReflectionClass, typeSettings, UnpopulatedCheck, - validate + validate, } from '@deepkit/type'; import { GroupArraySort } from '@deepkit/topsort'; import { getClassState, getInstanceState, getNormalizedPrimaryKey, IdentityMap } from './identity-map.js'; import { getClassSchemaInstancePairs } from './utils.js'; import { HydratorFn } from './formatter.js'; import { getReference } from './reference.js'; -import { UnitOfWorkCommitEvent, UnitOfWorkEvent, UnitOfWorkUpdateEvent } from './event.js'; +import { DatabaseErrorInsertEvent, DatabaseErrorUpdateEvent, onDatabaseError, UnitOfWorkCommitEvent, UnitOfWorkEvent, UnitOfWorkUpdateEvent } from './event.js'; import { DatabaseLogger } from './logger.js'; import { Stopwatch } from '@deepkit/stopwatch'; import { EventDispatcher, EventDispatcherInterface, EventToken } from '@deepkit/event'; @@ -201,7 +201,16 @@ export class DatabaseSessionRound { if (event.stopped) doInsert = false; } if (doInsert) { - await persistence.insert(group.type, inserts); + try { + await persistence.insert(group.type, inserts); + } catch (error: any) { + await this.eventDispatcher.dispatch(onDatabaseError, Object.assign( + new DatabaseErrorInsertEvent(error, this.session, classState.classSchema), + { inserts }, + )); + throw error; + } + if (this.eventDispatcher.hasListeners(DatabaseSession.onInsertPost)) { await this.eventDispatcher.dispatch(DatabaseSession.onInsertPost, new UnitOfWorkEvent(group.type, this.session, inserts)); } @@ -217,7 +226,15 @@ export class DatabaseSessionRound { } if (doUpdate) { - await persistence.update(group.type, changeSets); + try { + await persistence.update(group.type, changeSets); + } catch (error: any) { + await this.eventDispatcher.dispatch(onDatabaseError, Object.assign( + new DatabaseErrorUpdateEvent(error, this.session, classState.classSchema), + { changeSets }, + )); + throw error; + } if (this.eventDispatcher.hasListeners(DatabaseSession.onUpdatePost)) { await this.eventDispatcher.dispatch(DatabaseSession.onUpdatePost, new UnitOfWorkUpdateEvent(group.type, this.session, changeSets)); @@ -504,7 +521,7 @@ export class DatabaseSession { Object.defineProperty(item, property.symbol, { enumerable: false, configurable: true, - value: itemDB[property.getNameAsString() as keyof T] + value: itemDB[property.getNameAsString() as keyof T], }); } } diff --git a/packages/orm/src/database.ts b/packages/orm/src/database.ts index 57b8567f1..405af16d8 100644 --- a/packages/orm/src/database.ts +++ b/packages/orm/src/database.ts @@ -89,7 +89,6 @@ function setupVirtualForeignKey(database: Database, virtualForeignKeyConstraint: await virtualForeignKeyConstraint.onQueryDelete(event); }); } - /** * Database abstraction. Use createSession() to create a work session with transaction support. * diff --git a/packages/orm/src/event.ts b/packages/orm/src/event.ts index 52275c8d0..6579eaa60 100644 --- a/packages/orm/src/event.ts +++ b/packages/orm/src/event.ts @@ -9,7 +9,7 @@ */ import { ClassType } from '@deepkit/core'; -import { BaseEvent } from '@deepkit/event'; +import { BaseEvent, EventToken } from '@deepkit/event'; import type { Changes } from '@deepkit/type'; import { PrimaryKeyType, ReflectionClass } from '@deepkit/type'; import type { DatabasePersistenceChangeSet } from './database-adapter.js'; @@ -32,7 +32,7 @@ export class DatabaseEvent extends BaseEvent { export class UnitOfWorkCommitEvent extends DatabaseEvent { constructor( - public readonly databaseSession: DatabaseSession + public readonly databaseSession: DatabaseSession, ) { super(); } @@ -42,7 +42,7 @@ export class UnitOfWorkEvent extends DatabaseEvent { constructor( public readonly classSchema: ReflectionClass, public readonly databaseSession: DatabaseSession, - public readonly items: T[] + public readonly items: T[], ) { super(); } @@ -65,7 +65,7 @@ export class UnitOfWorkUpdateEvent extends DatabaseEvent { constructor( public readonly classSchema: ReflectionClass, public readonly databaseSession: DatabaseSession, - public readonly changeSets: DatabasePersistenceChangeSet[] + public readonly changeSets: DatabasePersistenceChangeSet[], ) { super(); } @@ -79,7 +79,7 @@ export class QueryDatabaseEvent extends DatabaseEvent { constructor( public readonly databaseSession: DatabaseSession, public readonly classSchema: ReflectionClass, - public query: Query + public query: Query, ) { super(); } @@ -89,12 +89,46 @@ export class QueryDatabaseEvent extends DatabaseEvent { } } +export class DatabaseErrorEvent extends DatabaseEvent { + constructor( + public readonly error: Error, + public readonly databaseSession: DatabaseSession, + public readonly classSchema?: ReflectionClass, + public readonly query?: Query, + ) { + super(); + } +} + +/** + * Error event emitted when unit of work commit failed inserting new items. + */ +export class DatabaseErrorInsertEvent extends DatabaseErrorEvent { + inserts: OrmEntity[] = []; +} + +/** + * Error event emitted when unit of work commit failed updating existing items. + */ +export class DatabaseErrorUpdateEvent extends DatabaseErrorEvent { + changeSets: DatabasePersistenceChangeSet[] = []; +} + +/** + * This event is emitted when an error occurs in async database operation, like query, commit, connect, etc. + * In event.databaseSession.adapter you can access the adapter that caused the error. + * In event.error you can access the caught error. + * In event.classSchema and event.query you might find additional context, but not necessarily. + */ +export const onDatabaseError = new EventToken('database.error'); + + export class QueryDatabaseDeleteEvent extends DatabaseEvent { constructor( public readonly databaseSession: DatabaseSession, public readonly classSchema: ReflectionClass, public query: Query, - public readonly deleteResult: DeleteResult + public readonly deleteResult: DeleteResult, ) { super(); } @@ -112,7 +146,7 @@ export class QueryDatabasePatchEvent extends DatabaseEvent { public readonly classSchema: ReflectionClass, public query: Query, public readonly patch: Changes, - public readonly patchResult: PatchResult + public readonly patchResult: PatchResult, ) { super(); } diff --git a/packages/orm/src/memory-db.ts b/packages/orm/src/memory-db.ts index 660097043..dd26374b6 100644 --- a/packages/orm/src/memory-db.ts +++ b/packages/orm/src/memory-db.ts @@ -243,6 +243,50 @@ export class MemoryDatabaseTransaction extends DatabaseTransaction { } } +export class MemoryPersistence extends DatabasePersistence { + constructor(private adapter: MemoryDatabaseAdapter) { + super(); + } + + async remove(classSchema: ReflectionClass, items: T[]): Promise { + const store = this.adapter.getStore(classSchema); + + const primaryKey = classSchema.getPrimary().name as keyof T; + for (const item of items) { + store.items.delete(item[primaryKey] as any); + } + } + + async insert(classSchema: ReflectionClass, items: T[]): Promise { + const store = this.adapter.getStore(classSchema); + const serializer = getSerializeFunction(classSchema.type, memorySerializer.serializeRegistry); + const autoIncrement = classSchema.getAutoIncrement(); + + const primaryKey = classSchema.getPrimary().name as keyof T; + for (const item of items) { + if (autoIncrement) { + store.autoIncrement++; + item[autoIncrement.name as keyof T & string] = store.autoIncrement as any; + } + store.items.set(item[primaryKey] as any, serializer(item)); + } + } + + async update(classSchema: ReflectionClass, changeSets: DatabasePersistenceChangeSet[]): Promise { + const store = this.adapter.getStore(classSchema); + const serializer = getSerializeFunction(classSchema.type, memorySerializer.serializeRegistry); + const primaryKey = classSchema.getPrimary().name as keyof T; + + for (const changeSet of changeSets) { + store.items.set(changeSet.item[primaryKey] as any, serializer(changeSet.item)); + } + } + + async release() { + + } +} + export class MemoryDatabaseAdapter extends DatabaseAdapter { protected store = new Map, SimpleStore>(); @@ -267,49 +311,7 @@ export class MemoryDatabaseAdapter extends DatabaseAdapter { } createPersistence(): DatabasePersistence { - const adapter = this; - - class Persistence extends DatabasePersistence { - async remove(classSchema: ReflectionClass, items: T[]): Promise { - const store = adapter.getStore(classSchema); - - const primaryKey = classSchema.getPrimary().name as keyof T; - for (const item of items) { - store.items.delete(item[primaryKey] as any); - } - } - - async insert(classSchema: ReflectionClass, items: T[]): Promise { - const store = adapter.getStore(classSchema); - const serializer = getSerializeFunction(classSchema.type, memorySerializer.serializeRegistry); - const autoIncrement = classSchema.getAutoIncrement(); - - const primaryKey = classSchema.getPrimary().name as keyof T; - for (const item of items) { - if (autoIncrement) { - store.autoIncrement++; - item[autoIncrement.name as keyof T & string] = store.autoIncrement as any; - } - store.items.set(item[primaryKey] as any, serializer(item)); - } - } - - async update(classSchema: ReflectionClass, changeSets: DatabasePersistenceChangeSet[]): Promise { - const store = adapter.getStore(classSchema); - const serializer = getSerializeFunction(classSchema.type, memorySerializer.serializeRegistry); - const primaryKey = classSchema.getPrimary().name as keyof T; - - for (const changeSet of changeSets) { - store.items.set(changeSet.item[primaryKey] as any, serializer(changeSet.item)); - } - } - - async release() { - - } - } - - return new Persistence; + return new MemoryPersistence(this); } disconnect(force?: boolean): void { diff --git a/packages/orm/src/query-filter.ts b/packages/orm/src/query-filter.ts index 546efe636..c5e5b202b 100644 --- a/packages/orm/src/query-filter.ts +++ b/packages/orm/src/query-filter.ts @@ -134,7 +134,7 @@ export function convertQueryFilter convertQueryFilter(classType, v, converter, fieldNamesMap, customMapping)); continue; } diff --git a/packages/orm/src/query.ts b/packages/orm/src/query.ts index 05f4b818a..7aa1a2a6f 100644 --- a/packages/orm/src/query.ts +++ b/packages/orm/src/query.ts @@ -21,11 +21,11 @@ import { ReflectionClass, ReflectionKind, ReflectionProperty, - resolveForeignReflectionClass + resolveForeignReflectionClass, } from '@deepkit/type'; import { DatabaseAdapter } from './database-adapter.js'; import { DatabaseSession } from './database-session.js'; -import { QueryDatabaseDeleteEvent, QueryDatabaseEvent, QueryDatabasePatchEvent } from './event.js'; +import { DatabaseErrorEvent, onDatabaseError, QueryDatabaseDeleteEvent, QueryDatabaseEvent, QueryDatabasePatchEvent } from './event.js'; import { DeleteResult, OrmEntity, PatchResult } from './type.js'; import { FieldName, FlattenIfArray, Replace, Resolve } from './utils.js'; import { FrameCategory } from '@deepkit/stopwatch'; @@ -225,7 +225,7 @@ export class BaseQuery { constructor( public readonly classSchema: ReflectionClass, - model?: DatabaseQueryModel + model?: DatabaseQueryModel, ) { this.model = model || this.createModel(); } @@ -675,7 +675,7 @@ export class Query extends BaseQuery { constructor( classSchema: ReflectionClass, protected session: DatabaseSession, - protected resolver: GenericQueryResolver + protected resolver: GenericQueryResolver, ) { super(classSchema); this.model.withIdentityMap = session.withIdentityMap; @@ -690,7 +690,7 @@ export class Query extends BaseQuery { } public lift>, T extends ReturnType['_']>, THIS extends Query & { _: () => T }>( - this: THIS, query: B + this: THIS, query: B, ): Replace, Resolve> & Pick> { const base = this['constructor'] as ClassType; //we create a custom class to have our own prototype @@ -759,56 +759,56 @@ export class Query extends BaseQuery { } public async count(fromHas: boolean = false): Promise { - if (!this.session.stopwatch) { - const query = this.onQueryResolve(await this.callOnFetchEvent(this)); - return await query.resolver.count(query.model); - } + let query: Query | undefined = undefined; - const frame = this.session.stopwatch.start((fromHas ? 'Has:' : 'Count:') + this.classSchema.getClassName(), FrameCategory.database); + const frame = this.session.stopwatch?.start((fromHas ? 'Has:' : 'Count:') + this.classSchema.getClassName(), FrameCategory.database); try { - frame.data({ collection: this.classSchema.getCollectionName(), className: this.classSchema.getClassName() }); - const eventFrame = this.session.stopwatch.start('Events'); - const query = this.onQueryResolve(await this.callOnFetchEvent(this)); - eventFrame.end(); + frame?.data({ collection: this.classSchema.getCollectionName(), className: this.classSchema.getClassName() }); + const eventFrame = this.session.stopwatch?.start('Events'); + query = this.onQueryResolve(await this.callOnFetchEvent(this)); + eventFrame?.end(); return await query.resolver.count(query.model); + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session, query?.classSchema, query)); + throw error; } finally { - frame.end(); + frame?.end(); } } public async find(): Promise[]> { - if (!this.session.stopwatch) { - const query = this.onQueryResolve(await this.callOnFetchEvent(this)); - return await query.resolver.find(query.model) as Resolve[]; - } + const frame = this.session.stopwatch?.start('Find:' + this.classSchema.getClassName(), FrameCategory.database); + let query: Query | undefined = undefined; - const frame = this.session.stopwatch.start('Find:' + this.classSchema.getClassName(), FrameCategory.database); try { - frame.data({ collection: this.classSchema.getCollectionName(), className: this.classSchema.getClassName() }); - const eventFrame = this.session.stopwatch.start('Events'); - const query = this.onQueryResolve(await this.callOnFetchEvent(this)); - eventFrame.end(); + frame?.data({ collection: this.classSchema.getCollectionName(), className: this.classSchema.getClassName() }); + const eventFrame = this.session.stopwatch?.start('Events'); + query = this.onQueryResolve(await this.callOnFetchEvent(this)); + eventFrame?.end(); return await query.resolver.find(query.model) as Resolve[]; + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session, query?.classSchema, query)); + throw error; } finally { - frame.end(); + frame?.end(); } } public async findOneOrUndefined(): Promise | undefined> { - if (!this.session.stopwatch) { - const query = this.onQueryResolve(await this.callOnFetchEvent(this.limit(1))); - return await query.resolver.findOneOrUndefined(query.model) as Resolve; - } + const frame = this.session.stopwatch?.start('FindOne:' + this.classSchema.getClassName(), FrameCategory.database); + let query: Query | undefined = undefined; - const frame = this.session.stopwatch.start('FindOne:' + this.classSchema.getClassName(), FrameCategory.database); try { - frame.data({ collection: this.classSchema.getCollectionName(), className: this.classSchema.getClassName() }); - const eventFrame = this.session.stopwatch.start('Events'); - const query = this.onQueryResolve(await this.callOnFetchEvent(this.limit(1))); - eventFrame.end(); + frame?.data({ collection: this.classSchema.getCollectionName(), className: this.classSchema.getClassName() }); + const eventFrame = this.session.stopwatch?.start('Events'); + query = this.onQueryResolve(await this.callOnFetchEvent(this.limit(1))); + eventFrame?.end(); return await query.resolver.findOneOrUndefined(query.model) as Resolve; + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session, query?.classSchema, query)); + throw error; } finally { - frame.end(); + frame?.end(); } } @@ -831,10 +831,10 @@ export class Query extends BaseQuery { const deleteResult: DeleteResult = { modified: 0, - primaryKeys: [] + primaryKeys: [], }; - const frame = this.session.stopwatch ? this.session.stopwatch.start('Delete:' + this.classSchema.getClassName(), FrameCategory.database) : undefined; + const frame = this.session.stopwatch?.start('Delete:' + this.classSchema.getClassName(), FrameCategory.database); if (frame) frame.data({ collection: this.classSchema.getCollectionName(), className: this.classSchema.getClassName() }); try { @@ -867,6 +867,9 @@ export class Query extends BaseQuery { } return deleteResult; + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session, query.classSchema, query)); + throw error; } finally { if (frame) frame.end(); } @@ -899,7 +902,7 @@ export class Query extends BaseQuery { const patchResult: PatchResult = { modified: 0, returning: {}, - primaryKeys: [] + primaryKeys: [], }; if (changes.empty) return patchResult; @@ -951,6 +954,9 @@ export class Query extends BaseQuery { } return patchResult; + } catch (error: any) { + await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session, query.classSchema, query)); + throw error; } finally { if (frame) frame.end(); } diff --git a/packages/orm/tests/events.spec.ts b/packages/orm/tests/events.spec.ts index a5fe3b4bc..6b890c178 100644 --- a/packages/orm/tests/events.spec.ts +++ b/packages/orm/tests/events.spec.ts @@ -3,6 +3,9 @@ import { Database } from '../src/database.js'; import { MemoryDatabaseAdapter } from '../src/memory-db.js'; import { AutoIncrement, PrimaryKey, ReflectionClass, t } from '@deepkit/type'; import { DatabaseSession } from '../src/database-session.js'; +import { DatabasePersistence } from '../src/database-adapter.js'; +import { DatabaseErrorEvent, DatabaseErrorInsertEvent, DatabaseErrorUpdateEvent, onDatabaseError } from '../src/event.js'; +import { assertDefined, assertInstanceOf } from '@deepkit/core'; test('onUpdate plugin', async () => { function onUpdate() { @@ -61,3 +64,74 @@ test('onUpdate plugin', async () => { expect(schema.getProperty('updatedAt').data['timestamp/onUpdate']).toBe(true); }); + +test('error insert event', async () => { + class User { + constructor( + public id: number & PrimaryKey, + public username: string, + ) { + } + } + + class FailAdapter extends MemoryDatabaseAdapter { + createPersistence(): DatabasePersistence { + return Object.assign(super.createPersistence(), { + insert() { + throw new Error('oops'); + }, + }); + } + } + + const database = new Database(new FailAdapter(), [User]); + + let event: DatabaseErrorEvent | undefined; + database.listen(onDatabaseError, e => event = e); + + await database.persist(new User(1, 'peter')).catch(() => undefined); + + assertDefined(event); + assertInstanceOf(event, DatabaseErrorInsertEvent); + expect(event.error.message).toBe('oops'); + expect(event.inserts.length).toBe(1); + assertInstanceOf(event.inserts[0], User); + expect(event.inserts[0]).toMatchObject({ id: 1, username: 'peter' }) +}); + + +test('error update event', async () => { + class User { + constructor( + public id: number & PrimaryKey, + public username: string, + ) { + } + } + + class FailAdapter extends MemoryDatabaseAdapter { + createPersistence(): DatabasePersistence { + return Object.assign(super.createPersistence(), { + update() { + throw new Error('oops'); + }, + }); + } + } + + const database = new Database(new FailAdapter(), [User]); + + let event: DatabaseErrorEvent | undefined; + database.listen(onDatabaseError, e => event = e); + + const item = new User(1, 'peter'); + await database.persist(item); + item.username = 'changed'; + await database.persist(item).catch(() => undefined); + + assertDefined(event); + assertInstanceOf(event, DatabaseErrorUpdateEvent); + expect(event.error.message).toBe('oops'); + expect(event.changeSets.length).toBe(1); + expect(event.changeSets[0].changes.$set).toEqual({ username: 'changed' }); +});