Skip to content

Commit

Permalink
feat(nestjs-json-rpc-sdk): add takeUntil for disconnect socket
Browse files Browse the repository at this point in the history
  • Loading branch information
klerick committed Apr 4, 2024
1 parent 31450b3 commit aefaf95
Show file tree
Hide file tree
Showing 14 changed files with 389 additions and 110 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

- *[json-api-nestjs](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-api/json-api-nestjs)* - plugin for create CRUD overs JSON API
- *[json-api-nestjs-sdk](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-api/json-api-nestjs-sdk)* - tool for client, call api over *json-api-nestjs*
- *[nestjs-json-rpc](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-rpc/nestjs-json-rpc)* - plugin for create RPC server using [JSON-RPC](https://www.jsonrpc.org/)
- *[nestjs-json-rpc-sdk](https://github.com/klerick/nestjs-json-api/tree/master/libs/json-rpc/nestjs-json-rpc-sdk)* - tool for client, call RPC server *nestjs-json-rpc*
- *json-api-nestjs-acl* - tool for acl over *json-api-nestjs*(coming soon...)
## Installation

Expand Down
68 changes: 58 additions & 10 deletions apps/json-api-front/src/app/app.config.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,60 @@
import { ApplicationConfig, importProvidersFrom } from '@angular/core';
import {
ApplicationConfig,
importProvidersFrom,
InjectionToken,
} from '@angular/core';
import { JsonApiAngular } from 'json-api-nestjs-sdk/json-api-nestjs-sdk.module';
import {
JsonRpcAngular,
JsonRpcAngularConfig,
TransportType,
} from '@klerick/nestjs-json-rpc-sdk/json-rpc-sdk.module';
import io from 'socket.io-client';
import { Subject } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
import { io } from 'socket.io-client';

const destroySubject = new Subject<boolean>();
setTimeout(() => {
console.log('Disconnect');
destroySubject.next(true);
destroySubject.complete();
}, 5000);
const destroySubjectToken = new InjectionToken('destroySubjectToken', {
factory: () => destroySubject,
});
destroySubject.subscribe((r) => console.log(r));
const tokenSocketInst = new InjectionToken('tokenSocketInst', {
factory: () => webSocket('ws://localhost:4200/rpc'),
});

const tokenIoSocketInst = new InjectionToken('tokenIoSocketInst', {
factory: () => io('http://localhost:3000', { path: '/rpc' }),
});

const httpConfig: JsonRpcAngularConfig = {
transport: TransportType.HTTP,
rpcPath: '/api/rpc',
rpcHost: 'http://localhost:4200',
};
const wsConfig: JsonRpcAngularConfig = {
transport: TransportType.WS,
useWsNativeSocket: true,
rpcPath: 'rpc',
rpcHost: 'ws://localhost:4200',
destroySubjectToken,
};
const wsConfigWithToken: JsonRpcAngularConfig = {
transport: TransportType.WS,
useWsNativeSocket: true,
tokenSocketInst,
destroySubjectToken,
};
const ioConfig: JsonRpcAngularConfig = {
transport: TransportType.WS,
useWsNativeSocket: false,
destroySubjectToken,
tokenSocketInst: tokenIoSocketInst,
};

export const appConfig: ApplicationConfig = {
providers: [
Expand All @@ -17,14 +67,12 @@ export const appConfig: ApplicationConfig = {
})
),
importProvidersFrom(
JsonRpcAngular.forRoot({
transport: TransportType.WS,
rpcPath: 'rpc',
rpcHost: 'ws://localhost:4200',
useWsNativeSocket: true,
// useWsNativeSocket: false,
// webSocketCtor: io('http://localhost:3000', { path: '/rpc' }),
})
JsonRpcAngular.forRoot(
// httpConfig
// wsConfig
// wsConfigWithToken,
ioConfig
)
),
],
};
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import {
RpcError,
} from '@klerick/nestjs-json-rpc-sdk';

import { creatWsRpcSdk, MapperRpc, run } from '../utils/run-application';
import {
creatWsRpcSdk,
MapperRpc,
run,
destroySubject,
} from '../utils/run-application';

let app: INestApplication;

Expand All @@ -14,6 +19,8 @@ beforeAll(async () => {
});

