@@ -3,72 +3,135 @@ import z from 'zod';
3
3
import { type RawData , WebSocket } from 'ws' ;
4
4
import { WebSocketMessageSchema } from '@srcbook/shared' ;
5
5
6
- const VALID_TOPIC_RE = / ^ [ a - z A - Z 0 - 9 _ : ] + $ / ;
6
+ type TopicPart = { dynamic : false ; segment : string } | { dynamic : true ; parameter : string } ;
7
+
8
+ export type MessageContextType < Key extends string = string > = {
9
+ topic : string ;
10
+ event : string ;
11
+ params : Record < Key , string > ;
12
+ } ;
13
+
14
+ type TopicMatch = Pick < MessageContextType , 'topic' | 'params' > ;
15
+
16
+ export interface ConnectionContextType {
17
+ reply : ( topic : string , event : string , payload : Record < string , any > ) => void ;
18
+ }
7
19
8
20
/**
9
- * Channel is responsible for dispatching incoming and outgoing messages for a given topic.
21
+ * Channel is responsible for dispatching incoming messages for a given topic.
22
+ *
23
+ * Topics are strings that represent a channel for messages. Topics
24
+ * can be broken into multiple parts separated by a colon. The following
25
+ * are all examples of valid topics:
26
+ *
27
+ * - session
28
+ * - session:123
29
+ * - room:123:users:456:messages
30
+ *
31
+ * When we define a topic, we can use the `<variable>` syntax to indicate a
32
+ * wildcard match. For example, the topic `room:<roomId>:messages` would match
33
+ * `room:123:messages`, `room:456:messages`, etc.
34
+ *
35
+ * The wildcard syntax must be between two colons (or at the start/end of the string).
36
+ * The text inside must be a valid JavaScript identifier.
10
37
*
11
38
* Examples:
12
39
*
13
- * const channel = new Channel("session") // matches "session" only
14
- * const channel = new Channel("session:* ") // matches "session:123", "session:456", etc.
40
+ * const channel = new Channel("session") // matches "session" only
41
+ * const channel = new Channel("session:<sessionId> ") // matches "session:123", "session:456", etc.
15
42
*
16
43
*/
17
44
export class Channel {
45
+ // The topic pattern, e.g. "sessions:<sessionId>"
18
46
readonly topic : string ;
19
47
20
- readonly events : {
21
- incoming : Record <
22
- string ,
23
- { schema : z . ZodTypeAny ; handler : ( payload : Record < string , any > ) => void }
24
- > ;
25
- outgoing : Record < string , z . ZodTypeAny > ;
26
- } = { incoming : { } , outgoing : { } } ;
27
-
28
- private wildcardMatch = false ;
48
+ // The parts of the topic string, e.g. "sessions" and "<sessionId>" for "sessions:<sessionId>"
49
+ private readonly parts : TopicPart [ ] ;
50
+
51
+ readonly events : Record <
52
+ string ,
53
+ {
54
+ schema : z . ZodTypeAny ;
55
+ handler : (
56
+ payload : Record < string , any > ,
57
+ context : MessageContextType ,
58
+ conn : ConnectionContextType ,
59
+ ) => void ;
60
+ }
61
+ > = { } ;
29
62
30
63
constructor ( topic : string ) {
31
- if ( topic . endsWith ( ':*' ) ) {
32
- // Remove asterisk from topic
33
- topic = topic . slice ( 0 , - 1 ) ;
34
- this . wildcardMatch = true ;
35
- }
64
+ this . topic = topic ;
65
+ this . parts = this . splitIntoParts ( topic ) ;
66
+ }
67
+
68
+ private splitIntoParts ( topic : string ) {
69
+ const parts : TopicPart [ ] = [ ] ;
70
+
71
+ for ( const part of topic . split ( ':' ) ) {
72
+ const parameter = part . match ( / ^ < ( [ a - z A - Z _ ] + [ a - z A - Z 0 - 9 _ ] * ) > $ / ) ;
36
73
37
- if ( ! VALID_TOPIC_RE . test ( topic ) ) {
38
- throw new Error ( `Invalid channel topic '${ topic } '` ) ;
74
+ if ( parameter !== null ) {
75
+ parts . push ( { dynamic : true , parameter : parameter [ 1 ] as string } ) ;
76
+ continue ;
77
+ }
78
+
79
+ if ( / ^ [ a - z A - Z 0 - 9 _ ] + $ / . test ( part ) ) {
80
+ parts . push ( { dynamic : false , segment : part } ) ;
81
+ continue ;
82
+ }
83
+
84
+ throw new Error ( `Invalid channel topic: ${ topic } ` ) ;
39
85
}
40
86
41
- this . topic = topic ;
87
+ return parts ;
42
88
}
43
89
44
- matches ( topic : string ) {
45
- if ( topic === this . topic ) {
46
- return true ;
90
+ match ( topic : string ) : TopicMatch | null {
91
+ const parts = topic . split ( ':' ) ;
92
+
93
+ if ( parts . length !== this . parts . length ) {
94
+ return null ;
47
95
}
48
96
49
- if ( this . wildcardMatch ) {
50
- return topic . startsWith ( this . topic ) && topic . length > this . topic . length ;
97
+ const match : TopicMatch = {
98
+ topic : topic ,
99
+ params : { } ,
100
+ } ;
101
+
102
+ for ( let i = 0 , len = this . parts . length ; i < len ; i ++ ) {
103
+ const thisPart = this . parts [ i ] as TopicPart ;
104
+
105
+ if ( thisPart . dynamic ) {
106
+ match . params [ thisPart . parameter ] = parts [ i ] as string ;
107
+ continue ;
108
+ } else if ( thisPart . segment === parts [ i ] ) {
109
+ continue ;
110
+ }
111
+
112
+ return null ;
51
113
}
52
114
53
- return false ;
115
+ return match ;
54
116
}
55
117
56
- incoming < T extends z . ZodTypeAny > (
118
+ on < T extends z . ZodTypeAny > (
57
119
event : string ,
58
120
schema : T ,
59
- handler : ( payload : z . infer < T > ) => void ,
121
+ handler : (
122
+ payload : z . infer < T > ,
123
+ context : MessageContextType ,
124
+ conn : ConnectionContextType ,
125
+ ) => void ,
60
126
) {
61
- this . events . incoming [ event ] = { schema, handler } ;
62
- return this ;
63
- }
64
-
65
- outgoing < T extends z . ZodTypeAny > ( event : string , schema : T ) {
66
- this . events . outgoing [ event ] = schema ;
127
+ this . events [ event ] = { schema, handler } ;
67
128
return this ;
68
129
}
69
130
}
70
131
71
132
type ConnectionType = {
133
+ // Reply only to this connection, not to all connections.
134
+ reply : ( topic : string , event : string , payload : Record < string , any > ) => void ;
72
135
socket : WebSocket ;
73
136
subscriptions : string [ ] ;
74
137
} ;
@@ -90,7 +153,13 @@ export default class WebSocketServer {
90
153
return ;
91
154
}
92
155
93
- const connection = { socket, subscriptions : [ ] } ;
156
+ const connection = {
157
+ socket,
158
+ subscriptions : [ ] ,
159
+ reply : ( topic : string , event : string , payload : Record < string , any > ) => {
160
+ this . send ( connection , topic , event , payload ) ;
161
+ } ,
162
+ } ;
94
163
95
164
this . connections . push ( connection ) ;
96
165
@@ -115,23 +184,9 @@ export default class WebSocketServer {
115
184
}
116
185
117
186
broadcast ( topic : string , event : string , payload : Record < string , any > ) {
118
- const channel = this . findChannel ( topic ) ;
119
-
120
- if ( channel === undefined ) {
121
- throw new Error ( `Cannot broadcast to unknown topic '${ topic } '` ) ;
122
- }
123
-
124
- const schema = channel . events . outgoing [ event ] ;
125
-
126
- if ( schema === undefined ) {
127
- throw new Error ( `Cannot broadcast to unknown event '${ event } '` ) ;
128
- }
129
-
130
- const validatedPayload = schema . parse ( payload ) ;
131
-
132
187
for ( const conn of this . connections ) {
133
188
if ( conn . subscriptions . includes ( topic ) ) {
134
- conn . socket . send ( JSON . stringify ( [ topic , event , validatedPayload ] ) ) ;
189
+ this . send ( conn , topic , event , payload ) ;
135
190
}
136
191
}
137
192
}
@@ -140,9 +195,9 @@ export default class WebSocketServer {
140
195
const parsed = JSON . parse ( message . toString ( 'utf8' ) ) ;
141
196
const [ topic , event , payload ] = WebSocketMessageSchema . parse ( parsed ) ;
142
197
143
- const channel = this . findChannel ( topic ) ;
198
+ const channelMatch = this . findChannelMatch ( topic ) ;
144
199
145
- if ( channel === undefined ) {
200
+ if ( channelMatch === null ) {
146
201
console . warn ( `Server received unknown topic '${ topic } '` ) ;
147
202
return ;
148
203
}
@@ -157,7 +212,9 @@ export default class WebSocketServer {
157
212
return ;
158
213
}
159
214
160
- const registeredEvent = channel . events . incoming [ event ] ;
215
+ const { channel, match } = channelMatch ;
216
+
217
+ const registeredEvent = channel . events [ event ] ;
161
218
162
219
if ( registeredEvent === undefined ) {
163
220
console . warn ( `Server received unknown event '${ event } ' for topic '${ topic } '` ) ;
@@ -175,20 +232,28 @@ export default class WebSocketServer {
175
232
return ;
176
233
}
177
234
178
- handler ( result . data ) ;
235
+ handler ( result . data , { topic : match . topic , event : event , params : match . params } , conn ) ;
179
236
}
180
237
181
- private findChannel ( topic : string ) {
238
+ private findChannelMatch ( topic : string ) : { channel : Channel ; match : TopicMatch } | null {
182
239
for ( const channel of this . channels ) {
183
- if ( channel . matches ( topic ) ) {
184
- return channel ;
240
+ const match = channel . match ( topic ) ;
241
+
242
+ if ( match !== null ) {
243
+ return { channel, match } ;
185
244
}
186
245
}
246
+
247
+ return null ;
187
248
}
188
249
189
250
private removeConnection ( socket : WebSocket ) {
190
251
this . connections = this . connections . filter ( ( conn ) => {
191
252
return conn . socket !== socket ;
192
253
} ) ;
193
254
}
255
+
256
+ private send ( conn : ConnectionType , topic : string , event : string , payload : Record < string , any > ) {
257
+ conn . socket . send ( JSON . stringify ( [ topic , event , payload ] ) ) ;
258
+ }
194
259
}
0 commit comments