Skip to content

Commit

Permalink
Update ESLint plugins and refactor socket-client exports; enhance cod…
Browse files Browse the repository at this point in the history
…e readability and maintainability
  • Loading branch information
ZNackasha committed Nov 15, 2024
1 parent 2b47311 commit 4092055
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 125 deletions.
18 changes: 3 additions & 15 deletions libs/vanilla-utils/socket-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@

import SubscribableSocket, { SubscribableSocketEvent } from './socket';
import SubscribableReconnectingSocket, { SubscribableReconnectingSocketEvent } from './reconnectingSocket';
import { SubscribableSleepWakeUp } from './wakeUp';

export {
SubscribableSocket,
SubscribableSleepWakeUp,
SubscribableReconnectingSocket,
};

export type {
SubscribableSocketEvent,
SubscribableReconnectingSocketEvent,
};
export * from './socket';
export * from './reconnectingSocket';
export * from './wakeUp';
66 changes: 35 additions & 31 deletions libs/vanilla-utils/socket-client/src/reconnectingSocket.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import SubscribableSocket, { SocketState, SubscribableSocketEvent, WebSocketProps } from './socket';
import { SubscribableSocket, SocketState, SubscribableSocketEvent, WebSocketProps } from './socket';
import { ExtendedSubscribable, SubscribableEvent, Subscription } from '@benzinga/subscribable';
import { SubscribableSleepWakeUp } from './wakeUp';
interface SocketDisconnectEvent extends SubscribableEvent<'disconnected'> {
Expand All @@ -25,20 +25,20 @@ interface ReconnectingSocketFunctions {
sendObject: SubscribableReconnectingSocket['sendObject'];
}

class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown> extends ExtendedSubscribable<
export class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown> extends ExtendedSubscribable<
SubscribableReconnectingSocketEvent<RESPFormat>,
ReconnectingSocketFunctions
> {
private socket: SubscribableSocket<RESPFormat, REQFormat>;
private socketSubscription?: Subscription<SubscribableSocket<RESPFormat>>;
private sleepWakeUp: SubscribableSleepWakeUp;
private readonly sleepWakeUp: SubscribableSleepWakeUp;
private sleepWakeUpSubscription?: Subscription<SubscribableSleepWakeUp>;
private disconnectTime?: Date;
private forceReconnect = false;
private webSocketProps: WebSocketProps;
private url: URL;
private readonly webSocketProps: WebSocketProps;
private readonly url: URL;
private state: ReconnectSocketState = 'closed';
private getTimeoutLength?: (disconnectTime: Date) => number;
private readonly getTimeoutLength?: (disconnectTime: Date) => number;

constructor(url: URL, webSocketProps?: WebSocketProps, getTimeoutLength?: (disconnectTime: Date) => number) {
super();
Expand All @@ -50,7 +50,7 @@ class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown>
this.getTimeoutLength = getTimeoutLength;
}

private static getTimeoutLength = (disconnectTime: Date): number => {
private static getTimeoutLength(disconnectTime: Date): number {
const timeDelta = new Date().getTime() - disconnectTime.getTime();
if (timeDelta > 10000) {
return 10000;
Expand All @@ -59,9 +59,9 @@ class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown>
} else {
return timeDelta;
}
};
}

public open = (): void => {
public open(): void {
if (this.state === 'closing') {
this.socketSubscription?.unsubscribe();
this.socketSubscription = undefined;
Expand All @@ -76,9 +76,9 @@ class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown>
this.socket.open();
this.sleepWakeUpSubscription = this.sleepWakeUp.subscribe(() => this.reconnect());
}
};
}

public reconnect = (): void => {
public reconnect(): void {
if (this.forceReconnect === false) {
this.forceReconnect = true;
const socketState = this.socket.getState();
Expand All @@ -89,35 +89,41 @@ class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown>
this.timedReconnect();
}
}
};
}

public send = (data: string | ArrayBuffer | ArrayBufferView | Blob): void => {
public send(data: string | ArrayBuffer | ArrayBufferView | Blob): void {
this.socket.send(data);
};
}

public sendObject = <T = REQFormat>(data: T): void => {
this.socket.sendObject(data);
};

public close = (): void => {
public close(): void {
this.socket.close();
this.sleepWakeUpSubscription?.unsubscribe();
this.sleepWakeUpSubscription = undefined;
};
}

public getState = (): ReconnectSocketState => this.state;
public getState(): ReconnectSocketState {
return this.state;
}

protected onSubscribe = (): ReconnectingSocketFunctions => ({
close: this.close,
open: this.open,
reconnect: this.reconnect,
send: this.send,
sendObject: this.sendObject,
});
protected onSubscribe(): ReconnectingSocketFunctions {
return {
close: this.close,
open: this.open,
reconnect: this.reconnect,
send: this.send,
sendObject: this.sendObject,
};
}

protected override onZeroSubscriptions = (): void => this.close();
protected override onZeroSubscriptions(): void {
this.close();
}

private onMessage = (event: SubscribableSocketEvent<RESPFormat>) => {
private onMessage(event: SubscribableSocketEvent<RESPFormat>) {
switch (event.type) {
case 'close':
if (event.event.wasClean && this.forceReconnect === false) {
Expand Down Expand Up @@ -152,9 +158,9 @@ class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown>
this.dispatch(event);
break;
}
};
}

private timedReconnect = () => {
private timedReconnect() {
this.socket.close();
if (this.disconnectTime === undefined) {
this.disconnectTime = new Date();
Expand All @@ -166,7 +172,5 @@ class SubscribableReconnectingSocket<RESPFormat = unknown, REQFormat = unknown>
this.socket.open();
}, getTimeoutLength(this.disconnectTime));
}
};
}
}

