Skip to content

Commit

Permalink
feat(orm): onDatabaseError event
Browse files Browse the repository at this point in the history
This new event token allows to hook into various errors that are thrown when working with the `Database` abstraction and their adapter.

For example, a user can now listen to connection errors, or on errors from a query, or from the unit of work.

```typescript

const database = new Database(new Adapter());

database.listen(onDatabaseError, (event) => {
    if (event.error instanceof MongoConnectionError) {
        // connection could not be established
    }

    // event.query/event.classSchema is set if it originated from a query
});
```

In general event.error could contain anything, is from subclass `DatabaseError` if it was related to database operations there were correctly wrapped by the adapter. Various specialisation errors that extends DatabaseError exist in various adapters like e.g. MongoConnectionError and MongoDatabaseError.

The event is of type DatabaseErrorEvent, but has specialisations for unit of work errors DatabaseErrorInsertEvent and DatabaseErrorUpdateEvent with more information available.
  • Loading branch information
marcj committed Feb 2, 2024
1 parent 2a80fd7 commit cdb7256
Show file tree
Hide file tree
Showing 15 changed files with 411 additions and 168 deletions.
18 changes: 18 additions & 0 deletions packages/core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -825,3 +825,21 @@ export function formatError(error: any): string {

return String(error);
}

/**
* Asserts that the given object is an instance of the given class.
*/
export function assertInstanceOf<T>(object: any, constructor: { new (...args: any[]): T }): asserts object is T {
if (!(object instanceof constructor)) {
throw new Error(`Object ${getClassName(object)} is not an instance of the expected class ${getClassName(constructor)}`);
}
}

/**
* Asserts that the given value is defined (not null and not undefined).
*/
export function assertDefined<T>(value: T): asserts value is NonNullable<T> {
if (value === null || value === undefined) {
throw new Error(`Value is not defined`);
}
}
2 changes: 1 addition & 1 deletion packages/event/src/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
ReflectionClass
} from '@deepkit/type';

export type EventListenerCallback<T> = (event: T, ...args: any[]) => void | Promise<void>;
export type EventListenerCallback<T> = (event: T, ...args: any[]) => any | Promise<any>;

export class EventError extends CustomError {
}
Expand Down
37 changes: 27 additions & 10 deletions packages/mongo/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import {
DatabaseAdapter,
DatabaseAdapterQueryFactory,
DatabaseEntityRegistry,
DatabaseErrorEvent,
DatabaseSession,
FindQuery,
ItemNotFound,
MigrateOptions,
onDatabaseError,
OrmEntity,
RawFactory
RawFactory,
} from '@deepkit/orm';
import { AbstractClassType, ClassType, isArray } from '@deepkit/core';
import { MongoDatabaseQuery } from './query.js';
Expand Down Expand Up @@ -56,20 +58,35 @@ class MongoRawCommandQuery<T> implements FindQuery<T> {
}

async find(): Promise<T[]> {
const res = await this.client.execute(this.command);
return res as any;
try {
const res = await this.client.execute(this.command);
return res as any;
} catch (error: any) {
await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session));
throw error;
}
}

async findOneOrUndefined(): Promise<T> {
const res = await this.client.execute(this.command);
if (isArray(res)) return res[0];
return res;
try {
const res = await this.client.execute(this.command);
if (isArray(res)) return res[0];
return res;
} catch (error: any) {
await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session));
throw error;
}
}

async findOne(): Promise<T> {
const item = await this.findOneOrUndefined();
if (!item) throw new ItemNotFound('Could not find item');
return item;
try {
const item = await this.findOneOrUndefined();
if (!item) throw new ItemNotFound('Could not find item');
return item;
} catch (error: any) {
await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session));
throw error;
}
}
}

