Skip to content

Commit

Permalink
fix(rpc): error Observable Subscribers when no Observable Next type c…
Browse files Browse the repository at this point in the history
…an be detected

Also, make the error message more useful.
  • Loading branch information
marcj committed Feb 9, 2024
1 parent e215420 commit e207fea
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
5 changes: 4 additions & 1 deletion packages/rpc/src/client/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
rpcResponseActionObservableSubscriptionError,
rpcResponseActionType,
RpcTypes,
WrappedV
WrappedV,
} from '../model.js';
import { rpcDecodeError, RpcMessage } from '../protocol.js';
import { ClientProgress } from '../writer.js';
Expand Down Expand Up @@ -310,6 +310,9 @@ export class RpcActionClient {
subject.release();
const error = reply.getError();
// console.debug('Client received error', error);
for (const sub of Object.values(subscribers)) {
sub.error(error);
}
reject(error);
break;
}
Expand Down
1 change: 1 addition & 0 deletions packages/rpc/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ export class RpcBaseClient implements WritableClient {
} else {
const callback = this.replies.get(message.id);
if (!callback) {
console.log(message.debug());
throw new Error('No callback for ' + message.id);
}
if (callback) callback(message);
Expand Down
6 changes: 3 additions & 3 deletions packages/rpc/src/server/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
typeOf,
TypeTuple,
ValidationError,
ValidationErrorItem
ValidationErrorItem,
} from '@deepkit/type';
import { isObservable, Observable, Subject, Subscription } from 'rxjs';
import { Collection, CollectionEvent, CollectionQueryModel, CollectionQueryModelInterface, CollectionState } from '../collection.js';
Expand Down Expand Up @@ -281,7 +281,7 @@ export class RpcServerAction {
const { types, classType, method } = observable;
const body = message.parseBody<rpcActionObservableSubscribeId>();
if (observable.subscriptions[body.id]) return response.error(new Error('Subscription already created'));
if (!types.observableNextSchema) return response.error(new Error('No observable type detected'));
if (!types.observableNextSchema) return response.error(new Error('No observable type on RPC action detected. No Observable<T> defined or RxJS not nominal.'));

const sub: { active: boolean, sub?: Subscription, complete: () => void } = {
active: true,
Expand Down Expand Up @@ -335,7 +335,7 @@ export class RpcServerAction {
if (!observable) return response.error(new Error('No observable to unsubscribe found'));
const body = message.parseBody<rpcActionObservableSubscribeId>();
const sub = observable.subscriptions[body.id];
if (!sub) return response.error(new Error('No subscription found'));
if (!sub) return;
sub.active = false;
if (sub.sub) {
sub.sub.unsubscribe();
Expand Down
21 changes: 19 additions & 2 deletions packages/rpc/tests/controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { assertType, entity, Positive, ReflectionClass, ReflectionKind } from '@deepkit/type';
import { expect, test } from '@jest/globals';
import { DirectClient } from '../src/client/client-direct.js';
import { getActions, rpc, rpcClass, RpcController } from '../src/decorators.js';
import { getActions, rpc, RpcController } from '../src/decorators.js';
import { RpcKernel, RpcKernelConnection } from '../src/server/kernel.js';
import { Session, SessionState } from '../src/server/security.js';
import { BehaviorSubject } from 'rxjs';
import { BehaviorSubject, Observable } from 'rxjs';
import { getClassName, sleep } from '@deepkit/core';
import { ProgressTracker } from '@deepkit/core-rxjs';

Expand Down Expand Up @@ -595,3 +595,20 @@ test('progress tracker reuse', async () => {
expect(res).toBe(undefined);
}
});

test('missing observable types throw', async () => {
class Controller {
@rpc.action()
test() {
return new Observable()
}
}

const kernel = new RpcKernel();
kernel.registerController(Controller, 'myController');
const client = new DirectClient(kernel);
const controller = client.controller<Controller>('myController');

const observable = await controller.test();
await expect(observable.toPromise()).rejects.toThrow('No observable type on RPC action detected.');
});

0 comments on commit e207fea

Please sign in to comment.