diff --git a/packages/rpc/src/client/action.ts b/packages/rpc/src/client/action.ts index 8ef4f4c91..4ac79e9b7 100644 --- a/packages/rpc/src/client/action.ts +++ b/packages/rpc/src/client/action.ts @@ -24,7 +24,7 @@ import { rpcResponseActionObservableSubscriptionError, rpcResponseActionType, RpcTypes, - WrappedV + WrappedV, } from '../model.js'; import { rpcDecodeError, RpcMessage } from '../protocol.js'; import { ClientProgress } from '../writer.js'; @@ -310,6 +310,9 @@ export class RpcActionClient { subject.release(); const error = reply.getError(); // console.debug('Client received error', error); + for (const sub of Object.values(subscribers)) { + sub.error(error); + } reject(error); break; } diff --git a/packages/rpc/src/client/client.ts b/packages/rpc/src/client/client.ts index 71128a940..f74a20772 100644 --- a/packages/rpc/src/client/client.ts +++ b/packages/rpc/src/client/client.ts @@ -412,6 +412,7 @@ export class RpcBaseClient implements WritableClient { } else { const callback = this.replies.get(message.id); if (!callback) { + console.log(message.debug()); throw new Error('No callback for ' + message.id); } if (callback) callback(message); diff --git a/packages/rpc/src/server/action.ts b/packages/rpc/src/server/action.ts index eb61d6d47..ed4b7ed22 100644 --- a/packages/rpc/src/server/action.ts +++ b/packages/rpc/src/server/action.ts @@ -24,7 +24,7 @@ import { typeOf, TypeTuple, ValidationError, - ValidationErrorItem + ValidationErrorItem, } from '@deepkit/type'; import { isObservable, Observable, Subject, Subscription } from 'rxjs'; import { Collection, CollectionEvent, CollectionQueryModel, CollectionQueryModelInterface, CollectionState } from '../collection.js'; @@ -281,7 +281,7 @@ export class RpcServerAction { const { types, classType, method } = observable; const body = message.parseBody(); if (observable.subscriptions[body.id]) return response.error(new Error('Subscription already created')); - if (!types.observableNextSchema) return response.error(new Error('No observable type detected')); + if (!types.observableNextSchema) return response.error(new Error('No observable type on RPC action detected. No Observable defined or RxJS not nominal.')); const sub: { active: boolean, sub?: Subscription, complete: () => void } = { active: true, @@ -335,7 +335,7 @@ export class RpcServerAction { if (!observable) return response.error(new Error('No observable to unsubscribe found')); const body = message.parseBody(); const sub = observable.subscriptions[body.id]; - if (!sub) return response.error(new Error('No subscription found')); + if (!sub) return; sub.active = false; if (sub.sub) { sub.sub.unsubscribe(); diff --git a/packages/rpc/tests/controller.spec.ts b/packages/rpc/tests/controller.spec.ts index ba7888623..34c167f6e 100644 --- a/packages/rpc/tests/controller.spec.ts +++ b/packages/rpc/tests/controller.spec.ts @@ -1,10 +1,10 @@ import { assertType, entity, Positive, ReflectionClass, ReflectionKind } from '@deepkit/type'; import { expect, test } from '@jest/globals'; import { DirectClient } from '../src/client/client-direct.js'; -import { getActions, rpc, rpcClass, RpcController } from '../src/decorators.js'; +import { getActions, rpc, RpcController } from '../src/decorators.js'; import { RpcKernel, RpcKernelConnection } from '../src/server/kernel.js'; import { Session, SessionState } from '../src/server/security.js'; -import { BehaviorSubject } from 'rxjs'; +import { BehaviorSubject, Observable } from 'rxjs'; import { getClassName, sleep } from '@deepkit/core'; import { ProgressTracker } from '@deepkit/core-rxjs'; @@ -595,3 +595,20 @@ test('progress tracker reuse', async () => { expect(res).toBe(undefined); } }); + +test('missing observable types throw', async () => { + class Controller { + @rpc.action() + test() { + return new Observable() + } + } + + const kernel = new RpcKernel(); + kernel.registerController(Controller, 'myController'); + const client = new DirectClient(kernel); + const controller = client.controller('myController'); + + const observable = await controller.test(); + await expect(observable.toPromise()).rejects.toThrow('No observable type on RPC action detected.'); +});