Skip to content

Commit

Permalink
Fix live_patch bug caused by returning early from handleMsg
Browse files Browse the repository at this point in the history
- returning early meant active was never reset so got into bad state
- also add optional onError to tap into errors from server config
- make errors in handleMsg and send routed through onError
- fix import
  • Loading branch information
floodfx committed Jan 12, 2023
1 parent e713cf0 commit 0985538
Show file tree
Hide file tree
Showing 14 changed files with 310 additions and 250 deletions.
8 changes: 8 additions & 0 deletions .changeset/clean-brooms-clap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@liveviewjs/lambda-examples": patch
"liveviewjs": patch
"@liveviewjs/examples": patch
"@liveviewjs/express": patch
---

Fix live_patch early return bug and add option onError server config
341 changes: 173 additions & 168 deletions packages/core/coverage/clover.xml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/core/coverage/coverage-final.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/core/dist/liveview.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,7 @@ interface WsHandlerConfig {
wrapperTemplate?: LiveViewWrapperTemplate;
flashAdaptor: FlashAdaptor;
pubSub: PubSub;
onError?: (err: any) => void;
}
declare class WsHandlerContext {
#private;
Expand Down Expand Up @@ -1357,6 +1358,7 @@ declare class WsHandler {
handleMsg(msg: Phx.Msg<unknown>): Promise<void>;
close(): Promise<void>;
send(reply: PhxReply.Reply): void;
private maybeHandleError;
private cleanupPostReply;
private viewToDiff;
private viewToRendered;
Expand Down
65 changes: 39 additions & 26 deletions packages/core/dist/liveview.js
Original file line number Diff line number Diff line change
Expand Up @@ -3190,16 +3190,17 @@ class WsHandler {
this.#ws.subscribeToClose(() => this.close);
}
async handleMsg(msg) {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
if (this.#activeMsg) {
this.#msgQueue.push(msg);
return;
}
this.#activeMsg = true;
const event = msg[exports.Phx.MsgIdx.event];
const topic = msg[exports.Phx.MsgIdx.topic];
console.log("dispatching message", msg);
try {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
if (this.#activeMsg) {
this.#msgQueue.push(msg);
return;
}
this.#activeMsg = true;
const event = msg[exports.Phx.MsgIdx.event];
const topic = msg[exports.Phx.MsgIdx.topic];
switch (event) {
case "phx_join":
// phx_join event used for both LiveView joins and LiveUpload joins
Expand Down Expand Up @@ -3303,6 +3304,7 @@ class WsHandler {
this.send(msg);
break;
case "live_patch":
console.log("live_patch", msg);
// two cases of live_patch: server-side (pushPatch) or client-side (click on link)
try {
const payload = msg[exports.Phx.MsgIdx.payload];
Expand All @@ -3315,16 +3317,17 @@ class WsHandler {
const diff = await this.viewToDiff(view);
this.send(PhxReply.diffReply(msg, diff));
this.cleanupPostReply();
return;
}
// case 2: server-side live_patch
const { to } = payload;
// to is relative so need to provide the urlBase determined on initial join
this.#ctx.url = new URL(to, this.#ctx.url);
// let the `LiveView` udpate its context based on the new url
await this.#ctx.liveView.handleParams(this.#ctx.url, this.#ctx.socket);
// send the message on to the client
this.send(msg);
else {
// case 2: server-side live_patch
const { to } = payload;
// to is relative so need to provide the urlBase determined on initial join
this.#ctx.url = new URL(to, this.#ctx.url);
// let the `LiveView` udpate its context based on the new url
await this.#ctx.liveView.handleParams(this.#ctx.url, this.#ctx.socket);
// send the message on to the client
this.send(msg);
}
}
catch (e) {
/* istanbul ignore next */
Expand Down Expand Up @@ -3405,15 +3408,15 @@ class WsHandler {
default:
throw new Error(`unexpected phx protocol event ${event}`);
}
// we're done with this message, so we can process the next one if there is one
this.#activeMsg = false;
const nextMsg = this.#msgQueue.pop();
if (nextMsg) {
this.handleMsg(nextMsg);
}
}
catch (e) {
console.error("error handling phx message", e);
}
// we're done with this message, so we can process the next one if there is one
this.#activeMsg = false;
const nextMsg = this.#msgQueue.pop();
if (nextMsg) {
this.handleMsg(nextMsg);
this.maybeHandleError(e);
}
}
async close() {
Expand All @@ -3422,7 +3425,17 @@ class WsHandler {
this.handleMsg([null, null, joinId, "phx_leave", null]);
}
send(reply) {
this.#ws.send(PhxReply.serialize(reply));
try {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError);
}
catch (e) {
this.maybeHandleError(e);
}
}
maybeHandleError(err) {
if (err && this.#config.onError) {
this.#config.onError(err);
}
}
async cleanupPostReply() {
// do post-send lifecycle step
Expand Down
65 changes: 39 additions & 26 deletions packages/core/dist/liveview.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3183,16 +3183,17 @@ class WsHandler {
this.#ws.subscribeToClose(() => this.close);
}
async handleMsg(msg) {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
if (this.#activeMsg) {
this.#msgQueue.push(msg);
return;
}
this.#activeMsg = true;
const event = msg[Phx.MsgIdx.event];
const topic = msg[Phx.MsgIdx.topic];
console.log("dispatching message", msg);
try {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
if (this.#activeMsg) {
this.#msgQueue.push(msg);
return;
}
this.#activeMsg = true;
const event = msg[Phx.MsgIdx.event];
const topic = msg[Phx.MsgIdx.topic];
switch (event) {
case "phx_join":
// phx_join event used for both LiveView joins and LiveUpload joins
Expand Down Expand Up @@ -3296,6 +3297,7 @@ class WsHandler {
this.send(msg);
break;
case "live_patch":
console.log("live_patch", msg);
// two cases of live_patch: server-side (pushPatch) or client-side (click on link)
try {
const payload = msg[Phx.MsgIdx.payload];
Expand All @@ -3308,16 +3310,17 @@ class WsHandler {
const diff = await this.viewToDiff(view);
this.send(PhxReply.diffReply(msg, diff));
this.cleanupPostReply();
return;
}
// case 2: server-side live_patch
const { to } = payload;
// to is relative so need to provide the urlBase determined on initial join
this.#ctx.url = new URL(to, this.#ctx.url);
// let the `LiveView` udpate its context based on the new url
await this.#ctx.liveView.handleParams(this.#ctx.url, this.#ctx.socket);
// send the message on to the client
this.send(msg);
else {
// case 2: server-side live_patch
const { to } = payload;
// to is relative so need to provide the urlBase determined on initial join
this.#ctx.url = new URL(to, this.#ctx.url);
// let the `LiveView` udpate its context based on the new url
await this.#ctx.liveView.handleParams(this.#ctx.url, this.#ctx.socket);
// send the message on to the client
this.send(msg);
}
}
catch (e) {
/* istanbul ignore next */
Expand Down Expand Up @@ -3398,15 +3401,15 @@ class WsHandler {
default:
throw new Error(`unexpected phx protocol event ${event}`);
}
// we're done with this message, so we can process the next one if there is one
this.#activeMsg = false;
const nextMsg = this.#msgQueue.pop();
if (nextMsg) {
this.handleMsg(nextMsg);
}
}
catch (e) {
console.error("error handling phx message", e);
}
// we're done with this message, so we can process the next one if there is one
this.#activeMsg = false;
const nextMsg = this.#msgQueue.pop();
if (nextMsg) {
this.handleMsg(nextMsg);
this.maybeHandleError(e);
}
}
async close() {
Expand All @@ -3415,7 +3418,17 @@ class WsHandler {
this.handleMsg([null, null, joinId, "phx_leave", null]);
}
send(reply) {
this.#ws.send(PhxReply.serialize(reply));
try {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError);
}
catch (e) {
this.maybeHandleError(e);
}
}
maybeHandleError(err) {
if (err && this.#config.onError) {
this.#config.onError(err);
}
}
async cleanupPostReply() {
// do post-send lifecycle step
Expand Down
67 changes: 39 additions & 28 deletions packages/core/src/server/socket/ws/wsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface WsHandlerConfig {
wrapperTemplate?: LiveViewWrapperTemplate;
flashAdaptor: FlashAdaptor;
pubSub: PubSub;
onError?: (err: any) => void;
}

export class WsHandlerContext {
Expand Down Expand Up @@ -151,16 +152,16 @@ export class WsHandler {
}

async handleMsg(msg: Phx.Msg<unknown>) {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
if (this.#activeMsg) {
this.#msgQueue.push(msg);
return;
}
this.#activeMsg = true;
const event = msg[Phx.MsgIdx.event];
const topic = msg[Phx.MsgIdx.topic];
try {
// attempt to prevent race conditions by queuing messages
// if we are already processing a message
if (this.#activeMsg) {
this.#msgQueue.push(msg);
return;
}
this.#activeMsg = true;
const event = msg[Phx.MsgIdx.event];
const topic = msg[Phx.MsgIdx.topic];
switch (event) {
case "phx_join":
// phx_join event used for both LiveView joins and LiveUpload joins
Expand Down Expand Up @@ -283,8 +284,8 @@ export class WsHandler {
// two cases of live_patch: server-side (pushPatch) or client-side (click on link)
try {
const payload = msg[Phx.MsgIdx.payload] as Phx.LivePatchPayload | Phx.LiveNavPushPayload;
// case 1: client-side live_patch
if (payload.hasOwnProperty("url")) {
// case 1: client-side live_patch
const url = new URL((payload as Phx.LivePatchPayload).url);
this.#ctx!.url = url;
await this.#ctx!.liveView.handleParams(url, this.#ctx!.socket);
Expand All @@ -295,16 +296,16 @@ export class WsHandler {
const diff = await this.viewToDiff(view);
this.send(PhxReply.diffReply(msg, diff));
this.cleanupPostReply();
return;
} else {
// case 2: server-side live_patch
const { to } = payload as Phx.LiveNavPushPayload;
// to is relative so need to provide the urlBase determined on initial join
this.#ctx!.url = new URL(to, this.#ctx!.url);
// let the `LiveView` udpate its context based on the new url
await this.#ctx!.liveView.handleParams(this.#ctx!.url, this.#ctx!.socket);
// send the message on to the client
this.send(msg as PhxReply.Reply);
}
// case 2: server-side live_patch
const { to } = payload as Phx.LiveNavPushPayload;
// to is relative so need to provide the urlBase determined on initial join
this.#ctx!.url = new URL(to, this.#ctx!.url);
// let the `LiveView` udpate its context based on the new url
await this.#ctx!.liveView.handleParams(this.#ctx!.url, this.#ctx!.socket);
// send the message on to the client
this.send(msg as PhxReply.Reply);
} catch (e) {
/* istanbul ignore next */
console.error("Error handling live_patch", e);
Expand Down Expand Up @@ -379,15 +380,15 @@ export class WsHandler {
default:
throw new Error(`unexpected phx protocol event ${event}`);
}
} catch (e) {
console.error("error handling phx message", e);
}

// we're done with this message, so we can process the next one if there is one
this.#activeMsg = false;
const nextMsg = this.#msgQueue.pop();
if (nextMsg) {
this.handleMsg(nextMsg);
// we're done with this message, so we can process the next one if there is one
this.#activeMsg = false;
const nextMsg = this.#msgQueue.pop();
if (nextMsg) {
this.handleMsg(nextMsg);
}
} catch (e) {
this.maybeHandleError(e);
}
}

Expand All @@ -398,7 +399,17 @@ export class WsHandler {
}

send(reply: PhxReply.Reply) {
this.#ws.send(PhxReply.serialize(reply));
try {
this.#ws.send(PhxReply.serialize(reply), this.maybeHandleError);
} catch (e) {
this.maybeHandleError(e);
}
}

private maybeHandleError(err: any) {
if (err && this.#config.onError) {
this.#config.onError(err);
}
}

private async cleanupPostReply() {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/server/socket/ws/wsUploadHandler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { nanoid } from "nanoid";
import { LiveViewTemplate } from "src/server/live";
import { FileSystemAdaptor } from "../../../server/adaptor";
import { LiveViewTemplate } from "../../../server/live";
import { Phx } from "../../../server/protocol/phx";
import { PhxReply } from "../../../server/protocol/reply";
import { UploadConfig } from "../../../server/upload";
Expand Down
2 changes: 2 additions & 0 deletions packages/deno/src/deno/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ interface DenoOakLiveViewServerOptions {
flashAdaptor?: FlashAdaptor;
fileSystemAdaptor?: FileSystemAdaptor;
wrapperTemplate?: LiveViewWrapperTemplate;
onError?: (err: any) => void;
}

type DenoMiddleware = (
Expand Down Expand Up @@ -87,6 +88,7 @@ export class DenoOakLiveViewServer implements LiveViewServerAdaptor<DenoMiddlewa
wrapperTemplate: this.wrapperTemplate,
flashAdaptor: this.flashAdapter,
pubSub: this.pubSub,
onError: options?.onError,
};
}

Expand Down
1 change: 1 addition & 0 deletions packages/express/dist/liveviewjs-express.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ interface NodeExpressLiveViewServerOptions {
flashAdaptor?: FlashAdaptor;
fileSystemAdaptor?: FileSystemAdaptor;
wrapperTemplate?: LiveViewWrapperTemplate;
onError?: (err: any) => void;
}
declare class NodeExpressLiveViewServer implements LiveViewServerAdaptor<RequestHandler, (wsServer: WebSocketServer) => Promise<void>> {
#private;
Expand Down
1 change: 1 addition & 0 deletions packages/express/dist/liveviewjs-express.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class NodeExpressLiveViewServer {
wrapperTemplate: this.wrapperTemplate,
flashAdaptor: this.flashAdapter,
pubSub: this.pubSub,
onError: options === null || options === void 0 ? void 0 : options.onError,
}, "f");
}
wsMiddleware() {
Expand Down
1 change: 1 addition & 0 deletions packages/express/dist/liveviewjs-express.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class NodeExpressLiveViewServer {
wrapperTemplate: this.wrapperTemplate,
flashAdaptor: this.flashAdapter,
pubSub: this.pubSub,
onError: options === null || options === void 0 ? void 0 : options.onError,
}, "f");
}
wsMiddleware() {
Expand Down
Loading

0 comments on commit 0985538

Please sign in to comment.