Skip to content

Commit

Permalink
cleanup disconnected sockets and some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
floodfx committed Jan 26, 2022
1 parent d9d9f0c commit 121ce05
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 113 deletions.
5 changes: 2 additions & 3 deletions src/examples/sales_dashboard_liveview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ export class SalesDashboardLiveViewComponent extends BaseLiveViewComponent<Sales

mount(params: any, session: any, socket: LiveViewSocket<SalesDashboardContext>): SalesDashboardContext {
if (socket.connected) {
// TODO clean up interval on unmount
const intervalId = setInterval(() => {
socket.repeat(() => {
socket.sendInternal("tick");
}, 1000);
}
Expand Down Expand Up @@ -66,7 +65,7 @@ export class SalesDashboardLiveViewComponent extends BaseLiveViewComponent<Sales
</div>
</div>
<button phx-click="refresh">
<img src="images/refresh.svg" />
Refresh
</button>
</div>
Expand Down
7 changes: 5 additions & 2 deletions src/server/live_view_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import http, { Server, createServer } from 'http';
import express from "express";
import { nanoid } from "nanoid";
import jwt from "jsonwebtoken";
import { onMessage } from "./socket/message_router";
import session, { MemoryStore } from "express-session";
import path from "path";
import { LiveViewComponentManager } from "./socket/component_manager";
import { MessageRouter } from "./socket/message_router";


// extend / define session interface
declare module 'express-session' {
Expand All @@ -32,6 +33,7 @@ export class LiveViewServer {
private sessionStore: session.Store = new MemoryStore();

private _router: LiveViewRouter = {};
private messageRouter = new MessageRouter()

constructor(options: Partial<LiveViewServerOptions>) {
this.port = options.port ?? this.port;
Expand Down Expand Up @@ -67,7 +69,7 @@ export class LiveViewServer {
const connectionId = nanoid();
// handle ws messages
socket.on('message', message => {
onMessage(socket, message, this._router, connectionId, this.signingSecret);
this.messageRouter.onMessage(socket, message, this._router, connectionId, this.signingSecret);
});
});

Expand Down Expand Up @@ -110,6 +112,7 @@ export class LiveViewServer {
connected: false, // ws socket not connected on http request
context: {},
sendInternal: () => { },
repeat: () => { },
}

// look up component for route
Expand Down
32 changes: 30 additions & 2 deletions src/server/socket/component_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ export class LiveViewComponentManager {
private context: unknown;
private component: LiveViewComponent<unknown, unknown>;
private signingSecret: string;
private intervals: NodeJS.Timeout[] = [];
private lastHeartbeat: number = Date.now();
private socketIsClosed: boolean = false;

constructor(component: LiveViewComponent<unknown, unknown>, signingSecret: string) {
this.component = component;
Expand Down Expand Up @@ -52,7 +55,7 @@ export class LiveViewComponentManager {
}

onHeartbeat(ws: WebSocket, message: PhxHeartbeatIncoming) {
// TODO keep track of last heartbeat and disconnect if no heartbeat for a while?
this.lastHeartbeat = Date.now();
this.sendPhxReply(ws, newHeartbeatReply(message));
}

Expand Down Expand Up @@ -120,6 +123,24 @@ export class LiveViewComponentManager {
this.sendPhxReply(ws, newPhxReply(message, replyPayload));
}

repeat(fn: () => void, intervalMillis: number) {
this.intervals.push(setInterval(fn, intervalMillis));
}

shutdown() {
this.intervals.forEach(clearInterval);
}

isHealthy() {
if (this.socketIsClosed) {
return false;
} else {
const now = Date.now();
const diff = now - this.lastHeartbeat;
return diff < 60000;
}
}

private sendInternal(ws: WebSocket, event: any, topic: string): void {
// console.log("sendInternal", event);

Expand Down Expand Up @@ -151,13 +172,20 @@ export class LiveViewComponentManager {
ws, // the websocket
context: this.context,
sendInternal: (event) => this.sendInternal(ws, event, topic),
repeat: (fn, intervalMillis) => this.repeat(fn, intervalMillis),
}
}

private sendPhxReply(ws: WebSocket, reply: PhxOutgoingMessage<any>) {
ws.send(JSON.stringify(reply), { binary: false }, (err: any) => {
if (err) {
console.error("error", err);
if (ws.CLOSED) {
this.socketIsClosed = true;
this.shutdown();
console.error("socket is closed", err, "...shutting down topic", reply[2], "for component", this.component);
} else {

}
}
});
}
Expand Down
202 changes: 96 additions & 106 deletions src/server/socket/message_router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,121 +4,111 @@ import { LiveViewComponent } from '../types';
import { LiveViewRouter } from '../types';
import jwt from 'jsonwebtoken';
import { LiveViewComponentManager } from './component_manager';
import { LiveViewSocket } from '..';

const topicComponentManager: { [key: string]: LiveViewComponentManager } = {};
const heartbeatRouter: { [key: string]: LiveViewComponentManager } = {};

export function onMessage(ws: WebSocket, message: WebSocket.RawData, router: LiveViewRouter, connectionId: string, signingSecret: string) {

// parse string to JSON
const rawPhxMessage: PhxIncomingMessage<unknown> = JSON.parse(message.toString());

// rawPhxMessage must be an array with 5 elements
if (typeof rawPhxMessage === 'object' && Array.isArray(rawPhxMessage) && rawPhxMessage.length === 5) {
const [joinRef, messageRef, topic, event, payload] = rawPhxMessage;

let componentManager: LiveViewComponentManager | undefined;
switch (event) {
case "phx_join":
// assume componentManager is not defined since join creates a new component manager
onPhxJoin(ws, rawPhxMessage as PhxJoinIncoming, router, signingSecret, connectionId);
break;
case "heartbeat":
// heartbeat comes in as a "phoenix" topic so lookup via connectionId
componentManager = heartbeatRouter[connectionId];
if (componentManager) {
componentManager.onHeartbeat(ws, rawPhxMessage as PhxHeartbeatIncoming);
} else {
console.log("expected component manager for topic", topic);
}
break;
case "event":
// lookup component manager for this topic
componentManager = topicComponentManager[topic];
if (componentManager) {
componentManager.onEvent(ws, rawPhxMessage as PhxIncomingMessage<PhxClickPayload | PhxFormPayload>);
} else {
console.log("expected component manager for topic", topic);
}
break;
case "live_patch":
componentManager = topicComponentManager[topic];
if (componentManager) {
componentManager.onLivePatch(ws, rawPhxMessage as PhxLivePatchIncoming);
} else {
console.log("expected component manager for topic", topic);
}
break;
default:
console.error("unhandeded protocol event", event);
}
}
else {
// unknown message type
console.error("unknown message type", rawPhxMessage);
}
}


export function onPhxJoin(ws: WebSocket, message: PhxJoinIncoming, router: LiveViewRouter, signingSecret: string, connectionId: string) {
export class MessageRouter {

topicComponentManager: { [key: string]: LiveViewComponentManager } = {};
heartbeatRouter: { [key: string]: LiveViewComponentManager } = {};

onMessage(ws: WebSocket, message: WebSocket.RawData, router: LiveViewRouter, connectionId: string, signingSecret: string) {
console.log('connectionId', connectionId);
// parse string to JSON
const rawPhxMessage: PhxIncomingMessage<unknown> = JSON.parse(message.toString());

// rawPhxMessage must be an array with 5 elements
if (typeof rawPhxMessage === 'object' && Array.isArray(rawPhxMessage) && rawPhxMessage.length === 5) {
const [joinRef, messageRef, topic, event, payload] = rawPhxMessage;

let componentManager: LiveViewComponentManager | undefined;
switch (event) {
case "phx_join":
// assume componentManager is not defined since join creates a new component manager
this.onPhxJoin(ws, rawPhxMessage as PhxJoinIncoming, router, signingSecret, connectionId);
break;
case "heartbeat":
// heartbeat comes in as a "phoenix" topic so lookup via connectionId
this.onHeartbeat(ws, rawPhxMessage as PhxHeartbeatIncoming, topic, connectionId);
break;
case "event":
// lookup component manager for this topic
componentManager = this.topicComponentManager[topic];
if (componentManager) {
componentManager.onEvent(ws, rawPhxMessage as PhxIncomingMessage<PhxClickPayload | PhxFormPayload>);
} else {
console.log("expected component manager for topic", topic);
}
break;
case "live_patch":
componentManager = this.topicComponentManager[topic];
if (componentManager) {
componentManager.onLivePatch(ws, rawPhxMessage as PhxLivePatchIncoming);
} else {
console.log("expected component manager for topic", topic);
}
break;
default:
console.error("unhandeded protocol event", event);
}
}
else {
// unknown message type
console.error("unknown message type", rawPhxMessage);
}

// use url to route join request to component
const [joinRef, messageRef, topic, event, payload] = message;
const { url: urlString } = payload;
const url = new URL(urlString);
const component = router[url.pathname];
if (!component) {
console.error("no component found for", url);
return;
// cleanup unhealthy component managers
Object.keys(this.topicComponentManager).forEach(key => {
const cm = this.topicComponentManager[key];
if (!cm.isHealthy()) {
cm.shutdown()
console.log("deleting", key, " from topicComponentManager");
delete this.topicComponentManager[key];
}
})

// cleanup unhealthy heartbeat routers
Object.keys(this.heartbeatRouter).forEach(key => {
const cm = this.heartbeatRouter[key];
if (!cm.isHealthy()) {
cm.shutdown()
console.log("deleting", key, " from heartbeatRouter");
delete this.heartbeatRouter[key];
}
})
}

const componentManager = new LiveViewComponentManager(component, signingSecret);
topicComponentManager[topic] = componentManager;
heartbeatRouter[connectionId] = componentManager;
componentManager.handleJoin(ws, message);

}

// export function sendInternalMessage(socket: LiveViewSocket<unknown>, component: LiveViewComponent<any, any>, event: any, payload?: any) {
onPhxJoin(ws: WebSocket, message: PhxJoinIncoming, router: LiveViewRouter, signingSecret: string, connectionId: string) {

// // check if component has event handler
// if (!(component as any).handleInfo) {
// console.warn("no info handler for component", component);
// return;
// }

// // @ts-ignore
// const ctx = component.handleInfo(event, socket);

// const view = component.render(ctx);
// use url to route join request to component
const [joinRef, messageRef, topic, event, payload] = message;
const { url: urlString } = payload;
const url = new URL(urlString);
const component = router[url.pathname];
if (!component) {
console.error("no component found for", url);
return;
}

// const reply: PhxDiffReply = [
// null, // no join reference
// null, // no message reference
// socket.id,
// "diff",
// view.partsTree(false) as any
// ]
// TODO - iterate through other component managers and detect dead ones via heartbeat
// remove from heartbeat and topic routers

// sendPhxReply(socket.ws!, reply);
// }
const componentManager = new LiveViewComponentManager(component, signingSecret);
this.topicComponentManager[topic] = componentManager;
this.heartbeatRouter[connectionId] = componentManager;
componentManager.handleJoin(ws, message);

console.log("heartbeatRouter", Object.keys(this.heartbeatRouter));
console.log("topicComponentManager", Object.keys(this.topicComponentManager));

// function sendPhxReply(ws: WebSocket, reply: PhxOutgoingMessage<any>) {
// ws.send(JSON.stringify(reply), { binary: false }, (err: any) => {
// if (err) {
// console.error("error", err);
// }
// });
// }
}

onHeartbeat(ws: WebSocket, message: PhxHeartbeatIncoming, topic: string, connectionId: string) {
const componentManager = this.heartbeatRouter[connectionId];

// function printHtml(rendered: RenderedNode) {
// const statics = rendered.s;
// let html = statics[0];
// for (let i = 1; i < statics.length; i++) {
// html += rendered[i - 1] + statics[i];
// }
// console.log("html:\n", html);
// }
if (componentManager) {
componentManager.onHeartbeat(ws, message);
} else {
console.log("expected component manager for topic", topic);
}
}
}
1 change: 1 addition & 0 deletions src/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface LiveViewSocket<T> {
context: T;
ws?: WebSocket;
sendInternal: (event: unknown) => void;
repeat: (fn: () => void, intervalMillis: number) => void;
}

export interface LiveViewTemplate extends HtmlSafeString {
Expand Down

0 comments on commit 121ce05

Please sign in to comment.