From 3b2c6cc6c75d70e3b6bfac7d53e3e7606696baf4 Mon Sep 17 00:00:00 2001 From: "Marc J. Schmidt" Date: Mon, 27 May 2024 03:41:15 +0200 Subject: [PATCH] feat(rpc): add http transport This change refactors the transport layer of RPC in a way that allows to serialize RpcMessageDefinition in a custom way. This is used then in a new `RpcHttpClientAdapter` adapter. This is a first experimental version of it. This contains breaking changes in the way RPC adapters are written. RpcMessageWriter does not exist anymore and became an implementation detail. A new TransportClientConnection and TransportConnection abstraction has been added that is used in both: the client and server. --- .../broker/src/adapters/deepkit-adapter.ts | 10 +- packages/broker/src/kernel.ts | 19 +- packages/broker/src/model.ts | 4 + packages/example-app/app.ts | 1 + .../src/controller/rpc.controller.ts | 10 + packages/framework/src/application-server.ts | 6 +- packages/framework/src/module.config.ts | 6 + packages/framework/src/module.ts | 51 +++- packages/framework/src/rpc.ts | 6 +- packages/framework/src/worker.ts | 22 +- packages/rpc-tcp/src/client.ts | 10 +- packages/rpc-tcp/src/server.ts | 10 +- packages/rpc/index.ts | 3 +- packages/rpc/src/client/action.ts | 2 +- packages/rpc/src/client/client-direct.ts | 25 +- packages/rpc/src/client/client-websocket.ts | 17 +- packages/rpc/src/client/client.ts | 109 ++++---- packages/rpc/src/client/http.ts | 87 ++++++ packages/rpc/src/model.ts | 6 - packages/rpc/src/progress.ts | 115 ++++++++ packages/rpc/src/protocol.ts | 259 ++++++++++++------ packages/rpc/src/server/http.ts | 76 +++++ packages/rpc/src/server/kernel.ts | 244 ++++++++++++----- packages/rpc/src/server/security.ts | 4 +- packages/rpc/src/transport.ts | 146 ++++++++++ packages/rpc/src/writer.ts | 200 -------------- packages/rpc/tests/chunks.spec.ts | 6 +- packages/rpc/tests/connection.spec.ts | 13 +- packages/rpc/tests/custom-message.spec.ts | 4 +- packages/rpc/tests/http.spec.ts | 63 +++++ packages/rpc/tests/rpc.spec.ts | 105 +++---- 31 files changed, 1092 insertions(+), 547 deletions(-) create mode 100644 packages/rpc/src/client/http.ts create mode 100644 packages/rpc/src/progress.ts create mode 100644 packages/rpc/src/server/http.ts create mode 100644 packages/rpc/src/transport.ts delete mode 100644 packages/rpc/src/writer.ts create mode 100644 packages/rpc/tests/http.spec.ts diff --git a/packages/broker/src/adapters/deepkit-adapter.ts b/packages/broker/src/adapters/deepkit-adapter.ts index 8fd529b70..cd461ca8c 100644 --- a/packages/broker/src/adapters/deepkit-adapter.ts +++ b/packages/broker/src/adapters/deepkit-adapter.ts @@ -22,6 +22,7 @@ import { BrokerQueueResponseHandleMessage, BrokerQueueSubscribe, BrokerQueueUnsubscribe, + brokerResponseGet, brokerResponseGetCache, brokerResponseGetCacheMeta, brokerResponseIncrement, @@ -227,11 +228,12 @@ export class BrokerDeepkitAdapter implements BrokerAdapter { } async get(key: string, type: Type): Promise { - const first: RpcMessage = await this.pool.getConnection('key/' + key) - .sendMessage(BrokerType.Get, { n: key }).firstThenClose(BrokerType.ResponseGet); - if (first.buffer && first.buffer.byteLength > first.bodyOffset) { + const first = await this.pool.getConnection('key/' + key) + .sendMessage(BrokerType.Get, { n: key }) + .firstThenClose(BrokerType.ResponseGet); + if (first.v) { const serializer = getSerializer(type); - return serializer.decode(first.buffer, first.bodyOffset); + return serializer.decode(first.v, 0); } } diff --git a/packages/broker/src/kernel.ts b/packages/broker/src/kernel.ts index 6c45f36d6..d52eb6613 100644 --- a/packages/broker/src/kernel.ts +++ b/packages/broker/src/kernel.ts @@ -11,13 +11,13 @@ import { arrayRemoveItem, ProcessLock, ProcessLocker } from '@deepkit/core'; import { createRpcMessage, - RpcConnectionWriter, RpcKernel, RpcKernelBaseConnection, RpcKernelConnections, RpcMessage, RpcMessageBuilder, RpcMessageRouteType, + TransportConnection, } from '@deepkit/rpc'; import { Logger } from '@deepkit/logger'; import { @@ -36,6 +36,7 @@ import { BrokerQueuePublish, BrokerQueueResponseHandleMessage, BrokerQueueSubscribe, + brokerResponseGet, brokerResponseGetCache, brokerResponseGetCacheMeta, brokerResponseIncrement, @@ -67,11 +68,11 @@ export class BrokerConnection extends RpcKernelBaseConnection { constructor( logger: Logger, - transportWriter: RpcConnectionWriter, + transportConnection: TransportConnection, protected connections: RpcKernelConnections, protected state: BrokerState, ) { - super(logger, transportWriter, connections); + super(logger, transportConnection, connections); } public close(): void { @@ -261,7 +262,7 @@ export class BrokerConnection extends RpcKernelBaseConnection { ); for (const connection of this.state.invalidationCacheMessageConnections) { - connection.writer.write(message); + connection.write(message); } } response.ack(); @@ -288,7 +289,7 @@ export class BrokerConnection extends RpcKernelBaseConnection { case BrokerType.Get: { const body = message.parseBody(); const v = this.state.getKey(body.n); - response.replyBinary(BrokerType.ResponseGet, v); + response.reply(BrokerType.ResponseGet, { v }); break; } case BrokerType.EnableInvalidationCacheMessages: { @@ -430,7 +431,7 @@ export class BrokerState { ); for (const connection of subscriptions) { - connection.writer.write(message); + connection.write(message); } } @@ -491,7 +492,7 @@ export class BrokerState { m.tries++; m.state = QueueMessageState.inFlight; m.lastError = undefined; - consumer.con.writer.write(createRpcMessage( + consumer.con.write(createRpcMessage( 0, BrokerType.QueueResponseHandleMessage, { c: body.c, v: body.v, id: m.id }, RpcMessageRouteType.server, )); @@ -568,7 +569,7 @@ export class BrokerState { export class BrokerKernel extends RpcKernel { protected state: BrokerState = new BrokerState; - createConnection(writer: RpcConnectionWriter): BrokerConnection { - return new BrokerConnection(this.logger, writer, this.connections, this.state); + createConnection(transport: TransportConnection): BrokerConnection { + return new BrokerConnection(this.logger, transport, this.connections, this.state); } } diff --git a/packages/broker/src/model.ts b/packages/broker/src/model.ts index 5c32c51cf..24614b495 100644 --- a/packages/broker/src/model.ts +++ b/packages/broker/src/model.ts @@ -93,6 +93,10 @@ export interface brokerInvalidateCacheMessage { ttl: number; } +export interface brokerResponseGet { + v?: Uint8Array, +} + export interface brokerResponseGetCache { v?: Uint8Array, ttl?: number, diff --git a/packages/example-app/app.ts b/packages/example-app/app.ts index 8cdc35448..4130a969b 100755 --- a/packages/example-app/app.ts +++ b/packages/example-app/app.ts @@ -46,6 +46,7 @@ const app = new App({ publicDir: 'public', httpLog: true, migrateOnStartup: true, + httpRpcBasePath: 'rpc/v1' }), ] }); diff --git a/packages/example-app/src/controller/rpc.controller.ts b/packages/example-app/src/controller/rpc.controller.ts index a26095e2c..11d73087b 100644 --- a/packages/example-app/src/controller/rpc.controller.ts +++ b/packages/example-app/src/controller/rpc.controller.ts @@ -4,6 +4,16 @@ import { Observable, Subject } from 'rxjs'; @rpc.controller('test-rpc') export class RpcController { + @rpc.action() + hello(): string { + return 'World'; + } + + @rpc.action() + hi(name: string): string { + return `Hi ${name}`; + } + @rpc.action() timesSubject(): Subject { const subject = new Subject(); diff --git a/packages/framework/src/application-server.ts b/packages/framework/src/application-server.ts index b6d05cff6..802ff8e2f 100644 --- a/packages/framework/src/application-server.ts +++ b/packages/framework/src/application-server.ts @@ -347,15 +347,15 @@ export class ApplicationServer { return new RpcClient({ connect(connection) { const kernelConnection = createRpcConnection(context, rpcKernel, { - write: (buffer) => connection.onData(buffer), - close: () => connection.onClose(), + writeBinary: (buffer) => connection.readBinary(buffer), + close: () => connection.onClose('closed'), }); connection.onConnected({ close() { kernelConnection.close(); }, - send(message) { + writeBinary(message: Uint8Array) { queueMicrotask(() => { kernelConnection.feed(message); }); diff --git a/packages/framework/src/module.config.ts b/packages/framework/src/module.config.ts index 3e6cff25b..c8bca7451 100644 --- a/packages/framework/src/module.config.ts +++ b/packages/framework/src/module.config.ts @@ -111,6 +111,12 @@ export class FrameworkConfig { debug: boolean = false; + /** + * @description If set, allows to call RPC methods via HTTP. The value is the base URL for the RPC calls. + * Use e.g. `/rpc/v1` + */ + httpRpcBasePath: string = ''; + debugUrl: string = '_debug'; /** diff --git a/packages/framework/src/module.ts b/packages/framework/src/module.ts index 2644c0cc3..06c45e4b5 100644 --- a/packages/framework/src/module.ts +++ b/packages/framework/src/module.ts @@ -17,7 +17,15 @@ import { DebugDIController } from './cli/debug-di.js'; import { ServerStartController } from './cli/server-start.js'; import { DebugController } from './debug/debug.controller.js'; import { registerDebugHttpController } from './debug/http-debug.controller.js'; -import { http, HttpLogger, HttpModule, HttpRequest, serveStaticListener } from '@deepkit/http'; +import { + http, + HttpLogger, + HttpModule, + HttpRegExp, + HttpRequest, + HttpResponse, + serveStaticListener, +} from '@deepkit/http'; import { InjectorContext, ProviderWithScope, Token } from '@deepkit/injector'; import { BrokerConfig, FrameworkConfig } from './module.config.js'; import { Logger } from '@deepkit/logger'; @@ -37,7 +45,6 @@ import { import { FileStopwatchStore } from './debug/stopwatch/store.js'; import { DebugProfileFramesCommand } from './cli/debug-debug-frames.js'; import { - ConnectionWriter, rpcClass, RpcKernel, RpcKernelBaseConnection, @@ -113,7 +120,6 @@ export class FrameworkModule extends createModule({ { provide: SessionState, scope: 'rpc', useValue: undefined }, { provide: RpcKernelBaseConnection, scope: 'rpc', useValue: undefined }, { provide: RpcKernelConnection, scope: 'rpc', useValue: undefined }, - { provide: ConnectionWriter, scope: 'rpc', useValue: undefined }, ], workflows: [ // rpcWorkflow, @@ -152,7 +158,6 @@ export class FrameworkModule extends createModule({ SessionState, RpcKernelConnection, RpcKernelBaseConnection, - ConnectionWriter, BrokerDeepkitAdapter, BrokerCache, @@ -223,10 +228,46 @@ export class FrameworkModule extends createModule({ // this.setupProvider(LiveDatabase).enableChangeFeed(DebugRequest); } + if (this.config.httpRpcBasePath) { + const rpcBaseUrl = this.config.httpRpcBasePath; + @http.controller(rpcBaseUrl) + class HttpRpcController { + constructor(protected rpcKernel: RpcKernel) { + } + + @http.ANY(':controller/:method') + async handle( + controller: HttpRegExp, + method: string, + request: HttpRequest, + response: HttpResponse, + ) { + const connection = this.rpcKernel.createConnection({ + write: (data) => { + + }, + bufferedAmount() { + return 0; + }, + close() { + + }, + clientAddress() { + return request.socket.remoteAddress || ''; + }, + }); + request.body = await request.readBody(); + await connection.onRequest(rpcBaseUrl, request, response); + return response; + } + } + this.addController(HttpRpcController); + } + const disconnect = async (event: unknown, broker: DebugBrokerBus, store: StopwatchStore) => { await store.close(); await broker.adapter.disconnect(); - } + }; this.addListener(onAppShutdown.listen(disconnect)); // Registering at onServerShutdown also so that ApplicationServer.close disconnects all connections. this.addListener(onServerShutdown.listen(disconnect)); diff --git a/packages/framework/src/rpc.ts b/packages/framework/src/rpc.ts index 96b816ff0..2c6f62bee 100644 --- a/packages/framework/src/rpc.ts +++ b/packages/framework/src/rpc.ts @@ -11,7 +11,6 @@ import { InjectorContext } from '@deepkit/injector'; import { rpcActionType, - RpcConnectionWriter, RpcControllerAccess, RpcKernel, RpcKernelBaseConnection, @@ -19,6 +18,7 @@ import { RpcMessage, RpcMessageBuilder, RpcServerAction, + TransportConnection, } from '@deepkit/rpc'; import { FrameCategory, Stopwatch } from '@deepkit/stopwatch'; import { ClassType } from '@deepkit/core'; @@ -89,8 +89,8 @@ export class RpcKernelWithStopwatch extends RpcKernel { stopwatch?: Stopwatch; - createConnection(writer: RpcConnectionWriter, injector?: InjectorContext): RpcKernelBaseConnection { - const connection = super.createConnection(writer, injector); + createConnection(transport: TransportConnection, injector?: InjectorContext): RpcKernelBaseConnection { + const connection = super.createConnection(transport, injector); if (this.stopwatch && connection instanceof RpcKernelConnectionWithStopwatch) { connection.setStopwatch(this.stopwatch); } diff --git a/packages/framework/src/worker.ts b/packages/framework/src/worker.ts index 1c6341f5b..3eebc1703 100644 --- a/packages/framework/src/worker.ts +++ b/packages/framework/src/worker.ts @@ -9,12 +9,11 @@ */ import { - ConnectionWriter, - RpcConnectionWriter, RpcKernel, RpcKernelBaseConnection, RpcKernelConnection, SessionState, + TransportConnection, } from '@deepkit/rpc'; import http, { Server } from 'http'; import https from 'https'; @@ -105,7 +104,7 @@ export interface RpcServerListener { } export interface RpcServerCreateConnection { - (writer: RpcConnectionWriter, request?: HttpRequest): RpcKernelBaseConnection; + (transport: TransportConnection, request?: HttpRequest): RpcKernelBaseConnection; } export interface RpcServerOptions { @@ -177,13 +176,12 @@ export class WebMemoryWorkerFactory extends WebWorkerFactory { } } -export function createRpcConnection(rootScopedContext: InjectorContext, rpcKernel: RpcKernel, writer: RpcConnectionWriter, request?: HttpRequest) { +export function createRpcConnection(rootScopedContext: InjectorContext, rpcKernel: RpcKernel, transport: TransportConnection, request?: HttpRequest) { const injector = rootScopedContext.createChildScope('rpc'); injector.set(HttpRequest, request); injector.set(RpcInjectorContext, injector); - injector.set(ConnectionWriter, writer); - const connection = rpcKernel.createConnection(writer, injector); + const connection = rpcKernel.createConnection(transport, injector); injector.set(SessionState, connection.sessionState); injector.set(RpcKernelConnection, connection); injector.set(RpcKernelBaseConnection, connection); @@ -298,21 +296,21 @@ export class WebWorker { private startRpc() { if (this.server) { - this.rpcListener = this.rpcServer.start({ server: this.server }, (writer: RpcConnectionWriter, request?: HttpRequest) => { + this.rpcListener = this.rpcServer.start({ server: this.server }, (transport, request?: HttpRequest) => { if (this.shuttingDown) { - writer.close(); + transport.close(); throw new Error('Server is shutting down'); } - return createRpcConnection(this.injectorContext, this.rpcKernel, writer, request); + return createRpcConnection(this.injectorContext, this.rpcKernel, transport, request); }); } if (this.servers) { - this.rpcListener = this.rpcServer.start({ server: this.servers }, (writer: RpcConnectionWriter, request?: HttpRequest) => { + this.rpcListener = this.rpcServer.start({ server: this.servers }, (transport, request?: HttpRequest) => { if (this.shuttingDown) { - writer.close(); + transport.close(); throw new Error('Server is shutting down'); } - return createRpcConnection(this.injectorContext, this.rpcKernel, writer, request); + return createRpcConnection(this.injectorContext, this.rpcKernel, transport, request); }); } } diff --git a/packages/rpc-tcp/src/client.ts b/packages/rpc-tcp/src/client.ts index 41e9c485a..059247c6d 100644 --- a/packages/rpc-tcp/src/client.ts +++ b/packages/rpc-tcp/src/client.ts @@ -1,5 +1,5 @@ import { parseHost } from '@deepkit/core'; -import { ClientTransportAdapter, TransportConnectionHooks } from '@deepkit/rpc'; +import { ClientTransportAdapter, TransportClientConnection } from '@deepkit/rpc'; import { connect } from 'net'; /* @@ -14,7 +14,7 @@ export class RpcTcpClientAdapter implements ClientTransportAdapter { this.host = parseHost(host); } - public async connect(connection: TransportConnectionHooks) { + public async connect(connection: TransportClientConnection) { const port = this.host.port || 8811; const socket = this.host.isUnixSocket ? connect({ path: this.host.unixSocket }) : connect({ port: port, @@ -22,11 +22,11 @@ export class RpcTcpClientAdapter implements ClientTransportAdapter { }); socket.on('data', (data: Uint8Array) => { - connection.onData(data); + connection.readBinary(data); }); socket.on('close', () => { - connection.onClose(); + connection.onClose('socket closed'); }); socket.on('error', (error: any) => { @@ -46,7 +46,7 @@ export class RpcTcpClientAdapter implements ClientTransportAdapter { close() { socket.end(); }, - send(message) { + writeBinary(message) { socket.write(message); } }); diff --git a/packages/rpc-tcp/src/server.ts b/packages/rpc-tcp/src/server.ts index 9ba7c061e..6abc34223 100644 --- a/packages/rpc-tcp/src/server.ts +++ b/packages/rpc-tcp/src/server.ts @@ -1,5 +1,5 @@ import { asyncOperation, ParsedHost, parseHost } from '@deepkit/core'; -import { RpcKernel } from '@deepkit/rpc'; +import { RpcKernel, RpcMessageDefinition } from '@deepkit/rpc'; import { existsSync, mkdirSync, unlinkSync } from 'fs'; import { createServer, Server, Socket } from 'net'; import type { ServerOptions as WebSocketServerOptions } from 'ws'; @@ -39,8 +39,8 @@ export class RpcTcpServer { this.server.on('connection', (socket: Socket) => { const connection = this.kernel?.createConnection({ - write(b: Uint8Array) { - socket.write(b); + write(b: RpcMessageDefinition) { + connection!.sendBinary(b, (data) => socket.write(data)); }, clientAddress(): string { return socket.remoteAddress || ''; @@ -104,8 +104,8 @@ export class RpcWebSocketServer { this.server.on('connection', (ws, req: IncomingMessage) => { const connection = this.kernel?.createConnection({ - write(b) { - ws.send(b); + writeBinary(message) { + ws.send(message); }, close() { ws.close(); diff --git a/packages/rpc/index.ts b/packages/rpc/index.ts index 158f76b50..48bb4d75c 100644 --- a/packages/rpc/index.ts +++ b/packages/rpc/index.ts @@ -21,4 +21,5 @@ export * from './src/collection.js'; export * from './src/decorators.js'; export * from './src/model.js'; export * from './src/protocol.js'; -export * from './src/writer.js'; +export * from './src/progress.js'; +export * from './src/transport.js'; diff --git a/packages/rpc/src/client/action.ts b/packages/rpc/src/client/action.ts index c27ce082b..b090c9e14 100644 --- a/packages/rpc/src/client/action.ts +++ b/packages/rpc/src/client/action.ts @@ -27,7 +27,7 @@ import { WrappedV, } from '../model.js'; import { rpcDecodeError, RpcMessage } from '../protocol.js'; -import { ClientProgress } from '../writer.js'; +import { ClientProgress } from '../progress.js'; import type { WritableClient } from './client.js'; import { EntityState, EntitySubjectStore } from './entity-state.js'; import { assertType, deserializeType, ReflectionKind, Type, TypeObjectLiteral, typeOf } from '@deepkit/type'; diff --git a/packages/rpc/src/client/client-direct.ts b/packages/rpc/src/client/client-direct.ts index da544e82a..f0228512c 100644 --- a/packages/rpc/src/client/client-direct.ts +++ b/packages/rpc/src/client/client-direct.ts @@ -9,8 +9,9 @@ */ import { RpcKernel } from '../server/kernel.js'; -import { ClientTransportAdapter, RpcClient, TransportConnectionHooks } from './client.js'; +import { ClientTransportAdapter, RpcClient } from './client.js'; import { InjectorContext } from '@deepkit/injector'; +import { TransportClientConnection } from '../transport.js'; export class DirectClient extends RpcClient { constructor(rpcKernel: RpcKernel, injector?: InjectorContext) { @@ -22,11 +23,11 @@ export class RpcDirectClientAdapter implements ClientTransportAdapter { constructor(public rpcKernel: RpcKernel, protected injector?: InjectorContext) { } - public async connect(connection: TransportConnectionHooks) { + public async connect(connection: TransportClientConnection) { const kernelConnection = this.rpcKernel.createConnection({ - write: (buffer) => connection.onData(buffer), + writeBinary: (buffer) => connection.readBinary(buffer), close: () => { - connection.onClose(); + connection.onClose('closed'); }, }, this.injector); @@ -40,9 +41,9 @@ export class RpcDirectClientAdapter implements ClientTransportAdapter { close() { kernelConnection.close(); }, - send(buffer) { + writeBinary(buffer) { kernelConnection.feed(buffer); - } + }, }); } } @@ -61,15 +62,15 @@ export class RpcAsyncDirectClientAdapter implements ClientTransportAdapter { constructor(public rpcKernel: RpcKernel, protected injector?: InjectorContext) { } - public async connect(connection: TransportConnectionHooks) { + public async connect(connection: TransportClientConnection) { const kernelConnection = this.rpcKernel.createConnection({ - write: (buffer) => { + writeBinary: (buffer) => { setTimeout(() => { - connection.onData(buffer); + connection.readBinary(buffer); }); }, close: () => { - connection.onClose(); + connection.onClose('closed'); }, }, this.injector); @@ -83,11 +84,11 @@ export class RpcAsyncDirectClientAdapter implements ClientTransportAdapter { close() { kernelConnection.close(); }, - send(buffer) { + writeBinary(buffer) { setTimeout(() => { kernelConnection.feed(buffer); }); - } + }, }); } } diff --git a/packages/rpc/src/client/client-websocket.ts b/packages/rpc/src/client/client-websocket.ts index ce88d6325..acc541c4f 100644 --- a/packages/rpc/src/client/client-websocket.ts +++ b/packages/rpc/src/client/client-websocket.ts @@ -9,7 +9,8 @@ */ import { ClassType } from '@deepkit/core'; -import { ClientTransportAdapter, RpcClient, TransportConnectionHooks } from './client.js'; +import { ClientTransportAdapter, RpcClient } from './client.js'; +import { TransportClientConnection } from '../transport.js'; /** * A RpcClient that connects via WebSocket transport. @@ -79,7 +80,7 @@ export class RpcWebSocketClientAdapter implements ClientTransportAdapter { return this.webSocketConstructor; } - public async connect(connection: TransportConnectionHooks) { + public async connect(connection: TransportClientConnection) { const webSocketConstructor = await this.getWebSocketConstructor(); try { @@ -90,23 +91,23 @@ export class RpcWebSocketClientAdapter implements ClientTransportAdapter { } } - protected mapSocket(socket: WebSocket, connection: TransportConnectionHooks) { + protected mapSocket(socket: WebSocket, connection: TransportClientConnection) { socket.binaryType = 'arraybuffer'; socket.onmessage = (event: MessageEvent) => { - connection.onData(new Uint8Array(event.data)); + connection.readBinary(new Uint8Array(event.data)); }; let errored = false; let connected = false; socket.onclose = (event) => { + const reason = `code ${event.code} reason ${event.reason || 'unknown'}`; + const message = connected ? `abnormal error: ${reason}` : `Could not connect: ${reason}`; if (errored) { - const reason = `code ${event.code} reason ${event.reason || 'unknown'}`; - const message = connected ? `abnormal error: ${reason}` : `Could not connect: ${reason}`; connection.onError(new Error(message)); } else { - connection.onClose(); + connection.onClose(reason); } }; @@ -127,7 +128,7 @@ export class RpcWebSocketClientAdapter implements ClientTransportAdapter { close() { socket.close(); }, - send(message) { + writeBinary(message) { socket.send(message); } }); diff --git a/packages/rpc/src/client/client.ts b/packages/rpc/src/client/client.ts index 28316c9d1..f2fd4d66b 100644 --- a/packages/rpc/src/client/client.ts +++ b/packages/rpc/src/client/client.ts @@ -24,14 +24,22 @@ import { createRpcMessage, createRpcMessagePeer, ErroredRpcMessage, + RpcBinaryMessageReader, RpcMessage, - RpcMessageReader, + RpcMessageDefinition, RpcMessageRouteType, } from '../protocol.js'; import { RpcKernel, RpcKernelConnection } from '../server/kernel.js'; -import { ClientProgress, RpcMessageWriter, RpcMessageWriterOptions, SingleProgress } from '../writer.js'; +import { ClientProgress, SingleProgress } from '../progress.js'; import { RpcActionClient, RpcControllerState } from './action.js'; import { RpcMessageSubject } from './message-subject.js'; +import { + createWriter, + TransportClientConnection, + TransportConnection, + TransportMessageWriter, + TransportOptions, +} from '../transport.js'; export class OfflineError extends Error { constructor(message: string = 'Offline') { @@ -54,28 +62,20 @@ export interface ObservableDisconnect { export type DisconnectableObservable = Observable & ObservableDisconnect; -export interface TransportConnection { - send(message: Uint8Array): void; - - bufferedAmount?(): number; - - clientAddress?(): string; - - close(): void; -} - -export interface TransportConnectionHooks { - onConnected(transportConnection: TransportConnection): void; - - onClose(): void; - - onData(buffer: Uint8Array, bytes?: number): void; +export interface ClientTransportAdapter { + connect(connection: TransportClientConnection): Promise | void; - onError(error: Error): void; -} + /** + * Whether ClientId call is needed to get a client id. + * This is disabled for http adapter. + */ + supportsPeers?(): boolean; -export interface ClientTransportAdapter { - connect(connection: TransportConnectionHooks): Promise | void; + /** + * Whether Authentication call is needed to authenticate the client. + * This is disabled for http adapter (Authorization header is used). + */ + supportsAuthentication?(): boolean; } export interface WritableClient { @@ -116,8 +116,8 @@ export class RpcClientTransporter { protected connectionPromise?: Promise; protected connected = false; - protected writer?: RpcMessageWriter; - public writerOptions: RpcMessageWriterOptions = new RpcMessageWriterOptions; + protected writer?: TransportMessageWriter; + public writerOptions: TransportOptions = new TransportOptions(); public id?: Uint8Array; @@ -143,12 +143,10 @@ export class RpcClientTransporter { */ public readonly errored = new Subject<{ connectionId: number, error: Error }>(); - public reader = new RpcMessageReader( + public reader = new RpcBinaryMessageReader( (v) => this.onMessage(v), (id) => { - if (this.writer) { - this.writer.write(createRpcMessage(id, RpcTypes.ChunkAck)); - } + this.writer!(createRpcMessage(id, RpcTypes.ChunkAck), this.writerOptions); }, ); @@ -207,7 +205,7 @@ export class RpcClientTransporter { return undefined; } - public async onAuthenticate(): Promise { + public async onAuthenticate(token?: any): Promise { } public onMessage(message: RpcMessage) { @@ -226,7 +224,7 @@ export class RpcClientTransporter { this.onDisconnect(); } - protected async doConnect(): Promise { + protected async doConnect(token?: any): Promise { this.connectionTries++; if (this.transportConnection) { @@ -237,29 +235,22 @@ export class RpcClientTransporter { return asyncOperation(async (resolve, reject) => { try { await this.transport.connect({ + token, + onClose: () => { this.onDisconnect(); }, onConnected: async (transport: TransportConnection) => { this.transportConnection = transport; - this.writer = new RpcMessageWriter({ - write(v) { - transport.send(v); - }, - close() { - transport.close(); - }, - clientAddress: transport.clientAddress ? () => transport.clientAddress!() : undefined, - bufferedAmount: transport.bufferedAmount ? () => transport.bufferedAmount!() : undefined, - }, this.reader, this.writerOptions); + this.writer = createWriter(transport, this.writerOptions, this.reader); this.connected = false; this.connectionTries = 0; try { this.id = await this.onHandshake(); - await this.onAuthenticate(); + await this.onAuthenticate(token); } catch (error) { this.connected = false; this.connectionTries = 0; @@ -277,7 +268,11 @@ export class RpcClientTransporter { reject(new OfflineError(`Could not connect: ${formatError(error)}`)); }, - onData: (buffer: Uint8Array, bytes?: number) => { + read: (message: RpcMessage) => { + this.onMessage(message); + }, + + readBinary: (buffer: Uint8Array, bytes?: number) => { this.reader.feed(buffer, bytes); }, }); @@ -290,7 +285,7 @@ export class RpcClientTransporter { /** * Simply connect with login using the token, without auto re-connect. */ - public async connect(): Promise { + public async connect(token?: any): Promise { while (this.connectionPromise) { await this.connectionPromise; await sleep(0.01); @@ -300,7 +295,7 @@ export class RpcClientTransporter { return; } - this.connectionPromise = this.doConnect(); + this.connectionPromise = this.doConnect(token); try { await this.connectionPromise; @@ -309,13 +304,13 @@ export class RpcClientTransporter { } } - public send(message: Uint8Array, progress?: SingleProgress) { + public send(message: RpcMessageDefinition, progress?: SingleProgress) { if (this.writer === undefined) { throw new Error('Transport connection not created yet'); } try { - this.writer.write(message, progress); + this.writer(message, this.writerOptions, progress); } catch (error: any) { throw new OfflineError(error); } @@ -403,17 +398,18 @@ export class RpcBaseClient implements WritableClient { * If you use controllers in this callback, make sure to use dontWaitForConnection=true, otherwise you get an endless loop. * * ```typescript - * async onAuthenticate(): Promise { + * async onAuthenticate(token?: any): Promise { * const auth = this.controller('auth', {dontWaitForConnection: true}); * const result = auth.login('username', 'password'); * if (!result) throw new AuthenticationError('Authentication failed); * } * ``` */ - protected async onAuthenticate(): Promise { - if (!this.token.has()) return; + protected async onAuthenticate(token?: any): Promise { + if (undefined === token) return; + if (this.transport.supportsPeers && !this.transport.supportsPeers()) return; - const reply: RpcMessage = await this.sendMessage(RpcTypes.Authenticate, { token: this.token.get()! }, undefined, { dontWaitForConnection: true }) + const reply: RpcMessage = await this.sendMessage(RpcTypes.Authenticate, { token }, undefined, { dontWaitForConnection: true }) .waitNextMessage(); if (reply.isError()) throw reply.getError(); @@ -517,7 +513,7 @@ export class RpcBaseClient implements WritableClient { this.transporter.send(message, progress?.upload); } else { - this.transporter.connect().then( + this.connect().then( () => { //this.getId() only now available const message = options && options.peerId @@ -543,7 +539,7 @@ export class RpcBaseClient implements WritableClient { } async connect(): Promise { - await this.transporter.connect(); + await this.transporter.connect(this.token.get()); return this; } @@ -577,8 +573,9 @@ export class RpcClient extends RpcBaseClient { protected peerConnections = new Map(); - protected async onHandshake(): Promise { + protected async onHandshake(): Promise { this.clientKernelConnection = undefined; + if (this.transport.supportsPeers && !this.transport.supportsPeers()) return; const reply = await this.sendMessage(RpcTypes.ClientId, undefined, undefined, { dontWaitForConnection: true }) .firstThenClose(RpcTypes.ClientIdResponse); @@ -606,7 +603,7 @@ export class RpcClient extends RpcBaseClient { if (connection) connection.close(); this.peerKernelConnection.delete(peerId); }, - write: (answer: Uint8Array) => { + write: (answer: RpcMessageDefinition) => { //should we modify the package? this.transporter.send(answer); }, @@ -626,7 +623,7 @@ export class RpcClient extends RpcBaseClient { if (message.routeType === RpcMessageRouteType.server && this.clientKernel) { if (!this.clientKernelConnection) { const c = this.clientKernel.createConnection({ - write: (answer: Uint8Array) => { + write: (answer: RpcMessageDefinition) => { this.transporter.send(answer); }, close: () => { @@ -641,7 +638,7 @@ export class RpcClient extends RpcBaseClient { }); // Important to disable since transporter.send chunks already, // otherwise data is chunked twice and protocol breaks. - c.writerOptions.chunkSize = 0; + c.transportOptions.chunkSize = 0; if (!(c instanceof RpcKernelConnection)) throw new Error('Expected RpcKernelConnection from clientKernel.createConnection'); this.clientKernelConnection = c; } diff --git a/packages/rpc/src/client/http.ts b/packages/rpc/src/client/http.ts new file mode 100644 index 000000000..a42efb3b4 --- /dev/null +++ b/packages/rpc/src/client/http.ts @@ -0,0 +1,87 @@ +import { ClientTransportAdapter } from './client.js'; +import { TransportClientConnection } from '../transport.js'; +import { RpcMessageDefinition } from '../protocol.js'; +import { RpcTypes } from '../model.js'; +import { HttpRpcMessage } from '../server/http.js'; +import { serialize } from '@deepkit/type'; + +export class RpcHttpClientAdapter implements ClientTransportAdapter { + constructor(public url: string, public headers: { [name: string]: string } = {}) { + this.url = url.endsWith('/') ? url.slice(0, -1) : url; + } + + supportsPeers() { + //no ClientId call + return false; + } + + supportsAuthentication() { + return false; + } + + async connect(connection: TransportClientConnection): Promise { + // http transporter does not connect, it waits for the first message to connect + connection.onConnected({ + write: async (message: RpcMessageDefinition, options) => { + const qs: string[] = []; + let path = ''; + let method = 'GET'; + let body: any = undefined; + + if (message.type === RpcTypes.ActionType) { + if (!message.body) throw new Error('No body given'); + const body = message.body.body as { controller: string, method: string, }; + path = body.controller + '/' + encodeURIComponent(body.method); + method = 'OPTIONS'; + } else if (message.type === RpcTypes.Action) { + if (!message.body) throw new Error('No body given'); + const messageBody = serialize(message.body.body, undefined, undefined, undefined, message.body.type) as { + controller: string, + method: string, + args: any[] + }; + path = messageBody.controller + '/' + encodeURIComponent(messageBody.method); + const allPrimitive = messageBody.args.every(v => ['string', 'number', 'boolean', 'bigint'].includes(typeof v)); + if (allPrimitive) { + for (const a of messageBody.args) { + qs.push('arg=' + encodeURIComponent(JSON.stringify(a))); + } + method = 'GET'; + } else { + body = JSON.stringify(messageBody.args); + method = 'POST'; + } + } else { + throw new Error('Unsupported message type ' + message.type + ' for Http adapter'); + } + + const res = await fetch(this.url + '/' + path + '?' + qs.join('&'), { + headers: Object.assign({ + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': String(connection.token) + }, this.headers), + method, + body, + }); + + const type = Number(res.headers.get('X-Message-Type')); + const composite = 'true' === res.headers.get('X-Message-Composite'); + const routeType = Number(res.headers.get('X-Message-RouteType')); + let json = await res.json(); + if (type === RpcTypes.ResponseActionSimple) { + json = {v: json}; + } + connection.read(new HttpRpcMessage(message.id, composite, type, routeType, {}, json)); + }, + bufferedAmount(): number { + return 0; + }, + close(): void { + }, + clientAddress(): string { + return ''; + }, + }); + } +} diff --git a/packages/rpc/src/model.ts b/packages/rpc/src/model.ts index cbf3f8347..a9d964c00 100644 --- a/packages/rpc/src/model.ts +++ b/packages/rpc/src/model.ts @@ -24,12 +24,6 @@ export interface IdVersionInterface extends IdInterface { version: number; } -export class ConnectionWriter { - write(buffer: Uint8Array) { - - } -} - export class StreamBehaviorSubject extends BehaviorSubject { public readonly appendSubject = new Subject(); protected nextChange?: Subject; diff --git a/packages/rpc/src/progress.ts b/packages/rpc/src/progress.ts new file mode 100644 index 000000000..b5d59510d --- /dev/null +++ b/packages/rpc/src/progress.ts @@ -0,0 +1,115 @@ +/* + * Deepkit Framework + * Copyright (C) 2021 Deepkit UG, Marc J. Schmidt + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the MIT License. + * + * You should have received a copy of the MIT License along with this program. + */ + +import { BehaviorSubject, Subject, Subscriber, Subscription, SubscriptionLike } from 'rxjs'; + +export class SingleProgress extends Subject { + public done = false; + + public total = 0; + public current = 0; + public stats = 0; + + protected lastTime = 0; + + protected triggerFinished?: Function; + finished = new Promise((resolve) => { + this.triggerFinished = resolve; + }); + + constructor() { + super(); + } + + /** + * Acts like a BehaviorSubject. + */ + _subscribe(subscriber: Subscriber): Subscription { + //Subject does not expose protected _subscribe anymore, so we have to use prototype directly + const subscription = (Subject as any).prototype._subscribe.apply(this, [subscriber]); + if (subscription && !(subscription).closed) { + subscriber.next(this); + } + return subscription; + } + + public setStart(total: number) { + this.total = total; + this.lastTime = Date.now(); + } + + + public setBatch(size: number) { + this.current += size; + this.lastTime = Date.now(); + } + + get progress(): number { + if (this.done) return 1; + if (this.total === 0) return 0; + return this.current / this.total; + } + + set(total: number, current: number) { + if (this.done) return; + this.total = total; + this.current = current; + this.done = total === current; + this.stats++; + this.next(this); + if (this.done) { + this.complete(); + if (this.triggerFinished) this.triggerFinished(); + } + } +} + +export class Progress extends BehaviorSubject { + public readonly upload = new SingleProgress; + public readonly download = new SingleProgress; + + constructor() { + super(0); + } +} + +export class ClientProgress { + static nextProgress?: Progress; + + /** + * Returns the current stack and sets a new one. + */ + static getNext(): Progress | undefined { + if (ClientProgress.nextProgress) { + const old = ClientProgress.nextProgress; + ClientProgress.nextProgress = undefined; + return old; + } + return undefined; + } + + /** + * Sets up a new Progress object for the next API request to be made. + * Only the very next API call will be tracked. + * + * @example + * ```typescript + * + * ClientProgress.track(); + * await api.myMethod(); + * + * ``` + */ + static track(): Progress { + const progress = new Progress; + ClientProgress.nextProgress = progress; + return progress; + } +} diff --git a/packages/rpc/src/protocol.ts b/packages/rpc/src/protocol.ts index 27dbcd2a8..3a545efb3 100644 --- a/packages/rpc/src/protocol.ts +++ b/packages/rpc/src/protocol.ts @@ -17,8 +17,16 @@ import { } from '@deepkit/bson'; import { ClassType } from '@deepkit/core'; import { rpcChunk, rpcError, RpcTypes } from './model.js'; -import type { SingleProgress } from './writer.js'; -import { deserialize, ReceiveType, ReflectionClass, serialize, Type, typeSettings } from '@deepkit/type'; +import type { SingleProgress } from './progress.js'; +import { + deserialize, + ReceiveType, + ReflectionClass, + resolveReceiveType, + serialize, + Type, + typeSettings, +} from '@deepkit/type'; export const enum RpcMessageRouteType { client = 0, @@ -88,8 +96,8 @@ export class RpcMessage { public composite: boolean, public type: number, public routeType: RpcMessageRouteType, - public bodyOffset: number, - public bodySize: number, + public bodyOffset: number = 0, + public bodySize: number = 0, public buffer?: Uint8Array, ) { } @@ -103,7 +111,10 @@ export class RpcMessage { body: this.bodySize ? this.parseGenericBody() : undefined, messages: this.composite ? this.getBodies().map(message => { return { - id: message.id, type: message.type, date: new Date, body: message.bodySize ? message.parseGenericBody() : undefined, + id: message.id, + type: message.type, + date: new Date, + body: message.bodySize ? message.parseGenericBody() : undefined, }; }) : [], }; @@ -201,7 +212,7 @@ export class ErroredRpcMessage extends RpcMessage { } } -export function readRpcMessage(buffer: Uint8Array): RpcMessage { +export function readBinaryRpcMessage(buffer: Uint8Array): RpcMessage { const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength); const size = view.getUint32(0, true); if (size !== buffer.byteLength) throw new Error(`Message buffer size wrong. Message size=${size}, buffer size=${buffer.byteLength}`); @@ -237,10 +248,21 @@ export function createRpcCompositeMessage( type: number, messages: RpcCreateMessageDef[], routeType: RpcMessageRouteType.client | RpcMessageRouteType.server = RpcMessageRouteType.client, -): Uint8Array { +): RpcMessageDefinition { + return { + id, + type, + routeType, + composite: messages, + }; +} + +export function serializeBinaryRpcCompositeMessage(message: RpcMessageDefinition): Uint8Array { + if (!message.composite) throw new Error('No messages set'); + let bodySize = 0; - for (const message of messages) { - bodySize += 4 + 1 + (message.schema && message.body ? getBSONSizer(undefined, message.schema)(message.body) : 0); + for (const sub of message.composite) { + bodySize += 4 + 1 + (sub.schema && sub.body ? getBSONSizer(undefined, sub.schema)(sub.body) : 0); } // [routeData] @@ -249,35 +271,50 @@ export function createRpcCompositeMessage( const writer = new Writer(createBuffer(messageSize)); writer.writeUint32(messageSize); writer.writeByte(1); //version - writer.writeUint32(id); + writer.writeUint32(message.id); - writer.writeByte(routeType); + writer.writeByte(message.routeType); writer.writeByte(1); - writer.writeByte(type); + writer.writeByte(message.type); - for (const message of messages) { - writer.writeUint32(message.schema && message.body ? getBSONSizer(undefined, message.schema)(message.body) : 0); - writer.writeByte(message.type); //type + for (const sub of message.composite) { + writer.writeUint32(sub.schema && sub.body ? getBSONSizer(undefined, sub.schema)(sub.body) : 0); + writer.writeByte(sub.type); //type - if (message.schema && message.body) { + if (sub.schema && sub.body) { //BSON object contain already their size at the beginning - getBSONSerializer(undefined, message.schema)(message.body, { writer }); + getBSONSerializer(undefined, sub.schema)(sub.body, { writer }); } } return writer.buffer; } -export function createRpcCompositeMessageSourceDest( +export function createRpcCompositeMessageSourceDest( id: number, source: Uint8Array, destination: Uint8Array, type: number, messages: RpcCreateMessageDef[], -): Uint8Array { +): RpcMessageDefinition { + return { + id, + type, + routeType: RpcMessageRouteType.sourceDest, + composite: messages, + source, + destination, + }; +} + +export function serializeBinaryRpcCompositeMessageSourceDest(message: RpcMessageDefinition): Uint8Array { + if (!message.composite) throw new Error('No messages set'); + if (!message.source) throw new Error('No source set'); + if (!message.destination) throw new Error('No destination set'); + let bodySize = 0; - for (const message of messages) { - bodySize += 4 + 1 + (message.schema && message.body ? getBSONSizer(undefined, message.schema)(message.body) : 0); + for (const sub of message.composite) { + bodySize += 4 + 1 + (sub.schema && sub.body ? getBSONSizer(undefined, sub.schema)(sub.body) : 0); } // [routeData] @@ -286,76 +323,99 @@ export function createRpcCompositeMessageSourceDest( const writer = new Writer(createBuffer(messageSize)); writer.writeUint32(messageSize); writer.writeByte(1); //version - writer.writeUint32(id); + writer.writeUint32(message.id); writer.writeByte(RpcMessageRouteType.sourceDest); - if (source.byteLength !== 16) throw new Error(`Source invalid byteLength of ${source.byteLength}`); - if (destination.byteLength !== 16) throw new Error(`Destination invalid byteLength of ${destination.byteLength}`); - writer.writeBuffer(source); - writer.writeBuffer(destination); + if (message.source.byteLength !== 16) throw new Error(`Source invalid byteLength of ${message.source.byteLength}`); + if (message.destination.byteLength !== 16) throw new Error(`Destination invalid byteLength of ${message.destination.byteLength}`); + writer.writeBuffer(message.source); + writer.writeBuffer(message.destination); writer.writeByte(1); //composite=true - writer.writeByte(type); + writer.writeByte(message.type); - for (const message of messages) { - writer.writeUint32(message.schema && message.body ? getBSONSizer(undefined, message.schema)(message.body) : 0); - writer.writeByte(message.type); //type + for (const sub of message.composite) { + writer.writeUint32(sub.schema && sub.body ? getBSONSizer(undefined, sub.schema)(sub.body) : 0); + writer.writeByte(sub.type); //type - if (message.schema && message.body) { + if (sub.schema && sub.body) { //BSON object contain already their size at the beginning - getBSONSerializer(undefined, message.schema)(message.body, { writer }); + getBSONSerializer(undefined, sub.schema)(sub.body, { writer }); } } return writer.buffer; } +export interface RpcMessageDefinition { + id: number; + type: number; + routeType: RpcMessageRouteType; + composite?: RpcCreateMessageDef[]; + peerId?: string; + source?: Uint8Array; + destination?: Uint8Array; + body?: { + type: Type; + body: any; + }; +} + export function createRpcMessage( id: number, type: number, body?: T, routeType: RpcMessageRouteType.client | RpcMessageRouteType.server = RpcMessageRouteType.client, schema?: ReceiveType, -): Uint8Array { - const bodySize = schema && body ? getBSONSizer(undefined, schema)(body) : 0; - // [routeData] - const messageSize = 4 + 1 + 4 + 1 + 1 + 1 + bodySize; +): RpcMessageDefinition { + return { + id, + type, + routeType, + body: body && { + type: resolveReceiveType(schema), + body, + }, + }; - const writer = new Writer(createBuffer(messageSize)); - writer.writeUint32(messageSize); - writer.writeByte(1); //version - writer.writeUint32(id); +} - writer.writeByte(routeType); - writer.writeByte(0); //composite=false - writer.writeByte(type); +export function serializeBinaryRpcMessage(message: RpcMessageDefinition): Uint8Array { + if (message.composite) { + if (message.routeType === RpcMessageRouteType.sourceDest) { + return serializeBinaryRpcCompositeMessageSourceDest(message); + } + return serializeBinaryRpcCompositeMessage(message); + } - if (schema && body) { - const offset = writer.offset; - const serializer = getBSONSerializer(undefined, schema); - serializer(body, { writer }); + if (message.routeType === RpcMessageRouteType.peer) { + return serializeBinaryRpcMessagePeer(message); } - return writer.buffer; + if (message.routeType === RpcMessageRouteType.sourceDest) { + return serializeBinaryRpcMessageSourceDest(message); + } + + return serializeBinaryRpcMessageSingleBody(message); } -export function createRpcMessageForBody( - id: number, type: number, - body: Uint8Array, - routeType: RpcMessageRouteType.client | RpcMessageRouteType.server = RpcMessageRouteType.client, -): Uint8Array { - const bodySize = body.byteLength; +export function serializeBinaryRpcMessageSingleBody(message: RpcMessageDefinition): Uint8Array { + const bodySize = message.body ? getBSONSizer(undefined, message.body.type)(message.body.body) : 0; // [routeData] const messageSize = 4 + 1 + 4 + 1 + 1 + 1 + bodySize; const writer = new Writer(createBuffer(messageSize)); writer.writeUint32(messageSize); writer.writeByte(1); //version - writer.writeUint32(id); + writer.writeUint32(message.id); - writer.writeByte(routeType); + writer.writeByte(message.routeType); writer.writeByte(0); //composite=false - writer.writeByte(type); + writer.writeByte(message.type); - writer.writeBuffer(body); + if (message.body) { + const offset = writer.offset; + const serializer = getBSONSerializer(undefined, message.body.type); + serializer(message.body.body, { writer }); + } return writer.buffer; } @@ -366,56 +426,91 @@ export function createRpcMessagePeer( peerId: string, body?: T, schema?: ReceiveType, -): Uint8Array { - const bodySize = schema && body ? getBSONSizer(undefined, schema)(body) : 0; +): RpcMessageDefinition { + return { + id, + type, + routeType: RpcMessageRouteType.peer, + source, + peerId, + body: body && { + type: resolveReceiveType(schema), + body, + }, + }; +} + +export function serializeBinaryRpcMessagePeer(message: RpcMessageDefinition): Uint8Array { + if (!message.peerId) throw new Error('No peerId set'); + if (!message.source) throw new Error('No source set'); + + const bodySize = message.body ? getBSONSizer(undefined, message.body.type)(message.body.body) : 0; // [routeData] - const messageSize = 4 + 1 + 4 + 1 + (16 + peerId.length + 1) + 1 + 1 + bodySize; + const messageSize = 4 + 1 + 4 + 1 + (16 + message.peerId.length + 1) + 1 + 1 + bodySize; const writer = new Writer(createBuffer(messageSize)); writer.writeUint32(messageSize); writer.writeByte(1); //version - writer.writeUint32(id); + writer.writeUint32(message.id); writer.writeByte(RpcMessageRouteType.peer); - if (source.byteLength !== 16) throw new Error(`Source invalid byteLength of ${source.byteLength}`); - writer.writeBuffer(source); - writer.writeAsciiString(peerId); + if (message.source.byteLength !== 16) throw new Error(`Source invalid byteLength of ${message.source.byteLength}`); + writer.writeBuffer(message.source); + writer.writeAsciiString(message.peerId); writer.writeNull(); writer.writeByte(0); //composite=false - writer.writeByte(type); + writer.writeByte(message.type); - if (schema && body) getBSONSerializer(undefined, schema)(body, { writer }); + if (message.body) getBSONSerializer(undefined, message.body.type)(message.body.body, { writer }); return writer.buffer; } export function createRpcMessageSourceDest( - id: number, type: number, + id: number, + type: number, source: Uint8Array, destination: Uint8Array, body?: T, schema?: ReceiveType, -): Uint8Array { - const bodySize = schema && body ? getBSONSizer(undefined, schema)(body) : 0; +): RpcMessageDefinition { + return { + id, + type, + routeType: RpcMessageRouteType.sourceDest, + source, + destination, + body: body && { + type: resolveReceiveType(schema), + body, + }, + }; +} + +export function serializeBinaryRpcMessageSourceDest(message: RpcMessageDefinition): Uint8Array { + if (!message.source) throw new Error('No source set'); + if (!message.destination) throw new Error('No destination set'); + + const bodySize = message.body ? getBSONSizer(undefined, message.body.type)(message.body.body) : 0; // [routeData] const messageSize = 4 + 1 + 4 + 1 + (16 + 16) + 1 + 1 + bodySize; const writer = new Writer(createBuffer(messageSize)); writer.writeUint32(messageSize); writer.writeByte(1); //version - writer.writeUint32(id); + writer.writeUint32(message.id); writer.writeByte(RpcMessageRouteType.sourceDest); - if (source.byteLength !== 16) throw new Error(`Source invalid byteLength of ${source.byteLength}`); - if (destination.byteLength !== 16) throw new Error(`Destination invalid byteLength of ${destination.byteLength}`); - writer.writeBuffer(source); - writer.writeBuffer(destination); + if (message.source.byteLength !== 16) throw new Error(`Source invalid byteLength of ${message.source.byteLength}`); + if (message.destination.byteLength !== 16) throw new Error(`Destination invalid byteLength of ${message.destination.byteLength}`); + writer.writeBuffer(message.source); + writer.writeBuffer(message.destination); writer.writeByte(0); //composite=false - writer.writeByte(type); + writer.writeByte(message.type); - if (schema && body) getBSONSerializer(undefined, schema)(body, { writer }); + if (message.body) getBSONSerializer(undefined, message.body.type)(message.body.body, { writer }); return writer.buffer; } @@ -448,11 +543,11 @@ export function createRpcMessageSourceDestForBody( return writer.buffer; } -export class RpcMessageReader { +export class RpcBinaryMessageReader { protected chunks = new Map(); protected progress = new Map(); protected chunkAcks = new Map(); - protected bufferReader = new RpcBufferReader(this.gotMessage.bind(this)); + protected bufferReader = new RpcBinaryBufferReader(this.gotMessage.bind(this)); constructor( protected readonly onMessage: (response: RpcMessage) => void, @@ -473,7 +568,7 @@ export class RpcMessageReader { } protected gotMessage(buffer: Uint8Array) { - const message = readRpcMessage(buffer); + const message = readBinaryRpcMessage(buffer); // console.log('reader got', message.id, RpcTypes[message.type], {routeType: message.routeType, bodySize: message.bodySize, byteLength: buffer.byteLength}); if (message.type === RpcTypes.ChunkAck) { @@ -504,7 +599,7 @@ export class RpcMessageReader { newBuffer.set(buffer, offset); offset += buffer.byteLength; } - const packedMessage = readRpcMessage(newBuffer); + const packedMessage = readBinaryRpcMessage(newBuffer); this.onMessage(packedMessage); } } else { @@ -522,7 +617,7 @@ export function readUint32LE(buffer: Uint8Array, offset: number = 0): number { return buffer[offset] + (buffer[offset + 1] * 2 ** 8) + (buffer[offset + 2] * 2 ** 16) + (buffer[offset + 3] * 2 ** 24); } -export class RpcBufferReader { +export class RpcBinaryBufferReader { protected currentMessage?: Uint8Array; protected currentMessageSize: number = 0; diff --git a/packages/rpc/src/server/http.ts b/packages/rpc/src/server/http.ts new file mode 100644 index 000000000..360759640 --- /dev/null +++ b/packages/rpc/src/server/http.ts @@ -0,0 +1,76 @@ +import { RpcMessage, RpcMessageRouteType } from '../protocol.js'; +import { cast, ReceiveType, resolveReceiveType } from '@deepkit/type'; + +export interface RpcHttpRequest { + headers: { [name: string]: undefined | string | string[] }; + method?: string; + url?: string; + body?: Uint8Array; +} + +export interface RpcHttpResponse { + setHeader(name: string, value: number | string): this; + writeHead(statusCode: number): this; + end(data?: Uint8Array | string): void; +} + +export class HttpRpcMessage extends RpcMessage { + constructor( + public id: number, + public composite: boolean, + public type: number, + public routeType: RpcMessageRouteType, + public headers: RpcHttpRequest['headers'], + public json?: any, + ) { + super(id, composite, type, routeType); + } + + getJson(): any { + return this.json; + } + + getSource(): Uint8Array { + return Buffer.from(String(this.headers['X-Source'])); + } + + getDestination(): Uint8Array { + return Buffer.from(String(this.headers['X-Destination'])); + } + + getError(): Error { + return super.getError(); + } + + isError(): boolean { + return super.isError(); + } + + parseGenericBody(): object { + return this.getJson(); + } + + parseBody(type?: ReceiveType): T { + const json = this.getJson(); + if (!json) { + throw new Error('No body found') + } + return cast(json, undefined, undefined, undefined, resolveReceiveType(type)); + } + + getBodies(): RpcMessage[] { + const json = this.getJson(); + if (!Array.isArray(json)) throw new Error('Expected array of RpcMessage items'); + + const result: RpcMessage[] = []; + for (const item of json) { + result.push(new HttpRpcMessage(this.id, false, item.type, this.routeType, this.headers, item.body)); + } + + return result; + } +} + +// export function createHttpRpcMessage(type: +// +// } diff --git a/packages/rpc/src/server/kernel.ts b/packages/rpc/src/server/kernel.ts index 292128e71..3db1f6f0f 100644 --- a/packages/rpc/src/server/kernel.ts +++ b/packages/rpc/src/server/kernel.ts @@ -9,7 +9,16 @@ */ import { arrayRemoveItem, ClassType, getClassName } from '@deepkit/core'; -import { ReceiveType, ReflectionKind, resolveReceiveType, stringifyUuid, Type, typeOf, writeUuid } from '@deepkit/type'; +import { + ReceiveType, + ReflectionKind, + resolveReceiveType, + serialize, + stringifyUuid, + Type, + typeOf, + writeUuid, +} from '@deepkit/type'; import { RpcMessageSubject } from '../client/message-subject.js'; import { AuthenticationError, @@ -26,16 +35,15 @@ import { createRpcCompositeMessage, createRpcCompositeMessageSourceDest, createRpcMessage, - createRpcMessageForBody, createRpcMessageSourceDest, - createRpcMessageSourceDestForBody, + RpcBinaryMessageReader, RpcCreateMessageDef, rpcEncodeError, RpcMessage, - RpcMessageReader, + RpcMessageDefinition, RpcMessageRouteType, + serializeBinaryRpcMessage, } from '../protocol.js'; -import { RpcMessageWriter, RpcMessageWriterOptions } from '../writer.js'; import { ActionTypes, RpcServerAction } from './action.js'; import { RpcKernelSecurity, SessionState } from './security.js'; import { RpcActionClient, RpcControllerState } from '../client/action.js'; @@ -43,6 +51,16 @@ import { RemoteController } from '../client/client.js'; import { InjectorContext, InjectorModule, NormalizedProvider } from '@deepkit/injector'; import { Logger, LoggerInterface } from '@deepkit/logger'; import { rpcClass } from '../decorators.js'; +import { + createWriter, + RpcBinaryWriter, + TransportBinaryMessageChunkWriter, + TransportConnection, + TransportMessageWriter, + TransportOptions, +} from '../transport.js'; +import { HttpRpcMessage, RpcHttpRequest, RpcHttpResponse } from './http.js'; +import { SingleProgress } from '../progress.js'; const anyType: Type = { kind: ReflectionKind.any }; @@ -57,7 +75,8 @@ export class RpcCompositeMessage { protected logger: Logger, public type: number, protected id: number, - protected writer: RpcConnectionWriter, + protected writer: TransportMessageWriter, + protected transportOptions: TransportOptions, protected clientId?: Uint8Array, protected source?: Uint8Array, protected routeType: RpcMessageRouteType.client | RpcMessageRouteType.server = RpcMessageRouteType.client, @@ -68,9 +87,13 @@ export class RpcCompositeMessage { if (!this.strictSerialization) { receiveType = anyType; } + this.messages.push({ type, schema: receiveType ? resolveReceiveType(receiveType) : undefined, body }); + return this; + } + + write(message: RpcMessageDefinition): void { try { - this.messages.push({ type, schema: receiveType ? resolveReceiveType(receiveType) : undefined, body }); - return this; + this.writer(message, this.transportOptions); } catch (error) { if (this.logValidationErrors) { this.logger.warn(this.errorLabel, error); @@ -82,9 +105,9 @@ export class RpcCompositeMessage { send() { if (this.clientId && this.source) { //we route back accordingly - this.writer.write(createRpcCompositeMessageSourceDest(this.id, this.clientId, this.source, this.type, this.messages)); + this.write(createRpcCompositeMessageSourceDest(this.id, this.clientId, this.source, this.type, this.messages)); } else { - this.writer.write(createRpcCompositeMessage(this.id, this.type, this.messages, this.routeType)); + this.write(createRpcCompositeMessage(this.id, this.type, this.messages, this.routeType)); } } } @@ -99,62 +122,61 @@ export class RpcMessageBuilder { constructor( protected logger: Logger, - protected writer: RpcConnectionWriter, + protected writer: TransportMessageWriter, + protected transportOptions: TransportOptions, protected id: number, protected clientId?: Uint8Array, protected source?: Uint8Array, ) { } - protected messageFactory(type: RpcTypes, schemaOrBody?: ReceiveType | Uint8Array, data?: T): Uint8Array { - if (schemaOrBody instanceof Uint8Array) { - if (this.source && this.clientId) { - //we route back accordingly - return createRpcMessageSourceDestForBody(this.id, type, this.clientId, this.source, schemaOrBody); - } else { - return createRpcMessageForBody(this.id, type, schemaOrBody, this.routeType); - } + protected messageFactory(type: RpcTypes, schemaOrBody?: ReceiveType, data?: T): RpcMessageDefinition { + if (!this.strictSerialization) { + schemaOrBody = anyType; + } + + if (this.source && this.clientId) { + //we route back accordingly + return createRpcMessageSourceDest(this.id, type, this.clientId, this.source, data, schemaOrBody); } else { - if (!this.strictSerialization) { - schemaOrBody = anyType; - } + return createRpcMessage(this.id, type, data, this.routeType, schemaOrBody); + } + } - try { - if (this.source && this.clientId) { - //we route back accordingly - return createRpcMessageSourceDest(this.id, type, this.clientId, this.source, data, schemaOrBody); - } else { - return createRpcMessage(this.id, type, data, this.routeType, schemaOrBody); - } - } catch (error: any) { - if (this.logValidationErrors) { - this.logger.warn(this.errorLabel, error); - } - throw new Error(this.errorLabel + ': ' + error.message); + write(message: RpcMessageDefinition): void { + try { + this.writer(message, this.transportOptions); + } catch (error: any) { + if (this.logValidationErrors) { + this.logger.warn(this.errorLabel, error); } + throw new Error(this.errorLabel + ': ' + error.message); } } ack(): void { - this.writer.write(this.messageFactory(RpcTypes.Ack)); + this.write(this.messageFactory(RpcTypes.Ack)); } error(error: Error | string): void { const extracted = rpcEncodeError(error); - this.writer.write(this.messageFactory(RpcTypes.Error, typeOf(), extracted)); + this.write(this.messageFactory(RpcTypes.Error, typeOf(), extracted)); } reply(type: number, body?: T, receiveType?: ReceiveType): void { - this.writer.write(this.messageFactory(type, receiveType, body)); + this.write(this.messageFactory(type, receiveType, body)); } + /** + * @deprecated + */ replyBinary(type: number, body?: Uint8Array): void { - this.writer.write(this.messageFactory(type, body)); + throw new Error('replyBinary deprecated'); } composite(type: number): RpcCompositeMessage { - const composite = new RpcCompositeMessage(this.logger, type, this.id, this.writer, this.clientId, this.source); + const composite = new RpcCompositeMessage(this.logger, type, this.id, this.writer, this.transportOptions, this.clientId, this.source); composite.strictSerialization = this.strictSerialization; composite.logValidationErrors = this.logValidationErrors; composite.errorLabel = this.errorLabel; @@ -167,7 +189,7 @@ export class RpcMessageBuilder { * A real-life implementation would use an external message-bus, like Redis & co. */ export class RpcPeerExchange { - protected registeredPeers = new Map(); + protected registeredPeers = new Map(); async isRegistered(id: string): Promise { return this.registeredPeers.has(id); @@ -177,7 +199,7 @@ export class RpcPeerExchange { this.registeredPeers.delete('string' === typeof id ? id : stringifyUuid(id)); } - register(id: string | Uint8Array, writer: RpcConnectionWriter): void { + register(id: string | Uint8Array, writer: TransportConnection): void { this.registeredPeers.set('string' === typeof id ? id : stringifyUuid(id), writer); } @@ -190,7 +212,7 @@ export class RpcPeerExchange { console.log('NO writer found for peer', peerId); return; } - writer.write(message.getBuffer()); + if (writer.writeBinary) writer.writeBinary(message.getBuffer()); } if (message.routeType == RpcMessageRouteType.sourceDest) { @@ -204,29 +226,21 @@ export class RpcPeerExchange { //we silently ignore, as a pub/sub would do as well return; } - writer.write(message.getBuffer()); + if (writer.writeBinary) writer.writeBinary(message.getBuffer()); } } } -export interface RpcConnectionWriter { - write(buffer: Uint8Array): void; - - close(): void; - - bufferedAmount?(): number; - - clientAddress?(): string; -} - export abstract class RpcKernelBaseConnection { protected messageId: number = 0; public sessionState = new SessionState(); - protected reader = new RpcMessageReader( + public writer: TransportMessageWriter; + + protected reader = new RpcBinaryMessageReader( this.handleMessage.bind(this), (id: number) => { - this.writer.write(createRpcMessage(id, RpcTypes.ChunkAck)); + this.writer(createRpcMessage(id, RpcTypes.ChunkAck), this.transportOptions); }, ); @@ -235,9 +249,8 @@ export abstract class RpcKernelBaseConnection { protected id: Uint8Array = writeUuid(createBuffer(16)); protected replies = new Map void)>(); - public writerOptions: RpcMessageWriterOptions = new RpcMessageWriterOptions; - - public writer: RpcMessageWriter = new RpcMessageWriter(this.transportWriter, this.reader, this.writerOptions); + public transportOptions: TransportOptions = new TransportOptions(); + protected binaryChunkWriter = new TransportBinaryMessageChunkWriter(this.reader, this.transportOptions); protected timeoutTimers: any[] = []; public readonly onClose: Promise; @@ -245,21 +258,36 @@ export abstract class RpcKernelBaseConnection { constructor( protected logger: Logger, - protected transportWriter: RpcConnectionWriter, + public transportConnection: TransportConnection, protected connections: RpcKernelConnections, ) { + this.writer = createWriter(transportConnection, this.transportOptions, this.reader); + this.connections.connections.push(this); this.onClose = new Promise((resolve) => { this.onCloseResolve = resolve; }); } + write(message: RpcMessageDefinition): void { + this.writer(message, this.transportOptions); + } + + /** + * Serializes the message (binary) and sends it to the client using + * a chunk writer (splitting the message into smaller parts if necessary, + * so they can be tracked). + */ + sendBinary(message: RpcMessageDefinition, writer: RpcBinaryWriter): void { + this.binaryChunkWriter.write(writer, serializeBinaryRpcMessage(message)); + } + clientAddress(): string | undefined { - return this.transportWriter.clientAddress ? this.transportWriter.clientAddress() : undefined; + return this.transportConnection.clientAddress ? this.transportConnection.clientAddress() : undefined; } createMessageBuilder(): RpcMessageBuilder { - return new RpcMessageBuilder(this.logger, this.writer, this.messageId++); + return new RpcMessageBuilder(this.logger, this.writer, this.transportOptions, this.messageId++); } /** @@ -278,7 +306,7 @@ export abstract class RpcKernelBaseConnection { for (const timeout of this.timeoutTimers) clearTimeout(timeout); if (this.onCloseResolve) this.onCloseResolve(); arrayRemoveItem(this.connections.connections, this); - this.writer.close(); + this.transportConnection.close(); } public feed(buffer: Uint8Array, bytes?: number): void { @@ -295,10 +323,14 @@ export abstract class RpcKernelBaseConnection { } } - const response = new RpcMessageBuilder(this.logger, this.writer, message.id); + const response = new RpcMessageBuilder(this.logger, this.writer, this.transportOptions, message.id); this.onMessage(message, response); } + onRequest(basePath: string, request: RpcHttpRequest, response: RpcHttpResponse): void | Promise { + throw new Error('Not supported'); + } + abstract onMessage(message: RpcMessage, response: RpcMessageBuilder): void | Promise; public controller(nameOrDefinition: string | ControllerDefinition, timeoutInSeconds = 60): RemoteController { @@ -323,7 +355,7 @@ export abstract class RpcKernelBaseConnection { //send a message with the same id. Don't use sendMessage() again as this would lead to a memory leak // and a new id generated. We want to use the same id. const message = createRpcMessage(id, type, body, RpcMessageRouteType.server, receiveType); - this.writer.write(message); + this.writer(message, this.transportOptions); }; const subject = new RpcMessageSubject(continuation, () => { @@ -333,7 +365,7 @@ export abstract class RpcKernelBaseConnection { this.replies.set(id, (v: RpcMessage) => subject.next(v)); const message = createRpcMessage(id, type, body, RpcMessageRouteType.server, receiveType); - this.writer.write(message); + this.writer(message, this.transportOptions); return subject; } @@ -342,9 +374,9 @@ export abstract class RpcKernelBaseConnection { export class RpcKernelConnections { public connections: RpcKernelBaseConnection[] = []; - broadcast(buffer: Uint8Array) { + broadcast(buffer: RpcMessageDefinition) { for (const connection of this.connections) { - connection.writer.write(buffer); + connection.writer(buffer, connection.transportOptions); } } } @@ -361,7 +393,7 @@ export class RpcKernelConnection extends RpcKernelBaseConnection { constructor( logger: Logger, - writer: RpcConnectionWriter, + transport: TransportConnection, connections: RpcKernelConnections, protected cache: RpcCache, protected controllers: Map, @@ -369,7 +401,7 @@ export class RpcKernelConnection extends RpcKernelBaseConnection { protected injector: InjectorContext, protected peerExchange: RpcPeerExchange, ) { - super(logger, writer, connections); + super(logger, transport, connections); this.onClose.then(async () => { try { await this.peerExchange.deregister(this.id); @@ -379,14 +411,74 @@ export class RpcKernelConnection extends RpcKernelBaseConnection { } }); //register the current client so it can receive messages - this.peerExchange.register(this.id, this.writer); + this.peerExchange.register(this.id, this.transportConnection); + } + + async onRequest(basePath: string, request: RpcHttpRequest, response: RpcHttpResponse) { + let routeType: any = RpcMessageRouteType.client; + const id = 0; + let source: Uint8Array | undefined = undefined; + if (!basePath.endsWith('/')) basePath += '/'; + if (!basePath.startsWith('/')) basePath = '/' + basePath; + const url = new URL(request.url || '', 'http://localhost/' + basePath); + + try { + const messageResponse = new RpcMessageBuilder(this.logger, (message: RpcMessageDefinition, options: TransportOptions, progress?: SingleProgress) => { + response.setHeader('Content-Type', 'application/json'); + response.setHeader('X-Message-Type', message.type); + response.setHeader('X-Message-Composite', String(!!message.composite)); + response.setHeader('X-Message-RouteType', String(message.routeType)); + response.writeHead(200); + + if (message.body) { + let body = serialize(message.body.body, undefined, undefined, undefined, message.body.type); + if (message.type === RpcTypes.ResponseActionSimple) { + body = body.v; + } + response.end(JSON.stringify(body)); + } + }, this.transportOptions, id, this.id, routeType === RpcMessageRouteType.peer ? source : undefined); + messageResponse.routeType = this.routeType; + + const urlPath = url.pathname.substring(basePath.length); + const lastSlash = urlPath.lastIndexOf('/'); + const base: any = { + controller: urlPath.substring(0, lastSlash), + method: decodeURIComponent(urlPath.substring(lastSlash + 1)), + }; + + if (request.headers['Authorization']) { + const auth = String(request.headers['Authorization']); + const token = auth.startsWith('Bearer ') ? auth.substring(7) : auth; + const session = await this.security.authenticate(token, this); + this.sessionState.setSession(session); + } + + if (request.method === 'OPTIONS') { + await this.actionHandler.handleActionTypes( + new HttpRpcMessage(1, false, RpcTypes.ActionType, RpcMessageRouteType.client, request.headers, base), + messageResponse, + ); + } else { + const body = request.body && request.body.byteLength > 0 ? JSON.parse(Buffer.from(request.body).toString()) : {args: url.searchParams.getAll('arg').map(v => v)}; + base.args = body.args || []; + await this.actionHandler.handleAction( + new HttpRpcMessage(1, false, RpcTypes.Action, RpcMessageRouteType.client, request.headers, base), + messageResponse, + ); + } + } catch (error: any) { + this.logger.error('onRequest failed', error); + response.writeHead(400); + response.end(JSON.stringify({ error: error.message })); + } } async onMessage(message: RpcMessage): Promise { if (message.routeType == RpcMessageRouteType.peer && message.getPeerId() !== this.myPeerId) { // console.log('Redirect peer message', RpcTypes[message.type]); if (!await this.security.isAllowedToSendToPeer(this.sessionState.getSession(), message.getPeerId())) { - new RpcMessageBuilder(this.logger, this.writer, message.id).error(new Error('Access denied')); + new RpcMessageBuilder(this.logger, this.writer, this.transportOptions, message.id).error(new Error('Access denied')); return; } this.peerExchange.redirect(message); @@ -400,12 +492,12 @@ export class RpcKernelConnection extends RpcKernelBaseConnection { } if (message.type === RpcTypes.Ping) { - this.writer.write(createRpcMessage(message.id, RpcTypes.Pong)); + this.writer(createRpcMessage(message.id, RpcTypes.Pong), this.transportOptions); return; } //all outgoing replies need to be routed to the source via sourceDest messages. - const response = new RpcMessageBuilder(this.logger, this.writer, message.id, this.id, message.routeType === RpcMessageRouteType.peer ? message.getSource() : undefined); + const response = new RpcMessageBuilder(this.logger, this.writer, this.transportOptions, message.id, this.id, message.routeType === RpcMessageRouteType.peer ? message.getSource() : undefined); response.routeType = this.routeType; try { @@ -477,7 +569,7 @@ export class RpcKernelConnection extends RpcKernelBaseConnection { return; } - await this.peerExchange.register(body.id, this.writer); + await this.peerExchange.register(body.id, this.transportConnection); this.myPeerId = body.id; response.ack(); } catch (error) { @@ -559,10 +651,10 @@ export class RpcKernel { }); } - createConnection(writer: RpcConnectionWriter, injector?: InjectorContext): RpcKernelBaseConnection { + createConnection(transport: TransportConnection, injector?: InjectorContext): RpcKernelBaseConnection { if (!injector) injector = this.injector.createChildScope('rpc'); - const connection = new this.RpcKernelConnection(this.logger, writer, this.connections, this.cache, this.controllers, injector.get(RpcKernelSecurity), injector, this.peerExchange); + const connection = new this.RpcKernelConnection(this.logger, transport, this.connections, this.cache, this.controllers, injector.get(RpcKernelSecurity), injector, this.peerExchange); injector.set(RpcKernelConnection, connection); for (const on of this.onConnectionListeners) on(connection, injector, this.logger); return connection; diff --git a/packages/rpc/src/server/security.ts b/packages/rpc/src/server/security.ts index cea6fa6ec..292ba1a68 100644 --- a/packages/rpc/src/server/security.ts +++ b/packages/rpc/src/server/security.ts @@ -38,11 +38,11 @@ export class RpcKernelSecurity { } async isAllowedToRegisterAsPeer(session: Session, peerId: string): Promise { - return true; + return false; } async isAllowedToSendToPeer(session: Session, peerId: string): Promise { - return true; + return false; } async authenticate(token: any, connection: RpcKernelBaseConnection): Promise { diff --git a/packages/rpc/src/transport.ts b/packages/rpc/src/transport.ts new file mode 100644 index 000000000..de60c4d14 --- /dev/null +++ b/packages/rpc/src/transport.ts @@ -0,0 +1,146 @@ +import { + createRpcMessage, + readBinaryRpcMessage, + RpcBinaryMessageReader, + RpcMessage, + RpcMessageDefinition, + serializeBinaryRpcMessage, +} from './protocol.js'; +import { SingleProgress } from './progress.js'; +import { rpcChunk, RpcTypes } from './model.js'; + +export class TransportOptions { + /** + * Stores big buffers to the file system and stream it from there. + * In bytes. + * note: not implemented yet + */ + public cacheOnFileSystemWhenSizeIsAtLeast: number = 100_000_000; + + /** + * When back-pressure is bigger than this value, we wait with sending new data. + * In bytes. + * note: not implemented yet + */ + public stepBackWhenBackPressureBiggerThan: number = 5_000_000; + + /** + * Chunk size. + * In bytes. + */ + public chunkSize: number = 100_000; +} + +export interface TransportMessageWriter { + (message: RpcMessageDefinition, options: TransportOptions, progress?: SingleProgress): void; +} + +export interface TransportConnection { + /** + * Write is used either by Client->Server, or Server->Client. + * The method is responsible to serialize the message and send it over the wire. + */ + write?: TransportMessageWriter; + + /** + * Same as write, but sends binary directly. This enables chunking automatically. + */ + writeBinary?(message: Uint8Array): void; + + bufferedAmount?(): number; + + clientAddress?(): string; + + close(): void; +} + +export interface TransportClientConnection { + token?: any; + + onConnected(established: TransportConnection): void; + + onClose(reason: string): void; + + onError(error: Error): void; + + /** + * Called when data is received from the other side. + * The method is responsible to deserialize the message. + */ + read(message: RpcMessage): void; + + readBinary(message: Uint8Array, bytes?: number): void; +} + +export type RpcBinaryWriter = (buffer: Uint8Array) => void; + +/** + * This class acts as a layer between kernel/client and a connection writer. + * It automatically chunks long messages into multiple smaller one using the RpcType.Chunks type. + * + * todo: + * It keeps track of the back-pressure and sends only when the pressure is not too big. + * It automatically saves big buffer to the file system and streams data from there to not + * block valuable memory. + */ +export class TransportBinaryMessageChunkWriter { + protected chunkId = 0; + + constructor( + protected reader: RpcBinaryMessageReader, + protected options: TransportOptions, + ) { + } + + /** + * Writes a message buffer to the connection and chunks if necessary. + */ + write(writer: RpcBinaryWriter, message: Uint8Array, progress?: SingleProgress): void { + this.writeFull(writer, message, progress) + .catch(error => console.log('TransportBinaryMessageChunkWriter writeAsync error', error)); + } + + async writeFull(writer: RpcBinaryWriter, buffer: Uint8Array, progress?: SingleProgress): Promise { + if (this.options.chunkSize && buffer.byteLength >= this.options.chunkSize) { + //split up + const chunkId = this.chunkId++; + const message = readBinaryRpcMessage(buffer); //we need the original message-id, so the chunks are correctly assigned in Progress tracker + let offset = 0; + while (offset < buffer.byteLength) { + //todo: check back-pressure and wait if necessary + const slice = buffer.slice(offset, offset + this.options.chunkSize); + const chunkMessage = createRpcMessage(message.id, RpcTypes.Chunk, { + id: chunkId, + total: buffer.byteLength, + v: slice, + }); + offset += slice.byteLength; + const promise = new Promise((resolve) => { + this.reader.onChunkAck(message.id, resolve); + }); + writer(serializeBinaryRpcMessage(chunkMessage)); + await promise; + progress?.set(buffer.byteLength, offset); + } + } else { + writer(buffer); + progress?.set(buffer.byteLength, buffer.byteLength); + } + } +} + +export function createWriter(transport: TransportConnection, options: TransportOptions, reader: RpcBinaryMessageReader): TransportMessageWriter { + if (transport.writeBinary) { + const chunkWriter = new TransportBinaryMessageChunkWriter(reader, options); + return (message, options, progress) => { + const buffer = serializeBinaryRpcMessage(message); + chunkWriter.write(transport.writeBinary!, buffer, progress); + }; + } + + if (transport.write) { + return transport.write; + } + + throw new Error('No write method found on transport'); +} diff --git a/packages/rpc/src/writer.ts b/packages/rpc/src/writer.ts deleted file mode 100644 index 001466364..000000000 --- a/packages/rpc/src/writer.ts +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Deepkit Framework - * Copyright (C) 2021 Deepkit UG, Marc J. Schmidt - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the MIT License. - * - * You should have received a copy of the MIT License along with this program. - */ - -import { BehaviorSubject, Subject, Subscriber, Subscription, SubscriptionLike } from 'rxjs'; -import { rpcChunk, RpcTypes } from './model.js'; -import { createRpcMessage, readRpcMessage, RpcMessageReader } from './protocol.js'; -import type { RpcConnectionWriter } from './server/kernel.js'; - -export class SingleProgress extends Subject { - public done = false; - - public total = 0; - public current = 0; - public stats = 0; - - protected lastTime = 0; - - protected triggerFinished?: Function; - finished = new Promise((resolve) => { - this.triggerFinished = resolve; - }); - - constructor() { - super(); - } - - /** - * Acts like a BehaviorSubject. - */ - _subscribe(subscriber: Subscriber): Subscription { - //Subject does not expose protected _subscribe anymore, so we have to use prototype directly - const subscription = (Subject as any).prototype._subscribe.apply(this, [subscriber]); - if (subscription && !(subscription).closed) { - subscriber.next(this); - } - return subscription; - } - - public setStart(total: number) { - this.total = total; - this.lastTime = Date.now(); - } - - - public setBatch(size: number) { - this.current += size; - this.lastTime = Date.now(); - } - - get progress(): number { - if (this.done) return 1; - if (this.total === 0) return 0; - return this.current / this.total; - } - - set(total: number, current: number) { - if (this.done) return; - this.total = total; - this.current = current; - this.done = total === current; - this.stats++; - this.next(this); - if (this.done) { - this.complete(); - if (this.triggerFinished) this.triggerFinished(); - } - } -} - -export class Progress extends BehaviorSubject { - public readonly upload = new SingleProgress; - public readonly download = new SingleProgress; - - constructor() { - super(0); - } -} - -export class RpcMessageWriterOptions { - /** - * Stores big buffers to the file system and stream it from there. - * In bytes. - * note: not implemented yet - */ - public cacheOnFileSystemWhenSizeIsAtLeast: number = 100_000_000; - - /** - * When back-pressure is bigger than this value, we wait with sending new data. - * In bytes. - * note: not implemented yet - */ - public stepBackWhenBackPressureBiggerThan: number = 5_000_000; - - /** - * Chunk size. - * In bytes. - */ - public chunkSize: number = 100_000; - -} - -/** - * This class acts as a layer between kernel/client and a connection writer. - * It automatically chunks long messages into multiple smaller one using the RpcType.Chunks type. - * - * todo: - * It keeps track of the back-pressure and sends only when the pressure is not too big. - * It automatically saves big buffer to the file system and streams data from there to not - * block valuable memory. - */ -export class RpcMessageWriter implements RpcConnectionWriter { - protected chunkId = 0; - - constructor( - protected writer: RpcConnectionWriter, - protected reader: RpcMessageReader, - protected options: RpcMessageWriterOptions - ) { - } - - close(): void { - this.writer.close(); - } - - /** - * Writes a message buffer to the connection and chunks if necessary. - */ - write(buffer: Uint8Array, progress?: SingleProgress): void { - this.writeFull(buffer, progress).catch(error => console.log('RpcMessageWriter writeAsync error', error)); - } - - async writeFull(buffer: Uint8Array, progress?: SingleProgress): Promise { - if (this.options.chunkSize && buffer.byteLength >= this.options.chunkSize) { - //split up - const chunkId = this.chunkId++; - const message = readRpcMessage(buffer); //we need the original message-id, so the chunks are correctly assigned in Progress tracker - let offset = 0; - while (offset < buffer.byteLength) { - //todo: check back-pressure and wait if necessary - const slice = buffer.slice(offset, offset + this.options.chunkSize); - const chunkMessage = createRpcMessage(message.id, RpcTypes.Chunk, { - id: chunkId, - total: buffer.byteLength, - v: slice - }); - offset += slice.byteLength; - const promise = new Promise((resolve) => { - this.reader.onChunkAck(message.id, resolve); - }); - this.writer.write(chunkMessage); - await promise; - progress?.set(buffer.byteLength, offset); - } - } else { - this.writer.write(buffer); - progress?.set(buffer.byteLength, buffer.byteLength); - } - } -} - -export class ClientProgress { - static nextProgress?: Progress; - - /** - * Returns the current stack and sets a new one. - */ - static getNext(): Progress | undefined { - if (ClientProgress.nextProgress) { - const old = ClientProgress.nextProgress; - ClientProgress.nextProgress = undefined; - return old; - } - return undefined; - } - - /** - * Sets up a new Progress object for the next API request to be made. - * Only the very next API call will be tracked. - * - * @example - * ```typescript - * - * ClientProgress.track(); - * await api.myMethod(); - * - * ``` - */ - static track(): Progress { - const progress = new Progress; - ClientProgress.nextProgress = progress; - return progress; - } -} diff --git a/packages/rpc/tests/chunks.spec.ts b/packages/rpc/tests/chunks.spec.ts index 449c78166..1671fb131 100644 --- a/packages/rpc/tests/chunks.spec.ts +++ b/packages/rpc/tests/chunks.spec.ts @@ -3,8 +3,8 @@ import { skip } from 'rxjs/operators'; import { DirectClient } from '../src/client/client-direct.js'; import { rpc } from '../src/decorators.js'; import { RpcKernel } from '../src/server/kernel.js'; -import { ClientProgress } from '../src/writer.js'; -import { RpcBufferReader } from '../src/protocol.js'; +import { ClientProgress } from '../src/progress.js'; +import { RpcBinaryBufferReader } from '../src/protocol.js'; import { asyncOperation } from '@deepkit/core'; test('buffer read does not do copy', async () => { @@ -12,7 +12,7 @@ test('buffer read does not do copy', async () => { data.writeUint32LE(data.length, 0); let received: Uint8Array | undefined = undefined; - new RpcBufferReader((p) => { + new RpcBinaryBufferReader((p) => { received = p; }).feed(data); diff --git a/packages/rpc/tests/connection.spec.ts b/packages/rpc/tests/connection.spec.ts index f0ff855bc..23ad5220b 100644 --- a/packages/rpc/tests/connection.spec.ts +++ b/packages/rpc/tests/connection.spec.ts @@ -1,18 +1,19 @@ import { expect, test } from '@jest/globals'; import { RpcKernel } from '../src/server/kernel.js'; -import { RpcClient, TransportConnectionHooks } from '../src/client/client.js'; +import { RpcClient } from '../src/client/client.js'; +import { TransportClientConnection } from '../src/transport.js'; test('connect', async () => { const kernel = new RpcKernel(); - const connections: TransportConnectionHooks[] = []; + const connections: TransportClientConnection[] = []; const client = new RpcClient({ - connect(connection: TransportConnectionHooks) { + connect(connection: TransportClientConnection) { const kernelConnection = kernel.createConnection({ - write: (buffer) => connection.onData(buffer), + writeBinary: (buffer) => connection.readBinary(buffer), close: () => { - connection.onClose(); + connection.onClose(''); }, }); @@ -28,7 +29,7 @@ test('connect', async () => { close() { kernelConnection.close(); }, - send(buffer) { + writeBinary(buffer) { kernelConnection.feed(buffer); }, }); diff --git a/packages/rpc/tests/custom-message.spec.ts b/packages/rpc/tests/custom-message.spec.ts index 10f5b9f47..173949b24 100644 --- a/packages/rpc/tests/custom-message.spec.ts +++ b/packages/rpc/tests/custom-message.spec.ts @@ -27,13 +27,13 @@ test('back controller', async () => { class MyRpcKernelConnection extends RpcKernelConnection { async onMessage(message: RpcMessage): Promise { if (message.type === MyTypes.QueryAndAnswer) { - this.writer.write(createRpcMessage<{ v: string }>(message.id, MyTypes.Answer, { v: '42 is the answer' })); + this.write(createRpcMessage<{ v: string }>(message.id, MyTypes.Answer, { v: '42 is the answer' })); return; } if (message.type === MyTypes.BroadcastWithAck) { broadcastWithAckCalled = message.parseBody<{v: string}>() - this.writer.write(createRpcMessage(message.id, MyTypes.Ack)); + this.write(createRpcMessage(message.id, MyTypes.Ack)); return; } diff --git a/packages/rpc/tests/http.spec.ts b/packages/rpc/tests/http.spec.ts new file mode 100644 index 000000000..980392c21 --- /dev/null +++ b/packages/rpc/tests/http.spec.ts @@ -0,0 +1,63 @@ +import { expect, test } from '@jest/globals'; +import { RpcKernel } from '../src/server/kernel.js'; +import { rpc } from '../src/decorators.js'; +import { createServer, IncomingMessage, ServerResponse } from 'http'; +import { asyncOperation } from '@deepkit/core'; +import { RpcClient } from '../src/client/client.js'; +import { RpcHttpClientAdapter } from '../src/client/http.js'; + +test('http', async () => { + @rpc.controller('test') + class Controller { + @rpc.action() + hello(): string { + return 'world'; + } + } + + const kernel = new RpcKernel(); + kernel.registerController(Controller); + + function handler(request: IncomingMessage & { body?: Uint8Array }, response: ServerResponse) { + const connection = kernel.createConnection({ + write: (data) => { + + }, + bufferedAmount() { + return 0; + }, + close() { + + }, + clientAddress() { + return request.socket.remoteAddress || ''; + }, + }); + + const chunks: Buffer[] = []; + + function read(chunk: Buffer) { + chunks.push(chunk); + } + + request.on('data', read); + request.once('end', () => { + request.body = Buffer.concat(chunks); + request.off('data', read); + }); + + connection.onRequest('/rpc', request, response); + } + + await asyncOperation((resolve) => { + const server = createServer(handler).listen(0, async () => { + const port = (server.address() as any).port; + const client = new RpcClient(new RpcHttpClientAdapter('http://localhost:' + port + '/rpc')); + const controller = client.controller('test'); + const res = await controller.hello(); + resolve(); + server.close(); + expect(res).toBe('world'); + }); + }); +}); diff --git a/packages/rpc/tests/rpc.spec.ts b/packages/rpc/tests/rpc.spec.ts index 0aca69480..9262edd37 100644 --- a/packages/rpc/tests/rpc.spec.ts +++ b/packages/rpc/tests/rpc.spec.ts @@ -1,5 +1,4 @@ import { expect, test } from '@jest/globals'; -import { Progress, RpcMessageWriter, RpcMessageWriterOptions } from '../src/writer.js'; import { DirectClient } from '../src/client/client-direct.js'; import { rpc } from '../src/decorators.js'; import { @@ -8,17 +7,21 @@ import { createRpcMessage, createRpcMessagePeer, createRpcMessageSourceDest, - readRpcMessage, + readBinaryRpcMessage, readUint32LE, - RpcBufferReader, + RpcBinaryBufferReader, + RpcBinaryMessageReader, RpcMessage, - RpcMessageReader, - RpcMessageRouteType + RpcMessageRouteType, + serializeBinaryRpcMessage, } from '../src/protocol.js'; import { RpcKernel } from '../src/server/kernel.js'; import { RpcTypes } from '../src/model.js'; import { Writer } from '@deepkit/bson'; import { typeOf } from '@deepkit/type'; +import { RpcBinaryWriter, TransportBinaryMessageChunkWriter, TransportOptions } from '../src/transport.js'; +import { Progress } from '../src/progress.js'; +import { RpcKernelSecurity } from '../src/server/security.js'; test('readUint32LE', () => { { @@ -40,12 +43,12 @@ test('readUint32LE', () => { test('protocol basics', () => { interface schema { - name: string + name: string; } { const message = createRpcMessage(1024, 123); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(123); expect(parsed.composite).toBe(false); @@ -56,7 +59,7 @@ test('protocol basics', () => { { const message = createRpcMessage(1024, 130, { name: 'foo' }); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(130); expect(parsed.composite).toBe(false); @@ -67,7 +70,7 @@ test('protocol basics', () => { { const message = createRpcMessage(1024, 130, { name: 'foo' }, RpcMessageRouteType.server); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(130); expect(parsed.composite).toBe(false); @@ -78,7 +81,7 @@ test('protocol basics', () => { const peerSource = Buffer.alloc(16); peerSource[0] = 22; const message = createRpcMessagePeer(1024, 130, peerSource, 'myPeer', { name: 'foo' }); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(130); expect(parsed.composite).toBe(false); @@ -94,7 +97,7 @@ test('protocol basics', () => { const destination = Buffer.alloc(16); destination[0] = 20; const message = createRpcMessageSourceDest(1024, 130, source, destination, { name: 'foo' }); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(130); expect(parsed.composite).toBe(false); @@ -113,7 +116,7 @@ test('protocol composite', () => { { const message = createRpcCompositeMessage(1024, 33, [{ type: 4, schema: typeOf(), body: { name: 'foo' } }]); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(33); expect(parsed.composite).toBe(true); @@ -131,7 +134,7 @@ test('protocol composite', () => { { const message = createRpcCompositeMessage(1024, 5, [{ type: 4 }, { type: 5, schema: typeOf(), body: { name: 'foo' } }]); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(5); expect(parsed.composite).toBe(true); @@ -153,10 +156,10 @@ test('protocol composite', () => { const message = createRpcCompositeMessage(1024, 6, [{ type: 4, schema: typeOf(), body: { name: 'foo' } }, { type: 12, schema: typeOf(), - body: { name: 'bar' } + body: { name: 'bar' }, }]); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(6); expect(parsed.composite).toBe(true); @@ -179,10 +182,10 @@ test('protocol composite', () => { const message = createRpcCompositeMessageSourceDest(1024, source, destination, 55, [{ type: 4, schema: typeOf(), - body: { name: 'foo' } + body: { name: 'foo' }, }, { type: 12, schema: typeOf(), body: { name: 'bar' } }]); - const parsed = readRpcMessage(message); + const parsed = readBinaryRpcMessage(serializeBinaryRpcMessage(message)); expect(parsed.id).toBe(1024); expect(parsed.type).toBe(55); expect(parsed.composite).toBe(true); @@ -236,7 +239,18 @@ test('rpc kernel', async () => { }); test('rpc peer', async () => { - const kernel = new RpcKernel(); + + class MyRpcSecurity extends RpcKernelSecurity { + async isAllowedToSendToPeer() { + return true; + } + async isAllowedToRegisterAsPeer() { + return true; + } + } + const kernel = new RpcKernel([ + { provide: RpcKernelSecurity, useClass: MyRpcSecurity, scope: 'rpc' }, + ]); const client1 = new DirectClient(kernel); @@ -259,7 +273,7 @@ test('rpc peer', async () => { test('message reader', async () => { const messages: Buffer[] = []; - const reader = new RpcBufferReader(Array.prototype.push.bind(messages)); + const reader = new RpcBinaryBufferReader(Array.prototype.push.bind(messages)); let buffer: any; @@ -381,31 +395,31 @@ test('message reader', async () => { test('message chunks', async () => { const messages: RpcMessage[] = []; - const reader = new RpcMessageReader(v => messages.push(v)); + const reader = new RpcBinaryMessageReader(v => messages.push(v)); + interface schema { v: string; } + const bigString = 'x'.repeat(1_000_000); //1mb const buffers: Uint8Array[] = []; - const writer = new RpcMessageWriter({ - write(b) { - buffers.push(b); - reader.feed(createRpcMessage(2, RpcTypes.ChunkAck)); //confirm chunk, this is done automatically in the kernel - reader.feed(b); //echo back - }, - close() {} - }, reader, new RpcMessageWriterOptions); - - const message = createRpcMessage(2, RpcTypes.ResponseActionSimple, { v: bigString }); - await writer.writeFull(message); + const binaryWriter: RpcBinaryWriter = (b) => { + buffers.push(b); + reader.feed(serializeBinaryRpcMessage(createRpcMessage(2, RpcTypes.ChunkAck))); //confirm chunk, this is done automatically in the kernel + reader.feed(b); //echo back + }; + const writer = new TransportBinaryMessageChunkWriter(reader, new TransportOptions()); + + const message = serializeBinaryRpcMessage(createRpcMessage(2, RpcTypes.ResponseActionSimple, { v: bigString })); + await writer.writeFull(binaryWriter, message); expect(buffers.length).toBe(11); //total size is 1_000_025, chunk is 100k, so we have 11 packages - expect(readRpcMessage(buffers[0]).id).toBe(2); - expect(readRpcMessage(buffers[0]).type).toBe(RpcTypes.Chunk); + expect(readBinaryRpcMessage(buffers[0]).id).toBe(2); + expect(readBinaryRpcMessage(buffers[0]).type).toBe(RpcTypes.Chunk); - expect(readRpcMessage(buffers[10]).id).toBe(2); - expect(readRpcMessage(buffers[10]).type).toBe(RpcTypes.Chunk); + expect(readBinaryRpcMessage(buffers[10]).id).toBe(2); + expect(readBinaryRpcMessage(buffers[10]).type).toBe(RpcTypes.Chunk); expect(messages.length).toBe(1); const lastReceivedMessage = messages[0]; @@ -418,25 +432,24 @@ test('message chunks', async () => { test('message progress', async () => { const messages: RpcMessage[] = []; - const reader = new RpcMessageReader(v => messages.push(v)); + const reader = new RpcBinaryMessageReader(v => messages.push(v)); + interface schema { v: string; } + const bigString = 'x'.repeat(1_000_000); //1mb - const writer = new RpcMessageWriter({ - write(b) { - reader.feed(createRpcMessage(2, RpcTypes.ChunkAck)); //confirm chunk, this is done automatically in the kernel - reader.feed(b); //echo - }, - close() { - } - }, reader, new RpcMessageWriterOptions); + const binaryWriter: RpcBinaryWriter = (b) => { + reader.feed(serializeBinaryRpcMessage(createRpcMessage(2, RpcTypes.ChunkAck))); //confirm chunk, this is done automatically in the kernel + reader.feed(b); //echo + }; + const writer = new TransportBinaryMessageChunkWriter(reader, new TransportOptions); - const message = createRpcMessage(2, RpcTypes.ResponseActionSimple, { v: bigString }); + const message = serializeBinaryRpcMessage(createRpcMessage(2, RpcTypes.ResponseActionSimple, { v: bigString })); const progress = new Progress(); reader.registerProgress(2, progress.download); - await writer.writeFull(message, progress.upload); + await writer.writeFull(binaryWriter, message, progress.upload); await progress.upload.finished; expect(progress.upload.done).toBe(true);