Skip to content

Commit af223f4

Browse files
lukasIOdavidzhao
andauthored
Emit TrackUnpublished before TrackPublished (#541)
* Emit TrackUnpublished before TrackUnpublished within the same update * changeset * fix changeset * LocalParticipant.unpublishTrack to be return a promise * add negotiation timeout, use pc event obj * add negotiationStarted event * fix changeset Co-authored-by: David Zhao <[email protected]>
1 parent 260ad8b commit af223f4

File tree

5 files changed

+79
-28
lines changed

5 files changed

+79
-28
lines changed

.changeset/curvy-years-listen.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
Make `unpublishTrack` async, emit TrackUnpublished before TrackPublished within the same update

src/room/PCTransport.ts

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import EventEmitter from 'events';
12
import { MediaDescription, parse, write } from 'sdp-transform';
23
import { debounce } from 'ts-debounce';
34
import log from '../logger';
@@ -10,8 +11,13 @@ interface TrackBitrateInfo {
1011
maxbr: number;
1112
}
1213

14+
export const PCEvents = {
15+
NegotiationStarted: 'negotiationStarted',
16+
NegotiationComplete: 'negotiationComplete',
17+
} as const;
18+
1319
/** @internal */
14-
export default class PCTransport {
20+
export default class PCTransport extends EventEmitter {
1521
pc: RTCPeerConnection;
1622

1723
pendingCandidates: RTCIceCandidateInit[] = [];
@@ -27,6 +33,7 @@ export default class PCTransport {
2733
onOffer?: (offer: RTCSessionDescriptionInit) => void;
2834

2935
constructor(config?: RTCConfiguration) {
36+
super();
3037
this.pc = new RTCPeerConnection(config);
3138
}
3239

@@ -56,11 +63,14 @@ export default class PCTransport {
5663
if (this.renegotiate) {
5764
this.renegotiate = false;
5865
this.createAndSendOffer();
66+
} else if (sd.type === 'answer') {
67+
this.emit(PCEvents.NegotiationComplete);
5968
}
6069
}
6170

6271
// debounced negotiate interface
6372
negotiate = debounce((onError?: (e: Error) => void) => {
73+
this.emit(PCEvents.NegotiationStarted);
6474
try {
6575
this.createAndSendOffer();
6676
} catch (e) {

src/room/RTCEngine.ts

+29-11
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import {
2929
UnexpectedConnectionState,
3030
} from './errors';
3131
import { EngineEvent } from './events';
32-
import PCTransport from './PCTransport';
32+
import PCTransport, { PCEvents } from './PCTransport';
3333
import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy';
3434
import type LocalTrack from './track/LocalTrack';
3535
import type LocalVideoTrack from './track/LocalVideoTrack';
@@ -964,18 +964,36 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
964964
}
965965

966966
/** @internal */
967-
negotiate() {
968-
if (!this.publisher) {
969-
return;
970-
}
967+
negotiate(): Promise<void> {
968+
// observe signal state
969+
return new Promise<void>((resolve, reject) => {
970+
if (!this.publisher) {
971+
reject(new NegotiationError('publisher is not defined'));
972+
return;
973+
}
971974

972-
this.hasPublished = true;
975+
this.hasPublished = true;
973976

974-
this.publisher.negotiate((e) => {
975-
if (e instanceof NegotiationError) {
976-
this.fullReconnectOnNext = true;
977-
}
978-
this.handleDisconnect('negotiation');
977+
const negotiationTimeout = setTimeout(() => {
978+
reject('negotiation timed out');
979+
this.handleDisconnect('negotiation');
980+
}, this.peerConnectionTimeout);
981+
982+
this.publisher.once(PCEvents.NegotiationStarted, () => {
983+
this.publisher?.once(PCEvents.NegotiationComplete, () => {
984+
clearTimeout(negotiationTimeout);
985+
resolve();
986+
});
987+
});
988+
989+
this.publisher.negotiate((e) => {
990+
clearTimeout(negotiationTimeout);
991+
reject(e);
992+
if (e instanceof NegotiationError) {
993+
this.fullReconnectOnNext = true;
994+
}
995+
this.handleDisconnect('negotiation');
996+
});
979997
});
980998
}
981999

src/room/participant/LocalParticipant.ts

+29-11
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ export default class LocalParticipant extends Participant {
262262
} else if (track && track.track) {
263263
// screenshare cannot be muted, unpublish instead
264264
if (source === Track.Source.ScreenShare) {
265-
track = this.unpublishTrack(track.track);
265+
track = await this.unpublishTrack(track.track);
266266
const screenAudioTrack = this.getTrack(Track.Source.ScreenShareAudio);
267267
if (screenAudioTrack && screenAudioTrack.track) {
268268
this.unpublishTrack(screenAudioTrack.track);
@@ -694,10 +694,10 @@ export default class LocalParticipant extends Participant {
694694
log.debug(`published ${videoCodec} for track ${track.sid}`, { encodings, trackInfo: ti });
695695
}
696696

697-
unpublishTrack(
697+
async unpublishTrack(
698698
track: LocalTrack | MediaStreamTrack,
699699
stopOnUnpublish?: boolean,
700-
): LocalTrackPublication | undefined {
700+
): Promise<LocalTrackPublication | undefined> {
701701
// look through all published tracks to find the right ones
702702
const publication = this.getPublicationForTrack(track);
703703

@@ -744,7 +744,7 @@ export default class LocalParticipant extends Participant {
744744
} catch (e) {
745745
log.warn('failed to unpublish track', { error: e, method: 'unpublishTrack' });
746746
} finally {
747-
this.engine.negotiate();
747+
await this.engine.negotiate();
748748
}
749749
}
750750

@@ -769,15 +769,33 @@ export default class LocalParticipant extends Participant {
769769
return publication;
770770
}
771771

772-
unpublishTracks(tracks: LocalTrack[] | MediaStreamTrack[]): LocalTrackPublication[] {
773-
const publications: LocalTrackPublication[] = [];
774-
tracks.forEach((track: LocalTrack | MediaStreamTrack) => {
775-
const pub = this.unpublishTrack(track);
776-
if (pub) {
777-
publications.push(pub);
772+
async unpublishTracks(
773+
tracks: LocalTrack[] | MediaStreamTrack[],
774+
): Promise<LocalTrackPublication[]> {
775+
const results = await Promise.all(tracks.map((track) => this.unpublishTrack(track)));
776+
return results.filter(
777+
(track) => track instanceof LocalTrackPublication,
778+
) as LocalTrackPublication[];
779+
}
780+
781+
async republishAllTracks(options?: TrackPublishOptions) {
782+
const localPubs: LocalTrackPublication[] = [];
783+
this.tracks.forEach((pub) => {
784+
if (pub.track) {
785+
if (options) {
786+
pub.options = { ...pub.options, ...options };
787+
}
788+
localPubs.push(pub);
778789
}
779790
});
780-
return publications;
791+
792+
await Promise.all(
793+
localPubs.map(async (pub) => {
794+
const track = pub.track!;
795+
await this.unpublishTrack(track, false);
796+
await this.publishTrack(track, pub.options);
797+
}),
798+
);
781799
}
782800

783801
/**

src/room/participant/RemoteParticipant.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -263,11 +263,6 @@ export default class RemoteParticipant extends Participant {
263263
validTracks.set(ti.sid, publication);
264264
});
265265

266-
// always emit events for new publications, Room will not forward them unless it's ready
267-
newTracks.forEach((publication) => {
268-
this.emit(ParticipantEvent.TrackPublished, publication);
269-
});
270-
271266
// detect removed tracks
272267
this.tracks.forEach((publication) => {
273268
if (!validTracks.has(publication.trackSid)) {
@@ -278,6 +273,11 @@ export default class RemoteParticipant extends Participant {
278273
this.unpublishTrack(publication.trackSid, true);
279274
}
280275
});
276+
277+
// always emit events for new publications, Room will not forward them unless it's ready
278+
newTracks.forEach((publication) => {
279+
this.emit(ParticipantEvent.TrackPublished, publication);
280+
});
281281
}
282282

283283
/** @internal */

0 commit comments

Comments
 (0)