afterAll(async () => {
destroySubject.next(true);
destroySubject.complete();
await app.close();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { AppModule } from '../../../../json-api-server/src/app/app.module';

import { JsonConfig } from '../../../../../libs/json-api/json-api-nestjs-sdk/src/lib/types';
import { WsAdapter } from '@nestjs/platform-ws';
import { Subject } from 'rxjs';

export const axiosAdapter = adapterForAxios(axios);
let saveApp: INestApplication;
Expand Down Expand Up @@ -70,15 +71,16 @@ export const creatRpcSdk = (config: Partial<RpcConfig> = {}) =>
},
true
);

export const destroySubject = new Subject<boolean>();
export const creatWsRpcSdk = (config: Partial<RpcConfig> = {}) =>
RpcFactory<MapperRpc>(
{
transport: TransportType.WS,
useWsNativeSocket: true,
webSocketCtor: WebSocket,
nativeSocketImplementation: WebSocket,
rpcHost: `http://localhost:${port}`,
rpcPath: `/rpc`,
destroySubject,
},
true
);
94 changes: 94 additions & 0 deletions libs/json-rpc/nestjs-json-rpc-sdk/src/lib/angular/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { inject, InjectionToken } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Subject } from 'rxjs';
import { WebSocketSubject } from 'rxjs/internal/observable/dom/WebSocketSubject';
import { Socket } from 'socket.io-client';

import {
LoopFunc,
PayloadRpc,
RpcResult,
RpcReturnList,
RpcConfig,
TransportType,
} from '../types';
import { transportFactory } from '../factory';
import { webSocketFactory, WsResponse } from '../factory/ws-transport.factory';

import { JSON_RPC_SDK_CONFIG, JSON_RPC_SDK_TRANSPORT } from './tokens';
import { RpcBatchFactory, rpcProxy } from '../utils';

export function rpcBatchFactory() {
return RpcBatchFactory(inject(JSON_RPC_SDK_TRANSPORT));
}

export function rpcFactory() {
return rpcProxy<RpcReturnList<any, true>>(
inject(JSON_RPC_SDK_TRANSPORT),
false
);
}

export function angularTransportFactory() {
const angularConfig = inject(JSON_RPC_SDK_CONFIG);
const httpClient = inject(HttpClient);

if (angularConfig.transport === TransportType.HTTP) {
const rpcConfig: RpcConfig = {
transport: angularConfig.transport,
httpAgentFactory: (url: string) => (body: PayloadRpc<LoopFunc>) =>
httpClient.post<RpcResult<LoopFunc>>(url, body),
rpcPath: angularConfig.rpcPath,
rpcHost: angularConfig.rpcHost,
};
return transportFactory(rpcConfig);
}

const destroySubject =
(angularConfig.destroySubjectToken &&
inject<Subject<boolean>>(angularConfig.destroySubjectToken, {
optional: true,
})) ||
new Subject<boolean>();

if (angularConfig.useWsNativeSocket) {
let socketInst:
| WebSocketSubject<WsResponse<PayloadRpc<LoopFunc> | RpcResult<LoopFunc>>>
| undefined = undefined;
if ('tokenSocketInst' in angularConfig) {
socketInst =
inject<
WebSocketSubject<
WsResponse<PayloadRpc<LoopFunc> | RpcResult<LoopFunc>>
>
>(angularConfig['tokenSocketInst'], { optional: true }) || undefined;
} else {
const url = new URL(
angularConfig.rpcPath,
angularConfig.rpcHost
).toString();
socketInst = webSocketFactory(
url,
angularConfig.nativeSocketImplementation
);
}

if (socketInst === undefined) throw new Error('Cant create socket inst');
const rpcConfig: RpcConfig = {
transport: angularConfig.transport,
useWsNativeSocket: angularConfig.useWsNativeSocket,
nativeSocketInstance: socketInst,
destroySubject,
};

return transportFactory(rpcConfig);
}
const ioSocketInstance = inject<Socket>(angularConfig['tokenSocketInst']);
const rpcConfig: RpcConfig = {
transport: angularConfig.transport,
useWsNativeSocket: angularConfig.useWsNativeSocket,
ioSocketInstance: ioSocketInstance,
destroySubject,
};
return transportFactory(rpcConfig);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ModuleWithProviders, NgModule } from '@angular/core';
import { HttpClientModule } from '@angular/common/http';

