Skip to content

Commit

Permalink
redis p/s start
Browse files Browse the repository at this point in the history
  • Loading branch information
MeowningMaster committed May 24, 2024
1 parent c2c848b commit 0607d99
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 10 deletions.
Binary file modified bun.lockb
Binary file not shown.
5 changes: 3 additions & 2 deletions libs/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
},
"dependencies": {
"@typeschema/main": "^0.13.9",
"@wsx/shared": "workspace:*"
"@wsx/shared": "workspace:*",
"ioredis": "^5.4.1"
},
"dependenciesMeta": {
"dependenciesMeta": {
"@typeschema/main": {
"injected": true
}
Expand Down
29 changes: 26 additions & 3 deletions libs/server/src/broadcast/broadcast.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import type { MaybePromise } from "@wsx/shared"
import { type WsxSocket, socketSymbols } from "../socket"
import * as symbols from "./symbols"
import { Topic } from "./topic"
export { symbols as broadcastSymbols }

/**
* General broadcast implementation
*/
export class Broadcast {
topics: Map<string, Topic> = new Map()

topic(key: string): Topic {
topic(key: string): MaybePromise<Topic> {
const topic = this.topics.get(key)
if (topic) return topic
return this.create(key)
}

create(key: string): Topic {
create(key: string): MaybePromise<Topic> {
const topic = new Topic(this, key)
this.topics.set(key, topic)
return topic
Expand All @@ -21,9 +25,28 @@ export class Broadcast {
remove(topic: string) {
this.topics.delete(topic)
}

[symbols.publish](
topic: Topic,
data: unknown,
except?: WsxSocket,
): MaybePromise<void> {
for (const socket of topic.sockets) {
if (socket === except) continue
socket[socketSymbols.send](data)
}
}
}

/**
* Local broadcast. Shares actions only across the same server instance
*/
export class Localcast extends Broadcast {}
export class Localcast extends Broadcast {
topic(key: string): Topic {
return super.topic(key) as Topic
}

create(key: string): Topic {
return super.create(key) as Topic
}
}
60 changes: 59 additions & 1 deletion libs/server/src/broadcast/rediscast.ts
Original file line number Diff line number Diff line change
@@ -1 +1,59 @@
export class Rediscast {}
import { Redis, type RedisOptions } from "ioredis"
import type { WsxSocket } from "../socket"
import { Broadcast, broadcastSymbols } from "./broadcast"
import type { Topic } from "./topic"

/**
* Redis broadcast. Shares actions via Redis Pub/Sub
*/
export class Rediscast extends Broadcast {
redis: Redis
prefix: string

constructor({
redis,
prefix = "wsx__",
}: { redis: RedisOptions; prefix?: string }) {
super()
this.redis = new Redis(redis)
this.prefix = prefix

this.redis.on("message", (channel, message) => {
const key = this.unredisKey(channel)
const topic = this.topics.get(key)
if (topic) {
this[broadcastSymbols.publish](topic, JSON.parse(message))
}
})
}

private redisKey(key: string) {
return `${this.prefix}${key}`
}

private unredisKey(redisKey: string) {
return redisKey.slice(this.prefix.length)
}

async create(key: string) {
const redisKey = this.redisKey(key)
const topic = super.create(redisKey)
await this.redis.ssubscribe(redisKey)
return topic
}

remove(topic: string) {
super.remove(topic)
this.redis.sunsubscribe(this.redisKey(topic))
}

async [broadcastSymbols.publish](
topic: Topic,
data: unknown,
except?: WsxSocket,
) {
super[broadcastSymbols.publish](topic, data, except)
const redisKey = this.redisKey(topic.key)
await this.redis.spublish(redisKey, JSON.stringify(data))
}
}
1 change: 1 addition & 0 deletions libs/server/src/broadcast/symbols.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const publish = Symbol("publish")
6 changes: 2 additions & 4 deletions libs/server/src/broadcast/topic/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type WsxSocket, socketSymbols } from "../../socket"
import type { Broadcast } from "../broadcast"
import * as symbols from "./symbols"
export { symbols as topicSymbols }
import * as broadcastSymbols from "../symbols"

/**
* Pub/Sub topic
Expand Down Expand Up @@ -33,9 +34,6 @@ export class Topic {
}

[symbols.publish](data: unknown, except?: WsxSocket) {
for (const socket of this.sockets) {
if (socket === except) continue
socket[socketSymbols.send](data)
}
this.broadcast[broadcastSymbols.publish](this, data, except)
}
}

0 comments on commit 0607d99

Please sign in to comment.