Skip to content

Commit f1928e0

Browse files
committed
Fixes for concurrency in WS streams, custom promise-based Mutex
1 parent b761511 commit f1928e0

File tree

8 files changed

+390
-56
lines changed

8 files changed

+390
-56
lines changed

js/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"clean": "rimraf coverage build tmp",
3434
"prebuild": "npm run lint",
3535
"build": "tsc -p tsconfig.json",
36-
"build:bundle": "esbuild --bundle index.ts --format=esm --outfile=build/actionengine.js",
36+
"build:bundle": "esbuild --bundle index.ts --format=esm --outfile=build/actionengine.js && esbuild --bundle src/wsworker.ts --format=esm --outfile=build/wsworker.js",
3737
"build:watch": "tsc -w -p tsconfig.json",
3838
"build:release": "npm run clean && tsc -p tsconfig.release.json",
3939
"build:typings": "tsc --declaration --emitDeclarationOnly --outDir build",

js/src/asyncNode.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { endOfStream } from './data.js';
33
import { ChunkStoreReader, NumberedChunk } from './chunkStoreReader.js';
44
import { ChunkStoreWriter } from './chunkStoreWriter.js';
55
import { EvergreenStream } from './stream.js';
6-
import { Mutex } from 'async-mutex';
6+
import { Mutex } from './utils.js';
77