import { JSON_RPC_SDK_CONFIG } from './tokens';
import { JsonRpcAngularConfig } from '../types';

@NgModule({
imports: [HttpClientModule],
})
export class JsonRpcAngular {
static forRoot(
config: JsonRpcAngularConfig
): ModuleWithProviders<JsonRpcAngular> {
return {
ngModule: JsonRpcAngular,
providers: [
{
useValue: config,
provide: JSON_RPC_SDK_CONFIG,
},
],
};
}
}
31 changes: 31 additions & 0 deletions libs/json-rpc/nestjs-json-rpc-sdk/src/lib/angular/tokens.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { InjectionToken } from '@angular/core';
import { LoopFunc, RpcBatch, RpcReturnList, Transport } from '../types';

import { JsonRpcAngularConfig } from '@klerick/nestjs-json-rpc-sdk/json-rpc-sdk.module';
import {
angularTransportFactory,
rpcBatchFactory,
rpcFactory,
} from './factory';

export const JSON_RPC_SDK_CONFIG = new InjectionToken<JsonRpcAngularConfig>(
'Main config object for sdk'
);

export const JSON_RPC_SDK_TRANSPORT = new InjectionToken<Transport<LoopFunc>>(
'Transport for RPC',
{
factory: angularTransportFactory,
}
);

export const JSON_RPC = new InjectionToken<RpcReturnList<object, false>>(
'Rpc client',
{
factory: rpcFactory,
}
);

export const RPC_BATCH = new InjectionToken<RpcBatch>('Rpc client for batch', {
factory: rpcBatchFactory,
});
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
import { filter, of, Subject, switchMap, take, tap } from 'rxjs';
import type { Socket } from 'socket.io-client';
import {
filter,
Observable,
Observer,
of,
Subject,
Subscription,
switchMap,
take,
takeUntil,
tap,
} from 'rxjs';
import { Subscriber } from 'rxjs/internal/Subscriber';
import { TeardownLogic } from 'rxjs/internal/types';

import { LoopFunc, PayloadRpc, RpcResult, Transport } from '../types';
import { WS_EVENT_NAME } from '../constans';

Expand All @@ -11,18 +25,51 @@ interface ClientToServerEvents<T extends LoopFunc> {
rpc: (payload: PayloadRpc<T>) => void;
}

class SocketIo<T extends LoopFunc> extends Observable<RpcResult<T>> {
private messageQueue: PayloadRpc<T>[] = [];
constructor(
private io: Socket<ServerToClientEvents<T>, ClientToServerEvents<T>>
) {
super((subscriber) => this.subscribeForObservable(subscriber));
this.io.on('connect', () => {
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift();
if (!msg) break;
this.io.emit(WS_EVENT_NAME, msg);
}
});
}

private subscribeForObservable(
subscriber: Subscriber<RpcResult<T>>
): TeardownLogic {
this.io.on(WS_EVENT_NAME, (value) => subscriber.next(value));
this.io.on('connect_error', (error: Error) => subscriber.error(error));
this.io.on('disconnect', () => subscriber.complete());
return { unsubscribe: () => this.io.close() };
}

public next(message: PayloadRpc<T>): void {
if (!this.io.connected) {
this.messageQueue.push(message);
return;
}

this.io.emit(WS_EVENT_NAME, message);
}
}

export function ioTransportFactory<T extends LoopFunc>(
io: Socket<ServerToClientEvents<T>, ClientToServerEvents<T>>
io: Socket<ServerToClientEvents<T>, ClientToServerEvents<T>>,
destroyFactory: Subject<boolean>
): Transport<T> {
const subjectData = new Subject<RpcResult<T>>();
io.on(WS_EVENT_NAME, (event) => subjectData.next(event));

const socketSubject = new SocketIo(io).pipe(takeUntil(destroyFactory));
return (body: PayloadRpc<T>) => {
const { id } = body;
return of(true).pipe(
tap(() => io.emit(WS_EVENT_NAME, body)),
switchMap(() =>
subjectData.pipe(filter((response) => response.id === id))
socketSubject.pipe(filter((response) => response.id === id))
),
take(1)
);
Expand Down
Loading

0 comments on commit aefaf95

Please sign in to comment.