Skip to content

Commit

Permalink
Detect closed sockets and handle more gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
floodfx committed Feb 5, 2023
1 parent e0a718a commit 87b658d
Show file tree
Hide file tree
Showing 19 changed files with 364 additions and 227 deletions.
9 changes: 9 additions & 0 deletions .changeset/young-ducks-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@liveviewjs/lambda-examples": patch
"liveviewjs": patch
"@liveviewjs/examples": patch
"@liveviewjs/express": patch
"@liveviewjs/gen": patch
---

Detect closed sockets and handle more gracefully
361 changes: 186 additions & 175 deletions packages/core/coverage/clover.xml

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions packages/core/coverage/coverage-final.json

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion packages/core/dist/liveview.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ interface WsAdaptor {
send(message: string, errorHandler?: (err: any) => void): void;
subscribeToMessages(msgListener: WsMsgListener): Promise<void> | void;
subscribeToClose(closeListener: WsCloseListener): Promise<void> | void;
isClosed(): boolean;
}

type SubscriberFunction<T> = (data: T) => void;
Expand Down Expand Up @@ -1431,9 +1432,18 @@ declare class WsHandler {
* @param msg a Phx.Msg to be routed
*/
handleMsg(msg: Phx.Msg<unknown>): Promise<void>;
close(): Promise<void>;
send(reply: PhxReply.Reply): void;
private close;
/**
* Check if the websocket is closed and if so, shutdown the liveview
*/
private maybeShutdown;
/**
* Call the config.onError callback on send errors and if the
* websocket is closed, shutdown the liveview
*/
private maybeHandleError;
private maybeDebug;
private cleanupPostReply;
private viewToDiff;
private viewToRendered;
Expand Down
50 changes: 36 additions & 14 deletions packages/core/dist/liveview.js
Original file line number Diff line number Diff line change
Expand Up @@ -3399,14 +3399,7 @@ class WsHandler {
* @param msg a Phx.Msg to be routed
*/
async handleMsg(msg) {
if (this.#config.debug) {
try {
this.#config.debug(JSON.stringify(msg));
}
catch (e) {
console.error("error debugging message", e);
}
}
this.maybeDebug(JSON.stringify(msg));
try {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
Expand Down Expand Up @@ -3639,24 +3632,53 @@ class WsHandler {
this.maybeHandleError(e);
}
}
async close() {
// redirect this through handleMsg after adding the joinId
const joinId = this.#ctx?.joinId ?? "unknown";
this.handleMsg([null, null, joinId, "phx_leave", null]);
}
send(reply) {
try {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError.bind(this));
const shutdown = this.maybeShutdown();
if (!shutdown) {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError.bind(this));
}
}
catch (e) {
this.maybeHandleError(e);
}
}
async close() {
// redirect this through handleMsg after adding the joinId
const joinId = this.#ctx?.joinId ?? "unknown";
this.handleMsg([null, null, joinId, "phx_leave", null]);
}
/**
* Check if the websocket is closed and if so, shutdown the liveview
*/
maybeShutdown() {
if (this.#ws.isClosed()) {
this.maybeDebug(`ws closed, shutting down liveview: ${this.#ctx?.joinId}`);
this.close();
return true;
}
return false;
}
/**
* Call the config.onError callback on send errors and if the
* websocket is closed, shutdown the liveview
*/
maybeHandleError(err) {
this.maybeShutdown();
if (err && this.#config && this.#config.onError) {
this.#config.onError(err);
}
}
maybeDebug(msg) {
if (this.#config.debug) {
try {
this.#config.debug(msg);
}
catch (e) {
console.error("error debugging message", e);
}
}
}
async cleanupPostReply() {
// do post-send lifecycle step
this.#ctx.socket.updateContextWithTempAssigns();
Expand Down
50 changes: 36 additions & 14 deletions packages/core/dist/liveview.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3392,14 +3392,7 @@ class WsHandler {
* @param msg a Phx.Msg to be routed
*/
async handleMsg(msg) {
if (this.#config.debug) {
try {
this.#config.debug(JSON.stringify(msg));
}
catch (e) {
console.error("error debugging message", e);
}
}
this.maybeDebug(JSON.stringify(msg));
try {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
Expand Down Expand Up @@ -3632,24 +3625,53 @@ class WsHandler {
this.maybeHandleError(e);
}
}
async close() {
// redirect this through handleMsg after adding the joinId
const joinId = this.#ctx?.joinId ?? "unknown";
this.handleMsg([null, null, joinId, "phx_leave", null]);
}
send(reply) {
try {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError.bind(this));
const shutdown = this.maybeShutdown();
if (!shutdown) {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError.bind(this));
}
}
catch (e) {
this.maybeHandleError(e);
}
}
async close() {
// redirect this through handleMsg after adding the joinId
const joinId = this.#ctx?.joinId ?? "unknown";
this.handleMsg([null, null, joinId, "phx_leave", null]);
}
/**
* Check if the websocket is closed and if so, shutdown the liveview
*/
maybeShutdown() {
if (this.#ws.isClosed()) {
this.maybeDebug(`ws closed, shutting down liveview: ${this.#ctx?.joinId}`);
this.close();
return true;
}
return false;
}
/**
* Call the config.onError callback on send errors and if the
* websocket is closed, shutdown the liveview
*/
maybeHandleError(err) {
this.maybeShutdown();
if (err && this.#config && this.#config.onError) {
this.#config.onError(err);
}
}
maybeDebug(msg) {
if (this.#config.debug) {
try {
this.#config.debug(msg);
}
catch (e) {
console.error("error debugging message", e);
}
}
}
async cleanupPostReply() {
// do post-send lifecycle step
this.#ctx.socket.updateContextWithTempAssigns();
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/server/adaptor/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export interface WsAdaptor {
send(message: string, errorHandler?: (err: any) => void): void;
subscribeToMessages(msgListener: WsMsgListener): Promise<void> | void;
subscribeToClose(closeListener: WsCloseListener): Promise<void> | void;
isClosed(): boolean;
}
3 changes: 3 additions & 0 deletions packages/core/src/server/live/liveView.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class CallbackMessenger implements WsAdaptor {
subscribeToMessages(msgListener: WsMsgListener): void | Promise<void> {
throw new Error("Method not implemented.");
}
isClosed(): boolean {
throw new Error("Method not implemented.");
}
}

function newManager(callback: (message: string) => void): LiveViewManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,13 @@ function newMgr(opts?: NewMgrOpts): LiveViewManager {
const serDe = opts?.serDe ?? new JsonSerDe();
const fileSystemAdaptor = opts?.fileSystemAdaptor ?? new TestNodeFileSystemAdatptor();
const wsAdaptor =
opts?.wsAdaptor ?? ({ send: jest.fn(), subscribeToClose: jest.fn(), subscribeToMessages: jest.fn() } as WsAdaptor);
opts?.wsAdaptor ??
({
send: jest.fn(),
subscribeToClose: jest.fn(),
subscribeToMessages: jest.fn(),
isClosed: jest.fn(),
} as WsAdaptor);
const lv = opts?.liveView ?? createLiveView({ render: () => html`` });
const cid = opts?.cid ?? nanoid();
return new LiveViewManager(
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/server/socket/liveViewManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ describe("test liveview manager", () => {
send: jest.fn(),
subscribeToClose: jest.fn(),
subscribeToMessages: jest.fn(),
isClosed: jest.fn(),
};
cmLiveView = new LiveViewManager(
new TestLiveViewComponent(),
Expand Down Expand Up @@ -511,6 +512,7 @@ describe("test liveview manager", () => {
},
subscribeToClose: () => {},
subscribeToMessages: () => {},
isClosed: () => true,
};
const cm = new LiveViewManager(
tc,
Expand Down Expand Up @@ -565,6 +567,7 @@ describe("test liveview manager", () => {
},
subscribeToClose: () => {},
subscribeToMessages: () => {},
isClosed: () => false,
};
const cm = new LiveViewManager(
testArrayOfLiveTemplatesLV,
Expand Down Expand Up @@ -592,6 +595,7 @@ describe("test liveview manager", () => {
},
subscribeToClose: () => {},
subscribeToMessages: () => {},
isClosed: () => false,
};
const cm = new LiveViewManager(
testArrayOfLCLV,
Expand Down Expand Up @@ -736,6 +740,7 @@ describe("test liveview manager", () => {
},
subscribeToClose: () => {},
subscribeToMessages: () => {},
isClosed: () => false,
};
const c = new PutFlashComponent();
const cm = new LiveViewManager(
Expand Down Expand Up @@ -1193,6 +1198,9 @@ class TestWsAdaptor implements WsAdaptor {
subscribeToClose(closeListener: WsCloseListener): void | Promise<void> {
throw new Error("Method not implemented.");
}
isClosed(): boolean {
throw new Error("Method not implemented.");
}

send(message: string, errorHandler?: (err: any) => void): void {
this.msgCb(message);
Expand Down
50 changes: 37 additions & 13 deletions packages/core/src/server/socket/ws/wsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,7 @@ export class WsHandler {
* @param msg a Phx.Msg to be routed
*/
async handleMsg(msg: Phx.Msg<unknown>): Promise<void> {
if (this.#config.debug) {
try {
this.#config.debug(JSON.stringify(msg));
} catch (e) {
console.error("error debugging message", e);
}
}
this.maybeDebug(JSON.stringify(msg));
try {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
Expand Down Expand Up @@ -414,26 +408,56 @@ export class WsHandler {
}
}

async close() {
send(reply: PhxReply.Reply) {
try {
const shutdown = this.maybeShutdown();
if (!shutdown) {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError.bind(this));
}
} catch (e) {
this.maybeHandleError(e);
}
}

private async close() {
// redirect this through handleMsg after adding the joinId
const joinId = this.#ctx?.joinId ?? "unknown";
this.handleMsg([null, null, joinId, "phx_leave", null]);
}

send(reply: PhxReply.Reply) {
try {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError.bind(this));
} catch (e) {
this.maybeHandleError(e);
/**
* Check if the websocket is closed and if so, shutdown the liveview
*/
private maybeShutdown() {
if (this.#ws.isClosed()) {
this.maybeDebug(`ws closed, shutting down liveview: ${this.#ctx?.joinId}`);
this.close();
return true;
}
return false;
}

/**
* Call the config.onError callback on send errors and if the
* websocket is closed, shutdown the liveview
*/
private maybeHandleError(err: any) {
this.maybeShutdown();
if (err && this.#config && this.#config.onError) {
this.#config.onError(err);
}
}

private maybeDebug(msg: string) {
if (this.#config.debug) {
try {
this.#config.debug(msg);
} catch (e) {
console.error("error debugging message", e);
}
}
}

private async cleanupPostReply() {
// do post-send lifecycle step
this.#ctx!.socket.updateContextWithTempAssigns();
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/server/socket/wsMessageRouter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ describe("test message router", () => {
let send: jest.Mock;
let subscribeToClose: jest.Mock;
let subscribeToMessages: jest.Mock;
let isClosed: jest.Mock;
let pubSub: PubSub;
let flashAdaptor: FlashAdaptor;
let filesAdaptor: TestNodeFileSystemAdatptor;
beforeEach(() => {
send = jest.fn();
subscribeToClose = jest.fn();
subscribeToMessages = jest.fn();
isClosed = jest.fn();
pubSub = new SingleProcessPubSub();
flashAdaptor = new SessionFlashAdaptor();
filesAdaptor = new TestNodeFileSystemAdatptor();
ws = {
send,
subscribeToClose,
subscribeToMessages,
isClosed,
};
mr = new WsMessageRouter(router, pubSub, flashAdaptor, new JsonSerDe(), filesAdaptor);
});
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/server/test/wsAdaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export class TestWsAdaptor implements WsAdaptor {
private sendFn: (msg: string) => void;
#closeListener: WsCloseListener;
#msgListener: WsMsgListener;
closed: boolean = false;
constructor(sendFn: (msg: string) => void) {
this.sendFn = sendFn;
}
Expand All @@ -16,6 +17,9 @@ export class TestWsAdaptor implements WsAdaptor {
subscribeToMessages(msgListener: WsMsgListener): void {
this.#msgListener = msgListener;
}
isClosed(): boolean {
return this.closed;
}

async sendIncomingMsg(msg: string) {
await this.#msgListener(Buffer.from(msg), false);
Expand Down
1 change: 0 additions & 1 deletion packages/deno/src/deno/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ export class DenoOakLiveViewServer implements LiveViewServerAdaptor<DenoMiddlewa
return async (ctx: Context<Record<string, any>, Record<string, any>>) => {
// upgrade the request to a websocket connection
const ws = await ctx.upgrade();
// const connectionId = nanoid();
new WsHandler(new DenoWsAdaptor(ws), this.#config);
};
}
Expand Down
Loading

0 comments on commit 87b658d

Please sign in to comment.