Skip to content

Commit 5e824b7

Browse files
authored
Use mutex for disconnect calls (#525)
* use mutex for disconnection * changeset * remove buffer drain, as close call will do that already * simplify closePromise
1 parent 17f62a5 commit 5e824b7

File tree

5 files changed

+138
-69
lines changed

5 files changed

+138
-69
lines changed

.changeset/gentle-lizards-smile.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
use mutex to prevent simultaneous calls to disconnect

src/api/SignalClient.ts

+28-28
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import {
3030
UpdateTrackSettings,
3131
} from '../proto/livekit_rtc';
3232
import { ConnectionError, ConnectionErrorReason } from '../room/errors';
33-
import { getClientInfo, sleep } from '../room/utils';
33+
import { getClientInfo, Mutex, sleep } from '../room/utils';
3434

3535
// internal options
3636
interface ConnectOpts {
@@ -139,12 +139,15 @@ export class SignalClient {
139139

140140
private pingInterval: ReturnType<typeof setInterval> | undefined;
141141

142+
private closingLock: Mutex;
143+
142144
constructor(useJSON: boolean = false) {
143145
this.isConnected = false;
144146
this.isReconnecting = false;
145147
this.useJSON = useJSON;
146148
this.requestQueue = new Queue();
147149
this.queuedRequests = [];
150+
this.closingLock = new Mutex();
148151
}
149152

150153
async join(
@@ -306,35 +309,32 @@ export class SignalClient {
306309
}
307310

308311
async close() {
309-
this.isConnected = false;
310-
if (this.ws) {
311-
this.ws.onclose = null;
312-
this.ws.onmessage = null;
313-
this.ws.onopen = null;
314-
315-
const emptyBufferPromise = new Promise(async (resolve) => {
316-
while (this.ws && this.ws.bufferedAmount > 0) {
317-
await sleep(50);
318-
}
319-
resolve(true);
320-
});
321-
// 250ms grace period for buffer to be cleared
322-
await Promise.race([emptyBufferPromise, sleep(250)]);
323-
324-
let closeResolver: (args: any) => void;
325-
const closePromise = new Promise((resolve) => {
326-
closeResolver = resolve;
327-
});
328-
329-
// calling `ws.close()` only starts the closing handshake (CLOSING state), prefer to wait until state is actually CLOSED
330-
this.ws.onclose = () => closeResolver(true);
312+
const unlock = await this.closingLock.lock();
313+
try {
314+
this.isConnected = false;
315+
if (this.ws) {
316+
this.ws.onclose = null;
317+
this.ws.onmessage = null;
318+
this.ws.onopen = null;
319+
320+
// calling `ws.close()` only starts the closing handshake (CLOSING state), prefer to wait until state is actually CLOSED
321+
const closePromise = new Promise((resolve) => {
322+
if (this.ws) {
323+
this.ws.onclose = resolve;
324+
} else {
325+
resolve(true);
326+
}
327+
});
331328

332-
this.ws.close();
333-
// 250ms grace period for ws to close gracefully
334-
await Promise.race([closePromise, sleep(250)]);
329+
this.ws.close();
330+
// 250ms grace period for ws to close gracefully
331+
await Promise.race([closePromise, sleep(250)]);
332+
}
333+
this.ws = undefined;
334+
this.clearPingInterval();
335+
} finally {
336+
unlock();
335337
}
336-
this.ws = undefined;
337-
this.clearPingInterval();
338338
}
339339

340340
// initial offer after joining

src/room/RTCEngine.ts

+30-21
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import type { TrackPublishOptions, VideoCodec } from './track/options';
3838
import { Track } from './track/Track';
3939
import {
4040
isWeb,
41+
Mutex,
4142
sleep,
4243
supportsAddTrack,
4344
supportsSetCodecPreferences,
@@ -130,12 +131,15 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
130131
/** specifies how often an initial join connection is allowed to retry */
131132
private maxJoinAttempts: number = 1;
132133

134+
private closingLock: Mutex;
135+
133136
constructor(private options: InternalRoomOptions) {
134137
super();
135138
this.client = new SignalClient();
136139
this.client.signalLatency = this.options.expSignalLatency;
137140
this.reconnectPolicy = this.options.reconnectPolicy;
138141
this.registerOnLineListener();
142+
this.closingLock = new Mutex();
139143
}
140144

141145
async join(
@@ -180,29 +184,34 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
180184
}
181185

182186
async close() {
183-
this._isClosed = true;
184-
this.removeAllListeners();
185-
this.deregisterOnLineListener();
186-
this.clearPendingReconnect();
187-
if (this.publisher && this.publisher.pc.signalingState !== 'closed') {
188-
this.publisher.pc.getSenders().forEach((sender) => {
189-
try {
190-
// TODO: react-native-webrtc doesn't have removeTrack yet.
191-
if (this.publisher?.pc.removeTrack) {
192-
this.publisher?.pc.removeTrack(sender);
187+
const unlock = await this.closingLock.lock();
188+
try {
189+
this._isClosed = true;
190+
this.removeAllListeners();
191+
this.deregisterOnLineListener();
192+
this.clearPendingReconnect();
193+
if (this.publisher && this.publisher.pc.signalingState !== 'closed') {
194+
this.publisher.pc.getSenders().forEach((sender) => {
195+
try {
196+
// TODO: react-native-webrtc doesn't have removeTrack yet.
197+
if (this.publisher?.pc.removeTrack) {
198+
this.publisher?.pc.removeTrack(sender);
199+
}
200+
} catch (e) {
201+
log.warn('could not removeTrack', { error: e });
193202
}
194-
} catch (e) {
195-
log.warn('could not removeTrack', { error: e });
196-
}
197-
});
198-
this.publisher.close();
199-
this.publisher = undefined;
200-
}
201-
if (this.subscriber) {
202-
this.subscriber.close();
203-
this.subscriber = undefined;
203+
});
204+
this.publisher.close();
205+
this.publisher = undefined;
206+
}
207+
if (this.subscriber) {
208+
this.subscriber.close();
209+
this.subscriber = undefined;
210+
}
211+
await this.client.close();
212+
} finally {
213+
unlock();
204214
}
205-
await this.client.close();
206215
}
207216

208217
addTrack(req: AddTrackRequest): Promise<TrackInfo> {

src/room/Room.ts

+40-20
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import { Track } from './track/Track';
5050
import type { TrackPublication } from './track/TrackPublication';
5151
import type { AdaptiveStreamSettings } from './track/types';
5252
import { getNewAudioContext } from './track/utils';
53-
import { Future, isWeb, supportsSetSinkId, unpackStreamId } from './utils';
53+
import { Future, isWeb, Mutex, supportsSetSinkId, unpackStreamId } from './utils';
5454

5555
export enum ConnectionState {
5656
Disconnected = 'disconnected',
@@ -118,6 +118,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
118118
/** future holding client initiated connection attempt */
119119
private connectFuture?: Future<void>;
120120

121+
private disconnectLock: Mutex;
122+
121123
/**
122124
* Creates a new Room, the primary construct for a LiveKit session.
123125
* @param options
@@ -144,6 +146,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
144146

145147
this.maybeCreateEngine();
146148

149+
this.disconnectLock = new Mutex();
150+
147151
this.localParticipant = new LocalParticipant('', '', this.engine, this.options);
148152
}
149153

@@ -394,26 +398,42 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
394398
* disconnects the room, emits [[RoomEvent.Disconnected]]
395399
*/
396400
disconnect = async (stopTracks = true) => {
397-
log.info('disconnect from room', { identity: this.localParticipant.identity });
398-
if (this.state === ConnectionState.Connecting || this.state === ConnectionState.Reconnecting) {
399-
// try aborting pending connection attempt
400-
log.warn('abort connection attempt');
401-
this.abortController?.abort();
402-
// in case the abort controller didn't manage to cancel the connection attempt, reject the connect promise explicitly
403-
this.connectFuture?.reject?.(new ConnectionError('Client initiated disconnect'));
404-
this.connectFuture = undefined;
405-
}
406-
// send leave
407-
if (this.engine?.client.isConnected) {
408-
await this.engine.client.sendLeave();
409-
}
410-
// close engine (also closes client)
411-
if (this.engine) {
412-
await this.engine.close();
401+
const unlock = await this.disconnectLock.lock();
402+
try {
403+
if (this.state === ConnectionState.Disconnected) {
404+
log.debug('already disconnected');
405+
return;
406+
}
407+
log.info('disconnect from room', { identity: this.localParticipant.identity });
408+
if (
409+
this.state === ConnectionState.Connecting ||
410+
this.state === ConnectionState.Reconnecting
411+
) {
412+
// try aborting pending connection attempt
413+
log.warn('abort connection attempt');
414+
this.abortController?.abort();
415+
// in case the abort controller didn't manage to cancel the connection attempt, reject the connect promise explicitly
416+
this.connectFuture?.reject?.(new ConnectionError('Client initiated disconnect'));
417+
this.connectFuture = undefined;
418+
}
419+
// send leave
420+
if (this.engine?.client.isConnected) {
421+
await this.engine.client.sendLeave();
422+
}
423+
// close engine (also closes client)
424+
if (this.engine) {
425+
log.info(`closing engine ${this.localParticipant.identity}: ${this.localParticipant.sid}`);
426+
await this.engine.close();
427+
log.info(
428+
`successfully closed engine ${this.localParticipant.identity}: ${this.localParticipant.sid}`,
429+
);
430+
}
431+
this.handleDisconnect(stopTracks, DisconnectReason.CLIENT_INITIATED);
432+
/* @ts-ignore */
433+
this.engine = undefined;
434+
} finally {
435+
unlock();
413436
}
414-
this.handleDisconnect(stopTracks, DisconnectReason.CLIENT_INITIATED);
415-
/* @ts-ignore */
416-
this.engine = undefined;
417437
};
418438

419439
/**

src/room/utils.ts

+35
Original file line numberDiff line numberDiff line change
@@ -320,3 +320,38 @@ export function createAudioAnalyser(
320320

321321
return { calculateVolume, analyser, cleanup };
322322
}
323+
324+
export class Mutex {
325+
private _locking: Promise<void>;
326+
327+
private _locks: number;
328+
329+
constructor() {
330+
this._locking = Promise.resolve();
331+
this._locks = 0;
332+
}
333+
334+
isLocked() {
335+
return this._locks > 0;
336+
}
337+
338+
lock() {
339+
this._locks += 1;
340+
341+
let unlockNext: () => void;
342+
343+
const willLock = new Promise<void>(
344+
(resolve) =>
345+
(unlockNext = () => {
346+
this._locks -= 1;
347+
resolve();
348+
}),
349+
);
350+
351+
const willUnlock = this._locking.then(() => unlockNext);
352+
353+
this._locking = this._locking.then(() => willLock);
354+
355+
return willUnlock;
356+
}
357+
}

0 commit comments

Comments
 (0)