Skip to content

Commit

Permalink
add PubSub for Redis and SingleProcess; refactor ComponentManager, Ba…
Browse files Browse the repository at this point in the history
…seLiveViewComponent, and MessageRouter to use PubSub; refactor examples based on PubSub and PushPatch api change
  • Loading branch information
floodfx committed Mar 6, 2022
1 parent a4b0eb8 commit 12220ed
Show file tree
Hide file tree
Showing 16 changed files with 672 additions and 641 deletions.
785 changes: 396 additions & 389 deletions coverage/clover.xml

Large diffs are not rendered by default.

39 changes: 20 additions & 19 deletions coverage/coverage-final.json

Large diffs are not rendered by default.

19 changes: 17 additions & 2 deletions src/examples/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import path from 'path';
import { LiveViewServer } from '../server';
import { LiveViewRouter } from '../server/component/types';
import { configLiveViewHandler } from '../server/live_view_route';
import { AsyncFetchLiveViewComponent } from './asyncfetch/component';
import { AutocompleteLiveViewComponent } from './autocomplete/component';
import { LicenseLiveViewComponent } from './license_liveview';
import { LightLiveViewComponent } from './light_liveview';
import { SearchLiveViewComponent } from './live-search/component';
import { PaginateLiveViewComponent } from './pagination/component';
import { routeDetails } from './routeDetails';
import { SalesDashboardLiveViewComponent } from './sales_dashboard_liveview';
import { ServersLiveViewComponent } from './servers/component';
import { SortLiveViewComponent } from './sorting/component';
import { routeDetails } from './routeDetails';
import { VolunteerComponent } from './volunteers/component';
import { AsyncFetchLiveViewComponent } from './asyncfetch/component';