export default SubscribableReconnectingSocket;
55 changes: 31 additions & 24 deletions libs/vanilla-utils/socket-client/src/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ interface SocketCloseEvent extends SubscribableEvent<'close'> {
event: CloseEvent;
}

interface UrlUpdateEvent extends SubscribableEvent<'url-update'> {
url: URL;
}

export type SubscribableSocketEvent<RESPFormat> =
| SocketCloseEvent
| SocketErrorEvent
| SocketRequestEvent
| SocketResponseEvent<RESPFormat>
| SubscribableEvent<'open'>
| SubscribableEvent<'closing'>;
| SubscribableEvent<'closing'>
| UrlUpdateEvent;

export type SocketState = 'closed' | 'closing' | 'open' | 'opening';

Expand All @@ -35,14 +40,14 @@ interface SocketFunctions {
sendObject: SubscribableSocket['sendObject'];
}

class SubscribableSocket<RESPFormat = unknown, REQFormat = unknown> extends ExtendedSubscribable<
export class SubscribableSocket<RESPFormat = unknown, REQFormat = unknown> extends ExtendedSubscribable<
SubscribableSocketEvent<RESPFormat>,
SocketFunctions
> {
private socket?: WebSocket;
private url: URL;
private state: SocketState;
private webSocketProps: WebSocketProps;
private readonly webSocketProps: WebSocketProps;
private queueSend: (string | ArrayBuffer | ArrayBufferView | Blob)[] = [];
private socketsOpened: WebSocket[] = [];

Expand All @@ -53,7 +58,7 @@ class SubscribableSocket<RESPFormat = unknown, REQFormat = unknown> extends Exte
this.state = 'closed';
}

public open = async (): Promise<void> => {
public async open(): Promise<void> {
if (this.socket === undefined && this.state !== 'opening') {
this.state = 'opening';
const socket = await safeResilient(
Expand Down Expand Up @@ -117,18 +122,18 @@ class SubscribableSocket<RESPFormat = unknown, REQFormat = unknown> extends Exte
this.dispatch({ type: 'open' });
}
}
};
}

public close = (): void => {
public close(): void {
this.close_internal();
this.queueSend = [];
};
}

public sendObject = <T = REQFormat>(data: T): void => {
public sendObject<T = REQFormat>(data: T): void {
this.send(JSON.stringify(data));
};
}

public send = (data: string | ArrayBuffer | ArrayBufferView | Blob): void => {
public send(data: string | ArrayBuffer | ArrayBufferView | Blob): void {
switch (this.state) {
case 'opening':
this.queueSend.push(data);
Expand All @@ -145,11 +150,13 @@ class SubscribableSocket<RESPFormat = unknown, REQFormat = unknown> extends Exte
console.log('cannot send data if socket is not open');
break;
}
};
}

public getState = (): SocketState => this.state;
public getState(): SocketState {
return this.state;
}

protected close_internal = (): void => {
protected close_internal(): void {
switch (this.state) {
case 'open': {
try {
Expand All @@ -173,18 +180,18 @@ class SubscribableSocket<RESPFormat = unknown, REQFormat = unknown> extends Exte
}
}
this.socket = undefined;
};
}

protected override onSubscribe = (): SocketFunctions => ({
close: this.close,
open: this.open,
send: this.send,
sendObject: this.sendObject,
});
protected override onSubscribe(): SocketFunctions {
return {
close: this.close,
open: this.open,
send: this.send,
sendObject: this.sendObject,
};
}

protected override onZeroSubscriptions = (): void => {
protected override onZeroSubscriptions(): void {
this.close();
};
}
}

export default SubscribableSocket;
Loading

0 comments on commit 4092055

Please sign in to comment.