Skip to content

Commit

Permalink
feat(rpc): allow serialization of unknown Observable type
Browse files Browse the repository at this point in the history
If an RPC action does not declare any type, serialization already uses the slow `any` serializer. This is now the case as well for Observable<?>.

When no type was detected, we issue a warning that performance is impacted if no explicit type is defined.
  • Loading branch information
marcj committed Feb 10, 2024
1 parent 6a4bac2 commit 0014074
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 52 deletions.
4 changes: 2 additions & 2 deletions packages/framework/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
RpcKernelConnection,
RpcMessage,
RpcMessageBuilder,
RpcServerAction
RpcServerAction,
} from '@deepkit/rpc';
import { FrameCategory, Stopwatch } from '@deepkit/stopwatch';
import { ClassType } from '@deepkit/core';
Expand Down Expand Up @@ -66,7 +66,7 @@ export class RpcServerActionWithStopwatch extends RpcServerAction {
}

export class RpcKernelConnectionWithStopwatch extends RpcKernelConnection {
protected actionHandler = new RpcServerActionWithStopwatch(this, this.controllers, this.injector, this.security, this.sessionState);
protected actionHandler = new RpcServerActionWithStopwatch(this, this.controllers, this.injector, this.security, this.sessionState, this.logger);
stopwatch?: Stopwatch;

setStopwatch(stopwatch: Stopwatch) {
Expand Down
5 changes: 5 additions & 0 deletions packages/logger/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ export class MemoryLogger extends Logger {
getOutput(): string {
return this.memory.messageStrings.join('\n');
}

clear() {
this.memory.messageStrings = [];
this.memory.messages = [];
}
}

export type ScopedLogger = Inject<LoggerInterface, 'scoped-logger'>;
Expand Down
16 changes: 7 additions & 9 deletions packages/rpc/src/client/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ export class RpcActionClient {

const returnType = deserializeType(parsed.type, { disableReuse: typeReuseDisabled });

let observableNextSchema: TypeObjectLiteral | undefined;
let collectionSchema: Type | undefined;
let collectionQueryModel: Type | undefined;
let unwrappedReturnType = returnType;
Expand All @@ -461,13 +460,6 @@ export class RpcActionClient {
assertType(parameters, ReflectionKind.tuple);

if (parsed.mode === 'observable') {
observableNextSchema = {
kind: ReflectionKind.objectLiteral,
types: [
{ kind: ReflectionKind.propertySignature, name: 'id', type: { kind: ReflectionKind.number } },
{ kind: ReflectionKind.propertySignature, name: 'v', type: unwrappedReturnType },
]
} as TypeObjectLiteral;
} else if (parsed.mode === 'entitySubject') {
} else if (parsed.mode === 'collection') {
collectionQueryModel = typeOf<CollectionQueryModelInterface<unknown>>([unwrappedReturnType]) as TypeObjectLiteral;
Expand Down Expand Up @@ -501,7 +493,13 @@ export class RpcActionClient {
{ kind: ReflectionKind.propertySignature, name: 'v', type: unwrappedReturnType },
]
} as TypeObjectLiteral,
observableNextSchema,
observableNextSchema: {
kind: ReflectionKind.objectLiteral,
types: [
{ kind: ReflectionKind.propertySignature, name: 'id', type: { kind: ReflectionKind.number } },
{ kind: ReflectionKind.propertySignature, name: 'v', type: unwrappedReturnType },
]
} as TypeObjectLiteral
};

resolve(state.types);
Expand Down
100 changes: 66 additions & 34 deletions packages/rpc/src/server/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
* You should have received a copy of the MIT License along with this program.
*/

import { ClassType, collectForMicrotask, getClassName, isPrototypeOfBase, toFastProperties } from '@deepkit/core';
import {
ClassType,
collectForMicrotask,
getClassName,
isPlainObject,
isPrototypeOfBase,
toFastProperties,
} from '@deepkit/core';
import { isBehaviorSubject, isSubject, ProgressTracker, ProgressTrackerState } from '@deepkit/core-rxjs';
import {
assertType,
Expand All @@ -26,7 +33,13 @@ import {
ValidationErrorItem,
} from '@deepkit/type';
import { isObservable, Observable, Subject, Subscription } from 'rxjs';
import { Collection, CollectionEvent, CollectionQueryModel, CollectionQueryModelInterface, CollectionState } from '../collection.js';
import {
Collection,
CollectionEvent,
CollectionQueryModel,
CollectionQueryModelInterface,
CollectionState,
} from '../collection.js';
import { getActions } from '../decorators.js';
import {
ActionMode,
Expand All @@ -46,6 +59,7 @@ import { rpcEncodeError, RpcMessage } from '../protocol.js';
import { RpcKernelBaseConnection, RpcMessageBuilder } from './kernel.js';
import { RpcControllerAccess, RpcKernelSecurity, SessionState } from './security.js';
import { InjectorContext, InjectorModule } from '@deepkit/injector';
import { LoggerInterface } from '@deepkit/logger';

export type ActionTypes = {
actionCallSchema: TypeObjectLiteral, //with args as property
Expand All @@ -59,12 +73,32 @@ export type ActionTypes = {
observableNextSchema?: TypeObjectLiteral, //with v as property
collectionSchema?: Type, //with v as array property
collectionQueryModel?: Type,

noTypeWarned: boolean;
};

function createNoTypeError(classType: ClassType, method: string) {
return new Error(`No observable type on RPC action ${getClassName(classType)}.${method} detected. Either no return type Observable<T> defined or wrong RxJS nominal type.`)
}

function createNoObservableWarning(classType: ClassType, method: string) {
return new Error(`RPC action ${getClassName(classType)}.${method} returns an Observable, but no specific type (e.g. Observable<T>) or 'any | unknown' type is defined. This might lead to unexpected behavior and slow performance.`);
}

function createNoTypeWarning(classType: ClassType, method: string, value: any) {
const firstKey = Object.keys(value)[0];
return new Error(`RPC action ${getClassName(classType)}.${method} returns an object, but no specific type (e.g. { ${firstKey || 'v'}: T }) or 'any | unknown' type is defined. This might lead to slow performance.`);
}

function validV(type: Type, index = 0): boolean{
if (type.kind !== ReflectionKind.objectLiteral) return false;
const second = type.types[index];
if (!second || second.kind !== ReflectionKind.propertySignature) return false;
if (second.name !== 'v') return false;
if (second.type.kind === ReflectionKind.any || second.type.kind === ReflectionKind.unknown) return false;
return true;
}

export class RpcServerAction {
protected cachedActionsTypes: { [id: string]: ActionTypes } = {};
protected observableSubjects: {
Expand Down Expand Up @@ -98,6 +132,7 @@ export class RpcServerAction {
protected injector: InjectorContext,
protected security: RpcKernelSecurity,
protected sessionState: SessionState,
protected logger: LoggerInterface,
) {
}

Expand Down Expand Up @@ -161,7 +196,6 @@ export class RpcServerAction {
]
};

let nextSchema: Type | undefined = undefined;
let unwrappedReturnType = methodReflection.getReturnType();
if (unwrappedReturnType.kind === ReflectionKind.promise) {
unwrappedReturnType = unwrappedReturnType.type;
Expand All @@ -177,6 +211,7 @@ export class RpcServerAction {
}

let type: Type = fullType;
let nextType: Type | undefined;
let collectionSchema: Type | undefined;
let collectionQueryModel: Type | undefined;

Expand All @@ -202,39 +237,11 @@ export class RpcServerAction {
} else if (isPrototypeOfBase(unwrappedReturnType.classType, ProgressTracker)) {
mode = 'observable';
type = typeOf<ProgressTrackerState[] | undefined>();
nextSchema = {
kind: ReflectionKind.objectLiteral,
types: [{
kind: ReflectionKind.propertySignature,
name: 'id',
parent: Object as any,
type: { kind: ReflectionKind.number },
}, {
kind: ReflectionKind.propertySignature,
name: 'v',
parent: Object as any,
optional: true,
type: type,
}]
};
nextType = type;
} else if (isPrototypeOfBase(unwrappedReturnType.classType, Observable)) {
mode = 'observable';
type = unwrappedReturnType.typeArguments ? unwrappedReturnType.typeArguments[0] : { kind: ReflectionKind.any };
nextSchema = {
kind: ReflectionKind.objectLiteral,
types: [{
kind: ReflectionKind.propertySignature,
name: 'id',
parent: Object as any,
type: { kind: ReflectionKind.number },
}, {
kind: ReflectionKind.propertySignature,
name: 'v',
parent: Object as any,
optional: true,
type: type,
}]
};
nextType = type;
}
}

Expand All @@ -256,9 +263,24 @@ export class RpcServerAction {
mode,
type,
parametersValidate: getValidatorFunction(undefined, parameters),
observableNextSchema: nextSchema,
observableNextSchema: {
kind: ReflectionKind.objectLiteral,
types: [{
kind: ReflectionKind.propertySignature,
name: 'id',
parent: Object as any,
type: { kind: ReflectionKind.number },
}, {
kind: ReflectionKind.propertySignature,
name: 'v',
parent: Object as any,
optional: true,
type: nextType || { kind: ReflectionKind.any },
}]
},
collectionSchema,
collectionQueryModel,
noTypeWarned: false,
};
if (!types.type) {
throw new Error(`No type detected for action ${controller}.${methodName}`);
Expand All @@ -279,6 +301,11 @@ export class RpcServerAction {
if (observable.subscriptions[body.id]) return response.error(new Error('Subscription already created'));
if (!types.observableNextSchema) return response.error(createNoTypeError(classType, method));

if (!types.noTypeWarned && (types.mode !== 'observable' || !validV(types.observableNextSchema, 1))) {
types.noTypeWarned = true;
this.logger.warn(createNoObservableWarning(classType, method));
}

const sub: { active: boolean, sub?: Subscription, complete: () => void } = {
active: true,
complete: () => {
Expand Down Expand Up @@ -507,6 +534,11 @@ export class RpcServerAction {

response.reply<rpcResponseActionObservable>(RpcTypes.ResponseActionObservable, { type });
} else {
if (!types.noTypeWarned && isPlainObject(result) && !validV(types.resultSchema)) {
types.noTypeWarned = true;
this.logger.warn(createNoTypeWarning(classType.controller, body.method, result));
}

response.reply(RpcTypes.ResponseActionSimple, { v: result }, types.resultSchema);
}
} catch (error: any) {
Expand Down
16 changes: 12 additions & 4 deletions packages/rpc/src/server/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@
import { arrayRemoveItem, ClassType, getClassName } from '@deepkit/core';
import { ReceiveType, resolveReceiveType, stringifyUuid, typeOf, writeUuid } from '@deepkit/type';
import { RpcMessageSubject } from '../client/message-subject.js';
import { AuthenticationError, ControllerDefinition, rpcAuthenticate, rpcClientId, rpcError, rpcPeerRegister, rpcResponseAuthenticate, RpcTypes } from '../model.js';
import {
AuthenticationError,
ControllerDefinition,
rpcAuthenticate,
rpcClientId,
rpcError,
rpcPeerRegister,
rpcResponseAuthenticate,
RpcTypes,
} from '../model.js';
import {
createBuffer,
createRpcCompositeMessage,
Expand All @@ -24,7 +33,7 @@ import {
rpcEncodeError,
RpcMessage,
RpcMessageReader,
RpcMessageRouteType
RpcMessageRouteType,
} from '../protocol.js';
import { RpcMessageWriter, RpcMessageWriterOptions } from '../writer.js';
import { RpcServerAction } from './action.js';
Expand All @@ -34,7 +43,6 @@ import { RemoteController } from '../client/client.js';
import { InjectorContext, InjectorModule, NormalizedProvider } from '@deepkit/injector';
import { Logger, LoggerInterface } from '@deepkit/logger';
import { rpcClass } from '../decorators.js';
import { Provider } from '@deepkit/injector';

export class RpcCompositeMessage {
protected messages: RpcCreateMessageDef<any>[] = [];
Expand Down Expand Up @@ -304,7 +312,7 @@ export class RpcKernelConnections {

export class RpcKernelConnection extends RpcKernelBaseConnection {
public myPeerId?: string;
protected actionHandler = new RpcServerAction(this, this.controllers, this.injector, this.security, this.sessionState);
protected actionHandler = new RpcServerAction(this, this.controllers, this.injector, this.security, this.sessionState, this.logger);

public routeType: RpcMessageRouteType.client | RpcMessageRouteType.server = RpcMessageRouteType.client;

Expand Down
71 changes: 68 additions & 3 deletions packages/rpc/tests/controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Session, SessionState } from '../src/server/security.js';
import { BehaviorSubject, Observable } from 'rxjs';
import { getClassName, sleep } from '@deepkit/core';
import { ProgressTracker } from '@deepkit/core-rxjs';
import { MemoryLogger } from '@deepkit/logger';

test('default name', () => {
@rpc.controller()
Expand Down Expand Up @@ -596,7 +597,7 @@ test('progress tracker reuse', async () => {
}
});

test('missing observable types throw', async () => {
test('missing types log warning', async () => {
class Controller {
@rpc.action()
test() {
Expand All @@ -605,13 +606,77 @@ test('missing observable types throw', async () => {
observer.complete();
});
}

@rpc.action()
test2(): Observable<any> {
return new Observable(observer => {
observer.next({ a: '123' });
observer.complete();
});
}

@rpc.action()
test3(): Observable<string> {
return new Observable(observer => {
observer.next('abc');
observer.complete();
});
}

@rpc.action()
test4() {
return { a: '123' };
}

@rpc.action()
test5(): { a: string } {
return { a: '123' };
}

@rpc.action()
test6() {
return 123;
}
}

const kernel = new RpcKernel();
const memoryLogger = new MemoryLogger();
const kernel = new RpcKernel(undefined, memoryLogger);
kernel.registerController(Controller, 'myController');
const client = new DirectClient(kernel);
const controller = client.controller<Controller>('myController');

const observable = await controller.test();
await expect(observable.toPromise()).rejects.toThrow('No observable type on RPC action Controller.test detected');
expect(await observable.toPromise()).toBe(123);
expect(memoryLogger.getOutput()).toContain('RPC action Controller.test returns an Observable, but no specific type');

memoryLogger.clear();
const observable2 = await controller.test2();
expect(await observable2.toPromise()).toEqual({ a: '123' });
expect(memoryLogger.getOutput()).toContain('RPC action Controller.test2 returns an Observable, but no specific type');

memoryLogger.clear();
const observable3 = await controller.test3();
expect(await observable3.toPromise()).toBe('abc');
expect(memoryLogger.getOutput()).not.toContain('RPC action Controller.test3 returns an Observable, but no specific type');

memoryLogger.clear();
const result = await controller.test4();
expect(result).toEqual({ a: '123' });
expect(memoryLogger.getOutput()).toContain('RPC action Controller.test4 returns an object, but no specific type');

memoryLogger.clear();
const result2 = await controller.test4();
expect(result2).toEqual({ a: '123' });
// does not log again
expect(memoryLogger.getOutput()).not.toContain('RPC action Controller.test4 returns an object, but no specific type');

memoryLogger.clear();
const result3 = await controller.test5();
expect(result3).toEqual({ a: '123' });
expect(memoryLogger.getOutput()).not.toContain('RPC action Controller.test5 returns a number, but no specific type');

memoryLogger.clear();
const result4 = await controller.test6();
expect(result4).toEqual(123);
expect(memoryLogger.getOutput()).not.toContain('RPC action Controller.test5 returns a number, but no specific type');
});

0 comments on commit 0014074

Please sign in to comment.