const lvServer = new LiveViewServer({
signingSecret: "MY_VERY_SECRET_KEY",
Expand Down Expand Up @@ -51,6 +52,20 @@ lvServer.registerLiveViewRoutes(router)
// register single route
// lvServer.registerLiveViewRoute("/volunteers", new VolunteerComponent())


lvServer.expressApp.get("/foo/bar", configLiveViewHandler(
new LightLiveViewComponent(),
"root.html.ejs",
"signing-secret-foo",
(req) => {
return { ...req.session, csrfToken: "csrfToken"}
},
{
title: "Examples",
},
)
)

// add your own routes to the express app
lvServer.expressApp.get("/", (req, res) => {

Expand Down
12 changes: 6 additions & 6 deletions src/examples/pagination/component.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { options_for_select } from "../../server/templates/helpers/options_for_select";
import { live_patch } from "../../server/templates/helpers/live_patch";
import { html, HtmlSafeString, join } from "../../server/templates";
import { LiveViewExternalEventListener, LiveViewMountParams, LiveViewSocket, StringPropertyValues } from "../../server/component/types";
import { almostExpired, Donation, listItems } from "./data";
import { SessionData } from "express-session";
import { BaseLiveViewComponent } from "../../server/component/base_component";
import { LiveViewExternalEventListener, LiveViewMountParams, LiveViewSocket, StringPropertyValues } from "../../server/component/types";
import { html, HtmlSafeString, join } from "../../server/templates";
import { live_patch } from "../../server/templates/helpers/live_patch";
import { options_for_select } from "../../server/templates/helpers/options_for_select";
import { almostExpired, Donation, listItems } from "./data";

interface Options {
page: number;
Expand Down Expand Up @@ -84,7 +84,7 @@ export class PaginateLiveViewComponent extends BaseLiveViewComponent<PaginateCon
const page = socket.context.options.page;
const perPage = Number(params.perPage || 10);

this.pushPatch(socket, { to: { path: "/paginate", params: { page: String(page), perPage: String(perPage) } } });
socket.pushPatch({ to: { path: "/paginate", params: { page: String(page), perPage: String(perPage) } } });

return {
options: { page, perPage },
Expand Down
12 changes: 6 additions & 6 deletions src/examples/sorting/component.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { options_for_select } from "../../server/templates/helpers/options_for_select";
import { live_patch } from "../../server/templates/helpers/live_patch";
import { html, HtmlSafeString, join } from "../../server/templates";
import { LiveViewExternalEventListener, LiveViewMountParams, LiveViewSocket, StringPropertyValues } from "../../server/component/types";
import { almostExpired, Donation, listItems, donations } from "./data";
import { SessionData } from "express-session";
import { BaseLiveViewComponent } from "../../server/component/base_component";
import { LiveViewExternalEventListener, LiveViewMountParams, LiveViewSocket, StringPropertyValues } from "../../server/component/types";
import { html, HtmlSafeString, join } from "../../server/templates";
import { live_patch } from "../../server/templates/helpers/live_patch";
import { options_for_select } from "../../server/templates/helpers/options_for_select";
import { almostExpired, Donation, donations, listItems } from "./data";

export interface PaginateOptions {
page: number;
Expand Down Expand Up @@ -118,7 +118,7 @@ export class SortLiveViewComponent extends BaseLiveViewComponent<SortContext, Pa
}


this.pushPatch(socket, { to: { path: "/sort", params: { page: String(page), perPage: String(perPage), sortOrder, sort_by } } });
socket.pushPatch({ to: { path: "/sort", params: { page: String(page), perPage: String(perPage), sortOrder, sort_by } } });

return {
options: { page, perPage, sort_by, sortOrder },
Expand Down
52 changes: 18 additions & 34 deletions src/examples/volunteers/component.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { html } from "../../server/templates";
import { LiveViewChangeset, LiveViewExternalEventListener, LiveViewInternalEventListener, LiveViewMountParams, LiveViewSocket, StringPropertyValues } from "../../server/component/types";
import { SessionData } from "express-session";
import { Volunteer, changeset, createVolunteer, listVolunteers, getVolunteer, updateVolunteer, VolunteerData } from "./data";
import { submit } from "../../server/templates/helpers/submit";
import { BaseLiveViewComponent } from "../../server/component/base_component";
import { LiveViewChangeset, LiveViewExternalEventListener, LiveViewInternalEventListener, LiveViewMountParams, LiveViewSocket, StringPropertyValues } from "../../server/component/types";
import { html } from "../../server/templates";
import { form_for } from "../../server/templates/helpers/form_for";
import { error_tag, telephone_input, text_input } from "../../server/templates/helpers/inputs";
import { BaseLiveViewComponent } from "../../server/component/base_component";
import { PubSub } from "../../server/pubsub/SingleProcessPubSub";
import { RedisPubSub } from "../../server/pubsub/RedisPubSub";
import { submit } from "../../server/templates/helpers/submit";
import { changeset, createVolunteer, getVolunteer, listVolunteers, updateVolunteer, Volunteer, VolunteerMutationEvent } from "./data";

export interface VolunteerContext {
volunteers: Volunteer[]
Expand All @@ -18,7 +16,7 @@ type VolunteerEvents = "save" | "validate" | "toggle-status";

export class VolunteerComponent extends BaseLiveViewComponent<VolunteerContext, unknown> implements
LiveViewExternalEventListener<VolunteerContext, VolunteerEvents, Volunteer>,
LiveViewInternalEventListener<VolunteerContext, VolunteerData> {
LiveViewInternalEventListener<VolunteerContext, VolunteerMutationEvent> {

mount(params: LiveViewMountParams, session: Partial<SessionData>, socket: LiveViewSocket<VolunteerContext>) {
if (socket.connected) {
Expand All @@ -36,17 +34,17 @@ export class VolunteerComponent extends BaseLiveViewComponent<VolunteerContext,
return html`
<h1>Volunteer Check-In</h1>
<div id="checkin">
${form_for<Volunteer>("#", {
phx_submit: "save",
phx_change: "validate"
})}
<div class="field">
${text_input<Volunteer>(changeset, "name", { placeholder: "Name", autocomplete: "off", phx_debounce: 1000 })}
${error_tag(changeset, "name")}
</div>
<div class="field">
${telephone_input<Volunteer>(changeset, "phone", {
placeholder: "Phone", autocomplete: "off", phx_debounce: "blur"
Expand All @@ -55,7 +53,7 @@ export class VolunteerComponent extends BaseLiveViewComponent<VolunteerContext,
</div>
${submit("Check In", { phx_disable_with: "Saving..." })}
</form>
<div id="volunteers" phx-update="prepend">
${volunteers.map(this.renderVolunteer)}
</div>
Expand Down Expand Up @@ -107,33 +105,19 @@ export class VolunteerComponent extends BaseLiveViewComponent<VolunteerContext,
}
// attempt to create the volunteer from the form data
const createChangeset = createVolunteer(volunteer);

// valid form data
if (createChangeset.valid) {
const newVolunteer = createChangeset.data as Volunteer;
// only add new volunteer since we're using phx-update="prepend"
// which means the new volunteer will be added to the top of the list
const newVolunteers = [newVolunteer];
const emptyChangeset = changeset({}, {}); // reset form
return {
volunteers: newVolunteers,
changeset: emptyChangeset
}
}
// form data was invalid
else {
return {
volunteers: [], // no volunteers to prepend
changeset: createChangeset // errors for form
}
return {
volunteers: [], // no volunteers to prepend
changeset: createChangeset // errors for form
}

}
}

handleInfo(event: VolunteerData, socket: LiveViewSocket<VolunteerContext>): VolunteerContext | Promise<VolunteerContext> {
console.log("received", event);
handleInfo(event: VolunteerMutationEvent, socket: LiveViewSocket<VolunteerContext>): VolunteerContext | Promise<VolunteerContext> {
console.log("received", event, socket.id);
const { volunteer } = event;
return {
volunteers: listVolunteers(),
volunteers: [volunteer],
changeset: changeset({}, {})
}
}
Expand Down
28 changes: 9 additions & 19 deletions src/examples/volunteers/data.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { z } from 'zod';
import { nanoid } from 'nanoid';
import { LiveViewChangeset, LiveViewComponent, LiveViewExternalEventListener, LiveViewInternalEventListener, LiveViewSocket } from '../../server/component/types';
import { z } from 'zod';
import { newChangesetFactory } from '../../server/component/changeset';
import { LiveViewChangeset } from '../../server/component/types';
import { PubSub } from '../../server/pubsub/SingleProcessPubSub';
import { VolunteerComponent } from './component';
import { RedisPubSub } from '../../server/pubsub/RedisPubSub';

const phoneRegex = /^\d{3}[\s-.]?\d{3}[\s-.]?\d{4}$/

Expand Down Expand Up @@ -38,7 +36,7 @@ export const createVolunteer = (newVolunteer: Partial<Volunteer>): LiveViewChang
if (result.valid) {
const volunteer = result.data as Volunteer;
volunteers[volunteer.id] = volunteer;
broadcast('created', volunteer);
broadcast({type: "created", volunteer});
}
return result;
}
Expand All @@ -48,25 +46,17 @@ export const updateVolunteer = (currentVolunteer: Volunteer, updated: Partial<Vo
if (result.valid) {
const volunteer = result.data as Volunteer;
volunteers[volunteer.id] = volunteer;
broadcast('updated', volunteer);
broadcast({type: "updated", volunteer});
}
return result;
}

const pubSub: RedisPubSub<VolunteerData> = new RedisPubSub<VolunteerData>({
url: 'redis://localhost:6379'
});
function broadcast(event: VolunteerEvent, volunteer: Volunteer) {
pubSub.broadcast('volunteer', {
event,
volunteer,
});
function broadcast(event: VolunteerMutationEvent) {
PubSub.broadcast('volunteer', event);
}

type VolunteerEvent = 'created' | 'updated';
export type VolunteerMutationEvent =
| {type: "created", volunteer: Volunteer }
| {type: "updated", volunteer: Volunteer }

export interface VolunteerData {
event: VolunteerEvent,
volunteer: Volunteer
}

21 changes: 0 additions & 21 deletions src/server/component/base_component.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,13 @@
import { SessionData } from "express-session";
import { LiveViewComponent, LiveViewMountParams, LiveViewSocket, LiveViewTemplate, StringPropertyValues } from "./types";
import { LiveViewComponentManager } from "../socket/component_manager";

export abstract class BaseLiveViewComponent<T, P> implements LiveViewComponent<T, P> {

private componentManager: LiveViewComponentManager;

abstract mount(params: LiveViewMountParams, session: Partial<SessionData>, socket: LiveViewSocket<T>): T | Promise<T>;
abstract render(context: T): LiveViewTemplate;

handleParams(params: StringPropertyValues<P>, url: string, socket: LiveViewSocket<T>): T | Promise<T> {
return socket.context;
}

pushPatch(socket: LiveViewSocket<unknown>, patch: { to: { path: string, params: StringPropertyValues<any> } }) {
if (this.componentManager) {
this.componentManager.onPushPatch(socket, patch);
} else {
console.error("component manager not registered for component", this);
}
}

csrfToken(): string | undefined {
if (this.componentManager) {
return this.componentManager.csrfToken;
}
}

registerComponentManager(manager: LiveViewComponentManager) {
this.componentManager = manager;
}

}
5 changes: 5 additions & 0 deletions src/server/component/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ export interface LiveViewChangeset<T> {
valid: boolean // true if no validation errors
}

export interface PushPatchPathAndParams {
to: { path: string, params: StringPropertyValues<any> }
}

export interface LiveViewSocket<T> {
id: string;
connected: boolean; // true for websocket, false for http request
Expand All @@ -27,6 +31,7 @@ export interface LiveViewSocket<T> {
repeat: (fn: () => void, intervalMillis: number) => void;
pageTitle: (newPageTitle: string) => void;
subscribe: (topic: string) => void;
pushPatch: (params: PushPatchPathAndParams) => void;
}

export interface LiveViewTemplate extends HtmlSafeString {
Expand Down
13 changes: 7 additions & 6 deletions src/server/live_view_server.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { LiveViewComponent, LiveViewRouter, LiveViewSocket } from "./component/types";
import WebSocket from 'ws';
import { Server } from 'http';
import express from "express";
import { nanoid } from "nanoid";
import jwt from "jsonwebtoken";
import session, { MemoryStore, SessionData } from "express-session";
import { Server } from 'http';
import jwt from "jsonwebtoken";
import { nanoid } from "nanoid";
import path from "path";
import { MessageRouter } from "./socket/message_router";
import WebSocket from 'ws';
import { live_title_tag } from ".";
import { LiveViewComponent, LiveViewRouter, LiveViewSocket } from "./component/types";
import { MessageRouter } from "./socket/message_router";


// extend / define session interface
Expand Down Expand Up @@ -165,6 +165,7 @@ export class LiveViewServer {
repeat: emptyVoid,
pageTitle: emptyVoid,
subscribe: emptyVoid,
pushPatch: emptyVoid,
}

// look up component for route
Expand Down
37 changes: 28 additions & 9 deletions src/server/pubsub/RedisPubSub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { RedisClientType, RedisClientOptions } from '@node-redis/client';
import { RedisClientOptions, RedisClientType } from '@node-redis/client';
import { nanoid } from 'nanoid';
import { createClient } from 'redis';
import { Publisher, Subscriber } from '.';
import { Publisher, Subscriber, SubscriberFunction } from '.';



Expand All @@ -9,32 +10,50 @@ import { Publisher, Subscriber } from '.';
*
* See: https://github.com/redis/node-redis#pubsub
*/
export class RedisPubSub<T> implements Subscriber<T>, Publisher<T> {
class RedisPubSub<T> implements Subscriber<T>, Publisher<T> {

private redis: RedisClientType;
private subscribers: Record<string, RedisClientType> = {};

constructor(options: RedisClientOptions) {
this.redis = createClient(options);
this.redis.connect();
}

public async subscribe(topic: string, listener: (data: T) => void) {
public async subscribe(topic: string, subscriber: SubscriberFunction<T>): Promise<string> {
// create new connection for each subscription
const redisSub = this.redis.duplicate();
await redisSub.connect();

// parse data to JSON before passing to subscriber
redisSub.subscribe(topic, (data: string) => {
listener(JSON.parse(data) as T);
subscriber(JSON.parse(data) as T);
});

// store connection id for unsubscribe and return for caller
const subscriberId = nanoid();
this.subscribers[subscriberId] = redisSub;
return subscriberId;
}

public async broadcast(topic: string, data: T) {
public async broadcast(topic: string, data: T): Promise<void> {
console.log(`Broadcasting to ${topic}`);
if (!this.redis.isOpen) {
await this.redis.connect();
}
await this.redis.publish(topic, JSON.stringify(data));
}

public async unsubscribe(topic: string, listener: (data: T) => void) {
this.redis.unsubscribe(topic);
public async unsubscribe(topic: string, subscriberId: string): Promise<void> {
// look up redis connection from subscriber id
const redisSub = this.subscribers[subscriberId];
await redisSub.unsubscribe(topic);
// remove subscriber from subscribers
delete this.subscribers[subscriberId];
}

}
}

export const PubSub = new RedisPubSub(
{ url: process.env.REDIS_URL || "redis://localhost:6379" }
);
Loading

0 comments on commit 12220ed

Please sign in to comment.