Skip to content

Commit

Permalink
feat(orm): new selector API, still work in progress 5
Browse files Browse the repository at this point in the history
  • Loading branch information
marcj committed Jul 1, 2024
1 parent d59cbf7 commit db6c7f6
Show file tree
Hide file tree
Showing 25 changed files with 2,480 additions and 839 deletions.
4 changes: 2 additions & 2 deletions packages/bson/src/bson-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
digitByteSize,
TWO_PWR_32_DBL_N,
} from './utils.js';
import { decodeUTF8 } from './strings.js';
import { decodeUTF8 } from '@deepkit/core';
import { nodeBufferToArrayBuffer, ReflectionKind, SerializationError, Type } from '@deepkit/type';
import { hexTable } from './model.js';

Expand Down Expand Up @@ -316,7 +316,7 @@ export class BaseParser {
}

/**
* Size includes the \0. If not existend, increase by 1.
* Size includes the \0. If not existent, increase by 1.
*/
eatString(size: number): string {
this.offset += size;
Expand Down
57 changes: 0 additions & 57 deletions packages/bson/src/strings.ts

This file was deleted.

54 changes: 54 additions & 0 deletions packages/core/src/string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,57 @@ export function indent(indentation: number, prefix: string = '') {
export function capitalize(string: string): string {
return string.charAt(0).toUpperCase() + string.slice(1)
}

const decoder = new TextDecoder('utf-8');

const decodeUTF8Fast = 'undefined' !== typeof Buffer ? Buffer.prototype.utf8Slice : undefined;

const decodeUTF8Big = decodeUTF8Fast ? (buffer: Uint8Array, off: number, end: number) => {
return decodeUTF8Fast.call(buffer, off, end);
} : (buffer: Uint8Array, off: number, end: number) => {
return decoder.decode(buffer.slice(off, end));
}

export function decodeUTF8(buffer: Uint8Array, off: number = 0, end: number) {
if (end - off > 64) {
return decodeUTF8Big(buffer, off, end);
} else {
return decodeUTF8Short(buffer, off, end);
}
}

export function decodeUTF8Short(buffer: Uint8Array, off: number = 0, end: number) {
let s = '';
while (off < end) {
let c = buffer[off++];

if (c > 127) {
if (c > 191 && c < 224) {
if (off >= end)
throw new Error('UTF-8 decode: incomplete 2-byte sequence');
c = (c & 31) << 6 | buffer[off++] & 63;
} else if (c > 223 && c < 240) {
if (off + 1 >= end)
throw new Error('UTF-8 decode: incomplete 3-byte sequence');
c = (c & 15) << 12 | (buffer[off++] & 63) << 6 | buffer[off++] & 63;
} else if (c > 239 && c < 248) {
if (off + 2 >= end)
throw new Error('UTF-8 decode: incomplete 4-byte sequence');
c = (c & 7) << 18 | (buffer[off++] & 63) << 12 | (buffer[off++] & 63) << 6 | buffer[off++] & 63;
} else throw new Error('UTF-8 decode: unknown multibyte start 0x' + c.toString(16) + ' at index ' + (off - 1));
if (c <= 0xffff) {
s += String.fromCharCode(c);
} else if (c <= 0x10ffff) {
c -= 0x10000;
s += String.fromCharCode(c >> 10 | 0xd800, c & 0x3FF | 0xdc00);
} else throw new Error('UTF-8 decode: code point 0x' + c.toString(16) + ' exceeds UTF-16 reach');
} else {
if (c === 0) {
return s;
}

s += String.fromCharCode(c);
}
}
return s;
}
17 changes: 17 additions & 0 deletions packages/core/tests/core.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,20 @@ test('isGlobalClass', () => {

expect(isGlobalClass(Uint8Array)).toBe(true);
});

test('typed array offset', () => {
const a = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
expect(a.byteOffset).toBe(0);
expect(a.length).toBe(10);
expect(a[0]).toBe(1);

const b= new Uint8Array(a.buffer, 1);
expect(b.byteOffset).toBe(1);
expect(b.length).toBe(9);
expect(b[0]).toBe(2);

const c = new Uint8Array(b.buffer, b.byteOffset + 1);
expect(c.byteOffset).toBe(2);
expect(c.length).toBe(8);
expect(c[0]).toBe(3);
});
12 changes: 8 additions & 4 deletions packages/mongo/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
* You should have received a copy of the MIT License along with this program.
*/

import { ConnectionRequest, MongoConnection, MongoConnectionPool, MongoDatabaseTransaction, MongoStats } from './connection.js';
import {
ConnectionRequest,
MongoConnection,
MongoConnectionPool,
MongoDatabaseTransaction,
MongoStats,
} from './connection.js';
import { isErrorRetryableRead, isErrorRetryableWrite, MongoError } from './error.js';
import { sleep } from '@deepkit/core';
import { Command } from './command/command.js';
Expand All @@ -27,9 +33,7 @@ export class MongoClient {

protected serializer: BSONBinarySerializer = mongoBinarySerializer;

constructor(
connectionString: string
) {
constructor(connectionString: string) {
this.config = new MongoClientConfig(connectionString);
this.connectionPool = new MongoConnectionPool(this.config, this.serializer, this.stats);
}
Expand Down
8 changes: 3 additions & 5 deletions packages/mongo/src/client/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ export class MongoConnectionPool {

protected queue: { resolve: (connection: MongoConnection) => void, request: ConnectionRequest }[] = [];

protected nextConnectionClose: Promise<boolean> = Promise.resolve(true);

protected lastError?: Error;

constructor(
Expand Down Expand Up @@ -172,7 +170,7 @@ export class MongoConnectionPool {
waiter.resolve(connection);
//we don't set reserved/set cleanupTimeout,
//since the connection is already reserved and the timeout
//is only set when the connection actually starting idling.
//is only set when the connection actually is starting idling.
return;
}

Expand Down Expand Up @@ -435,7 +433,7 @@ export class MongoConnection {

/**
* Puts a command on the queue and executes it when queue is empty.
* A promises is return that is resolved with the when executed successfully, or rejected
* A promise is returned that is resolved when executed successfully, or rejected
* when timed out, parser error, or any other error.
*/
public async execute<T extends Command<unknown>>(command: T): Promise<ReturnType<T['execute']>> {
Expand Down Expand Up @@ -497,9 +495,9 @@ export class MongoConnection {
writer.writeInt32(messageLength);

//detect backPressure
this.socket.write(buffer);
this.bytesSent += buffer.byteLength;
this.onSent(buffer.byteLength);
this.socket.write(buffer);
} catch (error) {
console.log('failed sending message', message, 'for type', stringifyType(type));
throw error;
Expand Down
2 changes: 1 addition & 1 deletion packages/mongo/src/client/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ConnectionOptions {
journal?: string;

appName?: string;
retryWrites: boolean = true;
retryWrites: boolean = false;
retryReads: boolean = true;

readConcernLevel: 'local' | 'majority' | 'linearizable' | 'available' = 'majority';
Expand Down
14 changes: 12 additions & 2 deletions packages/mongo/src/query.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@
* You should have received a copy of the MIT License along with this program.
*/

import { DatabaseAdapter, DatabaseDeleteError, DatabasePatchError, DatabaseSession, DeleteResult, Formatter, GenericQueryResolver, OrmEntity, PatchResult } from '@deepkit/orm';
import {
DatabaseAdapter,
DatabaseDeleteError,
DatabasePatchError,
DatabaseSession,
DeleteResult,
Formatter,
GenericQueryResolver,
OrmEntity,
PatchResult,
} from '@deepkit/orm';
import {
Changes,
getPartialSerializeFunction,
Expand Down Expand Up @@ -314,7 +324,7 @@ export class MongoQueryResolver<T extends OrmEntity> extends GenericQueryResolve
}

public async find(model: MongoQueryModel<T>): Promise<T[]> {
const formatter = this.createFormatter(model.withIdentityMap);
const formatter9 = this.createFormatter(model.withIdentityMap);
const connection = await this.client.getConnection(undefined, this.session.assignedTransaction);

try {
Expand Down
1 change: 1 addition & 0 deletions packages/orm/src/database-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ export abstract class DatabaseTransaction {
export class DatabaseSession<ADAPTER extends DatabaseAdapter = DatabaseAdapter> {
public readonly id = SESSION_IDS++;
public withIdentityMap = true;
public withChangeDetection = true;

/**
* When this session belongs to a transaction, then this is set.
Expand Down
7 changes: 5 additions & 2 deletions packages/orm/src/formatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ export class Formatter {
protected serializer: Serializer,
protected hydrator?: HydratorFn,
protected identityMap?: IdentityMap,
protected withChangeDetection: boolean = true,
) {
this.deserialize = getSerializeFunction(rootClassSchema.type, serializer.deserializeRegistry);
this.partialDeserialize = getPartialSerializeFunction(rootClassSchema.type, serializer.deserializeRegistry);
if (identityMap) throw new Error('nope');
if (withChangeDetection) throw new Error('nope');
}

protected getInstancePoolForClass(classType: ClassType): Map<PKHash, any> {
Expand Down Expand Up @@ -274,7 +277,7 @@ export class Formatter {
const converted = this.createObject(model, classState, classSchema, dbRecord);

if (!partial) {
if (model.withChangeDetection !== false) getInstanceState(classState, converted).markAsPersisted();
if (this.withChangeDetection) getInstanceState(classState, converted).markAsPersisted();
if (pool) pool.set(pkHash, converted);
if (this.identityMap) this.identityMap.store(classSchema, converted);
}
Expand Down Expand Up @@ -336,7 +339,7 @@ export class Formatter {
: (partial ? getPartialSerializeFunction(classSchema.type, this.serializer.deserializeRegistry)(dbRecord) : getSerializeFunction(classSchema.type, this.serializer.deserializeRegistry)(dbRecord));

if (!partial) {
if (model.withChangeDetection !== false) getInstanceState(classState, converted).markAsFromDatabase();
if (this.withChangeDetection) getInstanceState(classState, converted).markAsFromDatabase();
}

// if (!partial && model.lazyLoad.size) {
Expand Down
43 changes: 31 additions & 12 deletions packages/orm/src/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,12 @@ export type OpExpression = {
export type Op = ((...args: any[]) => OpExpression) & { id: symbol };

export function getStateCacheId(state: SelectorState): string {
const cacheId = state.schema.type.id + '_' + state.where?.[treeTag].id + '_' + state.orderBy?.map(v => v.a[treeTag].id).join(':');
const cacheId = state.schema.type.id
+ '_' + state.where?.[treeTag].id
+ '_' + state.limit
+ '_' + state.offset
+ '_' + state.select.map(v => v[treeTag].id).join(':')
+ '_' + state.orderBy?.map(v => v.a[treeTag].id).join(':');
//todo select also
// todo join also
return cacheId;
Expand Down Expand Up @@ -545,6 +550,20 @@ export class Query2<T extends object, R = any> {
return this;
}

/**
* When receiving full objects the change-detector is enabled by default
* to be able to calculate change sets for database.persist()/session.commit().
*
* If disabled, it is impossible to send updates via database.persist()/session.commit(),
* and patchOne/patchMany has to be used.
*
* This is disabled per default for partial results.
*/
disableChangeDetection(): this {
this.state.withChangeDetection = false;
return this;
}

protected async callOnFetchEvent(query: Query2<object>): Promise<void> {
const hasEvents = this.session.eventDispatcher.hasListeners(onFind);
if (!hasEvents) return;
Expand Down Expand Up @@ -607,24 +626,24 @@ export class Query2<T extends object, R = any> {
* @throws DatabaseError
*/
public async find(): Promise<T[]> {
const frame = this.session
.stopwatch?.start('Find:' + this.classSchema.getClassName(), FrameCategory.database);
// const frame = this.session
// .stopwatch?.start('Find:' + this.classSchema.getClassName(), FrameCategory.database);

try {
frame?.data({
collection: this.classSchema.getCollectionName(),
className: this.classSchema.getClassName(),
});
const eventFrame = this.session.stopwatch?.start('Events');
await this.callOnFetchEvent(this);
this.onQueryResolve(this);
eventFrame?.end();
// frame?.data({
// collection: this.classSchema.getCollectionName(),
// className: this.classSchema.getClassName(),
// });
// const eventFrame = this.session.stopwatch?.start('Events');
// await this.callOnFetchEvent(this);
// this.onQueryResolve(this);
// eventFrame?.end();
return await this.resolver.find(this.state) as T[];
} catch (error: any) {
await this.session.eventDispatcher.dispatch(onDatabaseError, new DatabaseErrorEvent(error, this.session, this.state.schema, this));
throw error;
} finally {
frame?.end();
// frame?.end();
}
}

Expand Down
Loading

0 comments on commit db6c7f6

Please sign in to comment.