Skip to content

Commit

Permalink
more progress on pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
floodfx committed Mar 4, 2022
1 parent 1e6fed7 commit a4b0eb8
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 6 deletions.
6 changes: 4 additions & 2 deletions src/examples/volunteers/component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { form_for } from "../../server/templates/helpers/form_for";
import { error_tag, telephone_input, text_input } from "../../server/templates/helpers/inputs";
import { BaseLiveViewComponent } from "../../server/component/base_component";
import { PubSub } from "../../server/pubsub/SingleProcessPubSub";
import { RedisPubSub } from "../../server/pubsub/RedisPubSub";

export interface VolunteerContext {
volunteers: Volunteer[]
Expand All @@ -19,10 +20,10 @@ export class VolunteerComponent extends BaseLiveViewComponent<VolunteerContext,
LiveViewExternalEventListener<VolunteerContext, VolunteerEvents, Volunteer>,
LiveViewInternalEventListener<VolunteerContext, VolunteerData> {


mount(params: LiveViewMountParams, session: Partial<SessionData>, socket: LiveViewSocket<VolunteerContext>) {
if (socket.connected) {
PubSub.subscribe('volunteer', socket.sendInternal);
console.log("subscribing", socket.id)
socket.subscribe('volunteer');
}
return {
volunteers: listVolunteers(),
Expand Down Expand Up @@ -130,6 +131,7 @@ export class VolunteerComponent extends BaseLiveViewComponent<VolunteerContext,
}

handleInfo(event: VolunteerData, socket: LiveViewSocket<VolunteerContext>): VolunteerContext | Promise<VolunteerContext> {
console.log("received", event);
return {
volunteers: listVolunteers(),
changeset: changeset({}, {})
Expand Down
6 changes: 5 additions & 1 deletion src/examples/volunteers/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { LiveViewChangeset, LiveViewComponent, LiveViewExternalEventListener, Li
import { newChangesetFactory } from '../../server/component/changeset';
import { PubSub } from '../../server/pubsub/SingleProcessPubSub';
import { VolunteerComponent } from './component';
import { RedisPubSub } from '../../server/pubsub/RedisPubSub';

const phoneRegex = /^\d{3}[\s-.]?\d{3}[\s-.]?\d{4}$/

Expand Down Expand Up @@ -52,8 +53,11 @@ export const updateVolunteer = (currentVolunteer: Volunteer, updated: Partial<Vo
return result;
}

const pubSub: RedisPubSub<VolunteerData> = new RedisPubSub<VolunteerData>({
url: 'redis://localhost:6379'
});
function broadcast(event: VolunteerEvent, volunteer: Volunteer) {
PubSub.broadcast('volunteer', {
pubSub.broadcast('volunteer', {
event,
volunteer,
});
Expand Down
1 change: 1 addition & 0 deletions src/server/component/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface LiveViewSocket<T> {
sendInternal: (event: unknown) => void;
repeat: (fn: () => void, intervalMillis: number) => void;
pageTitle: (newPageTitle: string) => void;
subscribe: (topic: string) => void;
}

export interface LiveViewTemplate extends HtmlSafeString {
Expand Down
1 change: 1 addition & 0 deletions src/server/live_view_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class LiveViewServer {
sendInternal: emptyVoid,
repeat: emptyVoid,
pageTitle: emptyVoid,
subscribe: emptyVoid,
}

// look up component for route
Expand Down
12 changes: 10 additions & 2 deletions src/server/pubsub/RedisPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { RedisClientType, RedisClientOptions } from '@node-redis/client';
import { createClient } from 'redis';
import { Publisher, Subscriber } from '.';



/**
* A PubSub implementation that uses Redis as a backend.
*
Expand All @@ -16,13 +18,19 @@ export class RedisPubSub<T> implements Subscriber<T>, Publisher<T> {
}

public async subscribe(topic: string, listener: (data: T) => void) {
this.redis.on(topic, (data: string) => {
const redisSub = this.redis.duplicate();
await redisSub.connect();
redisSub.subscribe(topic, (data: string) => {
listener(JSON.parse(data) as T);
});
}

public async broadcast(topic: string, data: T) {
this.redis.publish(topic, JSON.stringify(data));
console.log(`Broadcasting to ${topic}`);
if (!this.redis.isOpen) {
await this.redis.connect();
}
await this.redis.publish(topic, JSON.stringify(data));
}

public async unsubscribe(topic: string, listener: (data: T) => void) {
Expand Down
7 changes: 6 additions & 1 deletion src/server/pubsub/SingleProcessPubSub.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { EventEmitter } from 'events';
import { Publisher, Subscriber } from ".";


/**
* A PubSub implementation that uses the Node.js EventEmitter as a backend.
*
* Should only be used in single process environments like local development
* or a single instance. In a multi-process environment, use RedisPubSub.
*/
class SingleProcessPubSub<T> implements Subscriber<T>, Publisher<T> {

private eventEmitter: EventEmitter;
Expand Down
8 changes: 8 additions & 0 deletions src/server/socket/component_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { newHeartbeatReply, newPhxReply } from "./util";
import { BaseLiveViewComponent } from "../component/base_component";
import { deepDiff } from "../templates/diff";
import { Parts } from "..";
import { RedisPubSub } from "../pubsub/RedisPubSub";

export class LiveViewComponentManager {

Expand All @@ -20,6 +21,9 @@ export class LiveViewComponentManager {
csrfToken?: string;
private _pageTitle: string | undefined;
pageTitleChanged: boolean = false;
private pubSub: RedisPubSub<unknown> = new RedisPubSub<unknown>(
{ url: process.env.REDIS_URL || "redis://localhost:6379" }
);

constructor(component: LiveViewComponent<unknown, unknown>, signingSecret: string) {
this.component = component;
Expand Down Expand Up @@ -247,6 +251,10 @@ export class LiveViewComponentManager {
sendInternal: (event) => this.sendInternal(ws, event, topic),
repeat: (fn, intervalMillis) => this.repeat(fn, intervalMillis),
pageTitle: (newTitle: string) => { this.pageTitle = newTitle },
subscribe: (topic: string) => {
console.log('subscribted to', topic);
this.pubSub.subscribe(topic, (event) => this.sendInternal(ws, event, topic))
},
}
}

Expand Down

0 comments on commit a4b0eb8

Please sign in to comment.