88
export class NodeMap {
99
private nodes: Map<string, AsyncNode>;

js/src/chunkStore.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import { CondVar } from './utils.js';
2-
import { Mutex } from 'async-mutex';
1+
import { CondVar, Mutex } from './utils.js';
32

43
export interface ChunkStore {
54
get(seqId: number, timeout: number): Promise<Chunk>;

js/src/msgpack.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
import { Encoder, Decoder } from '@msgpack/msgpack';
1+
import { encode, decode, decodeAsync, decodeMulti } from '@msgpack/msgpack';
22

3-
const encoder = new Encoder();
4-
const decoder = new Decoder();
5-
6-
const encode = (value: unknown) => {
7-
return encoder.encode(value);
8-
};
9-
10-
const decode = (bytes: Uint8Array) => {
11-
return decoder.decode(bytes);
12-
};
13-
14-
const decodeMulti = (bytes: Uint8Array) => {
15-
return decoder.decodeMulti(bytes);
3+
const rawDecode = async (blob: Blob | Uint8Array) => {
4+
if (blob instanceof Blob) {
5+
if (blob.stream) {
6+
return await decodeAsync(blob.stream());
7+
} else {
8+
return await decode(await blob.arrayBuffer());
9+
}
10+
} else if (blob instanceof Uint8Array) {
11+
return decode(blob);
12+
}
13+
throw new Error(`Unsupported blob type for decoding: ${blob}`);
1614
};
1715

1816
export const encodeChunkMetadata = (metadata: ChunkMetadata) => {
@@ -206,8 +204,10 @@ export const encodeSessionMessage = (message: SessionMessage) => {
206204
return encode(packedMessage);
207205
};
208206

209-
export const decodeSessionMessage = (bytes: Uint8Array): SessionMessage => {
210-
const unpackedMessage = decode(bytes) as Uint8Array;
207+
export const decodeSessionMessage = async (
208+
bytes: Blob | Uint8Array,
209+
): Promise<SessionMessage> => {
210+
const unpackedMessage = (await rawDecode(bytes)) as Uint8Array;
211211
// @ts-expect-error decodeMulti is not strictly typed
212212
const [packedNodeFragments, packedActions]: [Uint8Array[], Uint8Array[]] =
213213
decodeMulti(unpackedMessage);

js/src/session.ts

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,44 +2,48 @@ import { EvergreenStream } from './stream.js';
22
import { NodeMap } from './asyncNode.js';
33
import { isNullChunk } from './data.js';
44
import { ActionRegistry, fromActionMessage } from './action.js';
5+
import { Mutex } from './utils.js';
56

67
export class Session {
78
stream: EvergreenStream;
89
nodeMap: NodeMap;
910
actionRegistry: ActionRegistry;
1011

1112
loop: Promise<void> | null = null;
13+
private _mutex: Mutex;
1214

1315
constructor(
1416
stream: EvergreenStream,
1517
nodeMap: NodeMap | null = null,
1618
actionRegistry: ActionRegistry | null = null,
1719
) {
18-
this.stream = stream;
20+
this._mutex = new Mutex();
21+
1922
this.nodeMap = nodeMap || new NodeMap();
2023
this.actionRegistry = actionRegistry;
21-
this.loop = this.run();
24+
25+
this.bindStream(stream).then();
2226
}
2327

2428
async dispatchMessage(message: SessionMessage) {
2529
for (const fragment of message.nodeFragments) {
2630
const node = await this.nodeMap.getNode(fragment.id);
2731
if (fragment.chunk !== null) {
28-
const isFinal =
29-
(!fragment.continued && fragment.seq != -1) ||
30-
isNullChunk(fragment.chunk);
31-
await node.put(fragment.chunk, fragment.seq, isFinal);
32+
const isFinal = !fragment.continued || isNullChunk(fragment.chunk);
33+
node.put(fragment.chunk, fragment.seq, isFinal).then();
3234
}
3335
}
3436
for (const actionMessage of message.actions) {
35-
const action = fromActionMessage(
36-
actionMessage,
37-
this.actionRegistry,
38-
this.nodeMap,
39-
this.stream,
40-
this,
41-
);
42-
action.run().then();
37+
await this._mutex.runExclusive(() => {
38+
const action = fromActionMessage(
39+
actionMessage,
40+
this.actionRegistry,
41+
this.nodeMap,
42+
this.stream,
43+
this,
44+
);
45+
action.run().then();
46+
});
4347
}
4448
}
4549

@@ -51,12 +55,36 @@ export class Session {
5155
return this.nodeMap;
5256
}
5357

58+
async bindActionRegistry(registry: ActionRegistry) {
59+
await this._mutex.runExclusive(() => {
60+
this.actionRegistry = registry;
61+
});
62+
}
63+
64+
async bindStream(stream: EvergreenStream) {
65+
// bindStream may be called in ctor, where this.stream can be undefined
66+
if (this.stream !== undefined) {
67+
await this.stream.close();
68+
}
69+
70+
if (this.loop !== null) {
71+
await this.loop;
72+
}
73+
74+
await this._mutex.runExclusive(() => {
75+
this.stream = stream;
76+
this.loop = this.run();
77+
});
78+
}
79+
5480
async run() {
5581
while (true) {
5682
let message: SessionMessage | null = null;
5783

5884
try {
59-
message = await this.stream.receive();
85+
await this._mutex.runExclusive(async () => {
86+
message = await this.stream.receive();
87+
});
6088
} catch (e) {
6189
console.error('error receiving message:', e);
6290
break;

js/src/stream.ts

Lines changed: 169 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,154 @@
1-
import { Channel, CondVar } from './utils.js';
2-
import { decodeSessionMessage, encodeSessionMessage } from './msgpack.js';
3-
import { Mutex } from 'async-mutex';
1+
import { v4 as uuidv4 } from 'uuid';
2+
3+
import { Channel, CondVar, Mutex } from './utils.js';
4+
5+
let kWorker: Worker | null = null;
6+
7+
class WSWorkerManager {
8+
private static instance: WSWorkerManager | null = null;
9+
10+
private _openHandlers: Map<string, () => void | Promise<void>>;
11+
private _messageHandlers: Map<
12+
string,
13+
(msg: SessionMessage) => void | Promise<void>
14+
>;
15+
private _errorHandlers: Map<
16+
string,
17+
(message: string) => void | Promise<void>
18+
>;
19+
private _closeHandlers: Map<
20+
string,
21+
(event?: CloseEvent) => void | Promise<void>
22+
>;
23+
24+
private constructor() {
25+
this._openHandlers = new Map();
26+
this._messageHandlers = new Map();
27+
this._errorHandlers = new Map();
28+
this._closeHandlers = new Map();
29+
30+
kWorker = new Worker(new URL('./wsworker.js', import.meta.url), {
31+
type: 'module',
32+
});
33+
34+
kWorker.onmessage = async (event: MessageEvent) => {
35+
const { socketId, type, message } = event.data;
36+
37+
switch (type) {
38+
case 'open': {
39+
const openHandler = this._openHandlers.get(socketId);
40+
if (openHandler) {
41+
await openHandler();
42+
}
43+
break;
44+
}
45+
case 'message': {
46+
const messageHandler = this._messageHandlers.get(socketId);
47+
if (messageHandler) {
48+
await messageHandler(message);
49+
}
50+
break;
51+
}
52+
case 'error': {
53+
const errorHandler = this._errorHandlers.get(socketId);
54+
if (errorHandler) {
55+
await errorHandler(message);
56+
}
57+
break;
58+
}
59+
case 'close': {
60+
const closeHandler = this._closeHandlers.get(socketId);
61+
if (closeHandler) {
62+
await closeHandler();
63+
}
64+
break;
65+
}
66+
}
67+
};
68+
}
69+
70+
public static getInstance(): WSWorkerManager {
71+
if (!WSWorkerManager.instance) {
72+
WSWorkerManager.instance = new WSWorkerManager();
73+
}
74+
return WSWorkerManager.instance;
75+
}
76+
77+
public createSocket(
78+
socketId: string,
79+
url: string,
80+
onOpen?: () => void | Promise<void>,
81+
onMessage?: (msg: SessionMessage) => void | Promise<void>,
82+
onError?: (message: string) => void | Promise<void>,
83+
onClose?: (event?: CloseEvent) => void | Promise<void>,
84+
) {
85+
if (this._messageHandlers.has(socketId)) {
86+
throw new Error(`Socket with id ${socketId} already exists`);
87+
}
88+
89+
this._openHandlers.set(socketId, onOpen || (() => {}));
90+
this._messageHandlers.set(socketId, onMessage || (() => {}));
91+
this._errorHandlers.set(socketId, onError || (() => {}));
92+
this._closeHandlers.set(socketId, () => {
93+
onClose();
94+
this._openHandlers.delete(socketId);
95+
this._messageHandlers.delete(socketId);
96+
this._errorHandlers.delete(socketId);
97+
this._closeHandlers.delete(socketId);
98+
});
99+
100+
const send = (message: SessionMessage) => {
101+
if (!this._messageHandlers.has(socketId)) {
102+
throw new Error(`Socket with id ${socketId} does not exist`);
103+
}
104+
kWorker.postMessage({
105+
command: 'send',
106+
socketId,
107+
message,
108+
});
109+
};
110+
111+
const close = () => {
112+
if (!this._messageHandlers.has(socketId)) {
113+
return;
114+
}
115+
kWorker.postMessage({
116+
command: 'close',
117+
socketId,
118+
});
119+
};
120+
121+
kWorker.postMessage({
122+
command: 'open',
123+
socketId,
124+
url,
125+
});
126+
console.log(`Creating socket with id ${socketId} for URL ${url}`);
127+
128+
return {
129+
send,
130+
close,
131+
};
132+
}
133+
}
4134

5135
export class EvergreenStream {
6-
private socket: WebSocket;
7-
private readonly url: string;
136+
private readonly uid: string;
8137

9138
private channel: Channel<SessionMessage | null>;
10139

11140
private readonly mutex: Mutex;
12141
private readonly cv: CondVar;
13142

143+
private socket: {
144+
send: (message: SessionMessage) => void;
145+
close: () => void;
146+
};
14147
private socketOpen: boolean;
15148
private closed: boolean;
16149

17150
constructor(url: string) {
18-
this.url = url;
19-
this.socket = new WebSocket(this.url);
151+
this.uid = uuidv4();
20152

21153
this.channel = new Channel<SessionMessage | null>();
22154

@@ -26,27 +158,50 @@ export class EvergreenStream {
26158
this.mutex = new Mutex();
27159
this.cv = new CondVar();
28160

29-
this.socket.onopen = async () => {
161+
const onopen = async () => {
30162
await this.mutex.runExclusive(async () => {
31163
console.log('socket opened');
32164
this.socketOpen = true;
33165
this.cv.notifyAll();
34166
});
35167
};
36168

37-
this.socket.onerror = async (event) => {
169+
const onerror = async (message: string) => {
170+
await this.mutex.runExclusive(async () => {
171+
if (this.closed) {
172+
return;
173+
}
174+
console.error('socket error:', message);
175+
await this.closeInternal();
176+
});
177+
};
178+
179+
const onmessage = async (message: SessionMessage) => {
180+
await this.channel.sendNowait(message);
181+
};
182+
183+
const onclose = async () => {
38184
await this.mutex.runExclusive(async () => {
39185
if (this.closed) {
40186
return;
41187
}
42-
console.error('socket error:', event);
188+
console.log('socket closed.');
43189
await this.closeInternal();
44190
});
45191
};
46192

47-
this.socket.onmessage = async (event) => {
48-
const array = new Uint8Array(await event.data.arrayBuffer());
49-
await this.channel.sendNowait(decodeSessionMessage(array));
193+
const wsWorkerManager = WSWorkerManager.getInstance();
194+
const { send, close } = wsWorkerManager.createSocket(
195+
this.uid,
196+
url,
197+
onopen,
198+
onmessage,
199+
onerror,
200+
onclose,
201+
);
202+
this.socket = {
203+
send: send,
204+
close: close,
50205
};
51206
}
52207

@@ -64,7 +219,7 @@ export class EvergreenStream {
64219
}
65220
});
66221

67-
this.socket.send(encodeSessionMessage(message));
222+
this.socket.send(message);
68223
}
69224

70225
private async closeInternal() {

0 commit comments

Comments
 (0)