Skip to content

Commit 111971d

Browse files
authored
Replace async queue with mutex lock for mute operations (#632)
* Replace async queue with mutex lock for mute operations * decouple pauseUpstream and mute lock * Create .changeset/twelve-maps-double.md
1 parent f085fbe commit 111971d

File tree

5 files changed

+56
-34
lines changed

5 files changed

+56
-34
lines changed

.changeset/twelve-maps-double.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-client": patch
3+
---
4+
5+
Replace async queue with mutex lock for mute operations

src/room/track/LocalAudioTrack.ts

+13-6
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,24 @@ export default class LocalAudioTrack extends LocalTrack {
3939
}
4040

4141
async mute(): Promise<LocalAudioTrack> {
42-
await this.muteQueue.run(async () => {
42+
const unlock = await this.muteLock.lock();
43+
try {
4344
// disabled special handling as it will cause BT headsets to switch communication modes
4445
if (this.source === Track.Source.Microphone && this.stopOnMute && !this.isUserProvided) {
4546
log.debug('stopping mic track');
4647
// also stop the track, so that microphone indicator is turned off
4748
this._mediaStreamTrack.stop();
4849
}
4950
await super.mute();
50-
});
51-
return this;
51+
return this;
52+
} finally {
53+
unlock();
54+
}
5255
}
5356

5457
async unmute(): Promise<LocalAudioTrack> {
55-
await this.muteQueue.run(async () => {
58+
const unlock = await this.muteLock.lock();
59+
try {
5660
if (
5761
this.source === Track.Source.Microphone &&
5862
(this.stopOnMute || this._mediaStreamTrack.readyState === 'ended') &&
@@ -62,8 +66,11 @@ export default class LocalAudioTrack extends LocalTrack {
6266
await this.restartTrack();
6367
}
6468
await super.unmute();
65-
});
66-
return this;
69+
70+
return this;
71+
} finally {
72+
unlock();
73+
}
6774
}
6875

6976
async restartTrack(options?: AudioCaptureOptions) {

src/room/track/LocalTrack.ts

+22-8
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
import Queue from 'async-await-queue';
21
import log from '../../logger';
32
import DeviceManager from '../DeviceManager';
43
import { TrackInvalidError } from '../errors';
54
import { TrackEvent } from '../events';
6-
import { getEmptyAudioStreamTrack, getEmptyVideoStreamTrack, isMobile, sleep } from '../utils';
5+
import {
6+
getEmptyAudioStreamTrack,
7+
getEmptyVideoStreamTrack,
8+
isMobile,
9+
Mutex,
10+
sleep,
11+
} from '../utils';
712
import type { VideoCodec } from './options';
813
import { attachToElement, detachTrack, Track } from './Track';
914

@@ -22,7 +27,9 @@ export default abstract class LocalTrack extends Track {
2227

2328
protected providedByUser: boolean;
2429

25-
protected muteQueue: Queue;
30+
protected muteLock: Mutex;
31+
32+
protected pauseUpstreamLock: Mutex;
2633

2734
/**
2835
*
@@ -42,7 +49,8 @@ export default abstract class LocalTrack extends Track {
4249
this.constraints = constraints ?? mediaTrack.getConstraints();
4350
this.reacquireTrack = false;
4451
this.providedByUser = userProvidedTrack;
45-
this.muteQueue = new Queue();
52+
this.muteLock = new Mutex();
53+
this.pauseUpstreamLock = new Mutex();
4654
}
4755

4856
get id(): string {
@@ -246,7 +254,8 @@ export default abstract class LocalTrack extends Track {
246254
};
247255

248256
async pauseUpstream() {
249-
this.muteQueue.run(async () => {
257+
const unlock = await this.pauseUpstreamLock.lock();
258+
try {
250259
if (this._isUpstreamPaused === true) {
251260
return;
252261
}
@@ -260,11 +269,14 @@ export default abstract class LocalTrack extends Track {
260269
const emptyTrack =
261270
this.kind === Track.Kind.Audio ? getEmptyAudioStreamTrack() : getEmptyVideoStreamTrack();
262271
await this.sender.replaceTrack(emptyTrack);
263-
});
272+
} finally {
273+
unlock();
274+
}
264275
}
265276

266277
async resumeUpstream() {
267-
this.muteQueue.run(async () => {
278+
const unlock = await this.pauseUpstreamLock.lock();
279+
try {
268280
if (this._isUpstreamPaused === false) {
269281
return;
270282
}
@@ -276,7 +288,9 @@ export default abstract class LocalTrack extends Track {
276288
this.emit(TrackEvent.UpstreamResumed, this);
277289

278290
await this.sender.replaceTrack(this._mediaStreamTrack);
279-
});
291+
} finally {
292+
unlock();
293+
}
280294
}
281295

282296
protected abstract monitorSender(): void;

src/room/track/LocalVideoTrack.ts

+12-6
Original file line numberDiff line numberDiff line change
@@ -97,26 +97,32 @@ export default class LocalVideoTrack extends LocalTrack {
9797
}
9898

9999
async mute(): Promise<LocalVideoTrack> {
100-
await this.muteQueue.run(async () => {
100+
const unlock = await this.muteLock.lock();
101+
try {
101102
if (this.source === Track.Source.Camera && !this.isUserProvided) {
102103
log.debug('stopping camera track');
103104
// also stop the track, so that camera indicator is turned off
104105
this._mediaStreamTrack.stop();
105106
}
106107
await super.mute();
107-
});
108-
return this;
108+
return this;
109+
} finally {
110+
unlock();
111+
}
109112
}
110113

111114
async unmute(): Promise<LocalVideoTrack> {
112-
await this.muteQueue.run(async () => {
115+
const unlock = await this.muteLock.lock();
116+
try {
113117
if (this.source === Track.Source.Camera && !this.isUserProvided) {
114118
log.debug('reacquiring camera track');
115119
await this.restartTrack();
116120
}
117121
await super.unmute();
118-
});
119-
return this;
122+
return this;
123+
} finally {
124+
unlock();
125+
}
120126
}
121127

122128
async getSenderStats(): Promise<VideoSenderStats[]> {

yarn.lock

+4-14
Original file line numberDiff line numberDiff line change
@@ -3228,20 +3228,10 @@ camelcase@^6.2.0:
32283228
resolved "https://registry.yarnpkg.com/camelcase/-/camelcase-6.2.1.tgz#250fd350cfd555d0d2160b1d51510eaf8326e86e"
32293229
integrity sha512-tVI4q5jjFV5CavAU8DXfza/TJcZutVKo/5Foskmsqcm0MsL91moHvwiGNnqaa2o6PF/7yT5ikDRcVcl8Rj6LCA==
32303230

3231-
caniuse-lite@^1.0.30001280:
3232-
version "1.0.30001361"
3233-
resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001361.tgz"
3234-
integrity sha512-ybhCrjNtkFji1/Wto6SSJKkWk6kZgVQsDq5QI83SafsF6FXv2JB4df9eEdH6g8sdGgqTXrFLjAxqBGgYoU3azQ==
3235-
3236-
caniuse-lite@^1.0.30001366:
3237-
version "1.0.30001368"
3238-
resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001368.tgz#c5c06381c6051cd863c45021475434e81936f713"
3239-
integrity sha512-wgfRYa9DenEomLG/SdWgQxpIyvdtH3NW8Vq+tB6AwR9e56iOIcu1im5F/wNdDf04XlKHXqIx4N8Jo0PemeBenQ==
3240-
3241-
caniuse-lite@^1.0.30001400:
3242-
version "1.0.30001414"
3243-
resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001414.tgz#5f1715e506e71860b4b07c50060ea6462217611e"
3244-
integrity sha512-t55jfSaWjCdocnFdKQoO+d2ct9C59UZg4dY3OnUlSZ447r8pUtIKdp0hpAzrGFultmTC+Us+KpKi4GZl/LXlFg==
3231+
caniuse-lite@^1.0.30001280, caniuse-lite@^1.0.30001366, caniuse-lite@^1.0.30001400:
3232+
version "1.0.30001472"
3233+
resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001472.tgz"
3234+
integrity sha512-xWC/0+hHHQgj3/vrKYY0AAzeIUgr7L9wlELIcAvZdDUHlhL/kNxMdnQLOSOQfP8R51ZzPhmHdyMkI0MMpmxCfg==
32453235

32463236
case-anything@^2.1.10:
32473237
version "2.1.10"

0 commit comments

Comments
 (0)