Expand Down Expand Up @@ -99,7 +116,7 @@ export class MongoDatabaseAdapter extends DatabaseAdapter {
protected ormSequences: ReflectionClass<any>;

constructor(
connectionString: string
connectionString: string,
) {
super();
this.client = new MongoClient(connectionString);
Expand Down
8 changes: 3 additions & 5 deletions packages/mongo/src/client/command/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
*/

import { asyncOperation, getClassName } from '@deepkit/core';
import { handleErrorResponse, MongoError } from '../error.js';
import { handleErrorResponse, MongoDatabaseError, MongoError } from '../error.js';
import { MongoClientConfig } from '../config.js';
import { Host } from '../host.js';
import type { MongoDatabaseTransaction } from '../connection.js';
import { ReceiveType, ReflectionClass, resolveReceiveType, SerializationError, stringifyType, Type, typeOf, typeSettings, UnpopulatedCheck, ValidationError } from '@deepkit/type';
import { BSONDeserializer, deserializeBSONWithoutOptimiser, getBSONDeserializer } from '@deepkit/bson';
import { mongoBinarySerializer } from '../../mongo-serializer.js';
import { inspect } from 'util';

export interface CommandMessageResponseCallbackResult<T> {
/**
Expand Down Expand Up @@ -55,7 +54,7 @@ export abstract class Command {
public sender?: <T>(schema: Type, message: T) => void;

public sendAndWait<T, R = BaseResponse>(
message: T, messageType?: ReceiveType<T>, responseType?: ReceiveType<R>
message: T, messageType?: ReceiveType<T>, responseType?: ReceiveType<R>,
): Promise<R> {
if (!this.sender) throw new Error(`No sender set in command ${getClassName(this)}`);
this.sender(resolveReceiveType(messageType), message);
Expand Down Expand Up @@ -84,15 +83,14 @@ export abstract class Command {
}

if (!message.ok) {
this.current.reject(new MongoError(message.errmsg || 'error', message.code));
this.current.reject(Object.assign(new MongoDatabaseError(message.errmsg || 'error'), { code: message.code }));
} else {
this.current.resolve(message);
}
} catch (error: any) {
if (error instanceof ValidationError || error instanceof SerializationError) {
if (this.current.responseType) {
const raw = deserializeBSONWithoutOptimiser(response);
console.log('mongo raw response', inspect(raw, { depth: null }));
if (raw.errmsg && raw.ok === 0) {
const error = handleErrorResponse(raw);
if (error) {
Expand Down
35 changes: 17 additions & 18 deletions packages/mongo/src/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* You should have received a copy of the MIT License along with this program.
*/

import { arrayRemoveItem, asyncOperation } from '@deepkit/core';
import { arrayRemoveItem, asyncOperation, formatError } from '@deepkit/core';
import { Host } from './host.js';
import { createConnection, Socket } from 'net';
import { connect as createTLSConnection, TLSSocket } from 'tls';
Expand All @@ -17,8 +17,7 @@ import { stringifyType, Type, uuid } from '@deepkit/type';
import { BSONBinarySerializer, getBSONSerializer, getBSONSizer, Writer } from '@deepkit/bson';
import { HandshakeCommand } from './command/handshake.js';
import { MongoClientConfig } from './config.js';
import { MongoError } from './error.js';

import { MongoConnectionError, MongoError } from './error.js';
import { DatabaseTransaction } from '@deepkit/orm';
import { CommitTransactionCommand } from './command/commitTransaction.js';
import { AbortTransactionCommand } from './command/abortTransaction.js';
Expand Down Expand Up @@ -59,7 +58,7 @@ export class MongoConnectionPool {
*/
public connections: MongoConnection[] = [];

protected queue: {resolve: (connection: MongoConnection) => void, request: ConnectionRequest}[] = [];
protected queue: { resolve: (connection: MongoConnection) => void, request: ConnectionRequest }[] = [];

protected nextConnectionClose: Promise<boolean> = Promise.resolve(true);

Expand Down Expand Up @@ -88,7 +87,7 @@ export class MongoConnectionPool {
await Promise.allSettled(promises);
}
} catch (error: any) {
throw new MongoError('Failed to connect: ' + error.message);
throw new MongoConnectionError(`Failed to connect: ${formatError(error)}`);
}
}

Expand Down Expand Up @@ -132,7 +131,7 @@ export class MongoConnectionPool {
if (request.readonly && host.isReadable()) return host;
}

throw new MongoError(`Could not find host for connection request. (readonly=${request.readonly}, hosts=${hosts.length}). Last Error: ${this.lastError}`);
throw new MongoConnectionError(`Could not find host for connection request. (readonly=${request.readonly}, hosts=${hosts.length}). Last Error: ${this.lastError}`);
}

protected createAdditionalConnectionForRequest(request: ConnectionRequest): MongoConnection {
Expand Down Expand Up @@ -203,7 +202,7 @@ export class MongoConnectionPool {
if (!connection.isConnected()) continue;
if (connection.reserved) continue;

if (request.nearest) throw new Error('Nearest not implemented yet');
if (request.nearest) throw new MongoConnectionError('Nearest not implemented yet');

if (!this.matchRequest(connection, r)) continue;

Expand All @@ -225,7 +224,7 @@ export class MongoConnectionPool {

return asyncOperation((resolve) => {
this.stats.connectionsQueued++;
this.queue.push({resolve, request: r});
this.queue.push({ resolve, request: r });
});
}
}
Expand Down Expand Up @@ -265,7 +264,7 @@ export class MongoDatabaseTransaction extends DatabaseTransaction {

async commit() {
if (!this.connection) return;
if (this.ended) throw new Error('Transaction ended already');
if (this.ended) throw new MongoError('Transaction ended already');

await this.connection.execute(new CommitTransactionCommand());
this.ended = true;
Expand All @@ -274,7 +273,7 @@ export class MongoDatabaseTransaction extends DatabaseTransaction {

async rollback() {
if (!this.connection) return;
if (this.ended) throw new Error('Transaction ended already');
if (this.ended) throw new MongoError('Transaction ended already');
if (!this.started) return;

await this.connection.execute(new AbortTransactionCommand());
Expand Down Expand Up @@ -321,7 +320,7 @@ export class MongoConnection {
host: host.hostname,
port: host.port,
timeout: config.options.connectTimeoutMS,
servername: host.hostname
servername: host.hostname,
};
const optional = {
ca: config.options.tlsCAFile,
Expand All @@ -343,7 +342,7 @@ export class MongoConnection {
this.socket = createConnection({
host: host.hostname,
port: host.port,
timeout: config.options.connectTimeoutMS
timeout: config.options.connectTimeoutMS,
});

this.socket.on('data', (data) => this.responseParser.feed(data));
Expand Down Expand Up @@ -410,7 +409,7 @@ export class MongoConnection {
// const offset = 16 + 4 + 8 + 4 + 4; //QUERY_REPLY
const message = response.slice(offset, size);

if (!this.lastCommand) throw new Error(`Got a server response without active command`);
if (!this.lastCommand) throw new MongoError(`Got a server response without active command`);

this.lastCommand.command.handleResponse(message);
}
Expand All @@ -422,7 +421,7 @@ export class MongoConnection {
*/
public async execute<T extends Command>(command: T): Promise<ReturnType<T['execute']>> {
if (this.status === MongoConnectionStatus.pending) await this.connect();
if (this.status === MongoConnectionStatus.disconnected) throw new Error('Disconnected');
if (this.status === MongoConnectionStatus.disconnected) throw new MongoError('Disconnected');

if (this.lastCommand && this.lastCommand.promise) {
await this.lastCommand.promise;
Expand Down Expand Up @@ -487,7 +486,7 @@ export class MongoConnection {
}

async connect(): Promise<void> {
if (this.status === MongoConnectionStatus.disconnected) throw new Error('Connection disconnected');
if (this.status === MongoConnectionStatus.disconnected) throw new MongoError('Connection disconnected');
if (this.status !== MongoConnectionStatus.pending) return;

this.status = MongoConnectionStatus.connecting;
Expand All @@ -496,7 +495,7 @@ export class MongoConnection {
this.socket.on('error', (error) => {
this.connectingPromise = undefined;
this.status = MongoConnectionStatus.disconnected;
reject(new MongoError('Connection error: ' + error.message));
reject(new MongoConnectionError(formatError(error.message)));
});

if (this.socket.destroyed) {
Expand Down Expand Up @@ -526,7 +525,7 @@ export class ResponseParser {
protected currentMessageSize: number = 0;

constructor(
protected readonly onMessage: (response: Uint8Array) => void
protected readonly onMessage: (response: Uint8Array) => void,
) {
}

Expand Down Expand Up @@ -586,7 +585,7 @@ export class ResponseParser {
}

const nextCurrentSize = readUint32LE(currentBuffer);
if (nextCurrentSize <= 0) throw new Error('message size wrong');
if (nextCurrentSize <= 0) throw new MongoError('message size wrong');
currentSize = nextCurrentSize;
//buffer and size has been set. consume this message in the next loop iteration
}
Expand Down
Loading

0 comments on commit cdb7256

Please sign in to comment.