Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quick-turkeys-post.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Improve e2e reliablility of data channel
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
},
"dependencies": {
"@livekit/mutex": "1.1.1",
"@livekit/protocol": "1.38.0",
"@livekit/protocol": "1.39.1",
"events": "^3.3.0",
"loglevel": "^1.9.2",
"sdp-transform": "^2.15.0",
Expand Down
25 changes: 10 additions & 15 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ export class SignalClient {
sid,
reconnectReason: reason,
});
return res;
if (res instanceof ReconnectResponse) {
return res;
}
return;
}

private connect(
Expand Down
65 changes: 63 additions & 2 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ClientConfiguration,
type ConnectionQualityUpdate,
DataChannelInfo,
DataChannelReceiveState,
DataPacket,
DataPacket_Kind,
DisconnectReason,
Expand Down Expand Up @@ -44,6 +45,8 @@ import {
} from '../api/SignalClient';
import log, { LoggerNames, getLogger } from '../logger';
import type { InternalRoomOptions } from '../options';
import { DataPacketBuffer } from '../utils/dataPacketBuffer';
import { TTLMap } from '../utils/ttlmap';
import PCTransport, { PCEvents } from './PCTransport';
import { PCTransportManager, PCTransportState } from './PCTransportManager';
import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy';
Expand Down Expand Up @@ -81,6 +84,7 @@ const lossyDataChannel = '_lossy';
const reliableDataChannel = '_reliable';
const minReconnectWait = 2 * 1000;
const leaveReconnect = 'leave-reconnect';
const reliabeReceiveStateTTL = 30_000;

enum PCState {
New,
Expand Down Expand Up @@ -178,6 +182,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

private publisherConnectionPromise: Promise<void> | undefined;

private reliableDataSequence: number = 1;

private reliableMessageBuffer = new DataPacketBuffer();

private reliableReceivedState: TTLMap<string, number> = new TTLMap(reliabeReceiveStateTTL);

constructor(private options: InternalRoomOptions) {
super();
this.log = getLogger(options.loggerName ?? LoggerNames.Engine);
Expand Down Expand Up @@ -310,6 +320,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
this.lossyDCSub = undefined;
this.reliableDC = undefined;
this.reliableDCSub = undefined;
this.reliableMessageBuffer = new DataPacketBuffer();
this.reliableDataSequence = 1;
this.reliableReceivedState.clear();
}

async cleanupClient() {
Expand Down Expand Up @@ -677,6 +690,15 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
const dp = DataPacket.fromBinary(new Uint8Array(buffer));

if (dp.sequence > 0 && dp.participantSid !== '') {
const lastSeq = this.reliableReceivedState.get(dp.participantSid);
if (lastSeq && dp.sequence <= lastSeq) {
// ignore duplicate or out-of-order packets in reliable channel
return;
}
this.reliableReceivedState.set(dp.participantSid, dp.sequence);
}

if (dp.value?.case === 'speaker') {
// dispatch speaker updates
this.emit(EngineEvent.ActiveSpeakersUpdate, dp.value.value.speakers);
Expand Down Expand Up @@ -1033,6 +1055,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
if (res) {
const rtcConfig = this.makeRTCConfiguration(res);
this.pcManager.updateConfiguration(rtcConfig);
if (this.latestJoinResponse) {
this.latestJoinResponse.serverInfo = res.serverInfo;
}
} else {
this.log.warn('Did not receive reconnect response', this.logContext);
}
Expand All @@ -1059,6 +1084,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
this.createDataChannels();
}

if (res?.lastMessageSeq) {
this.resendReliableMessagesForResume(res.lastMessageSeq);
}

// resume success
this.emit(EngineEvent.Resumed);
}
Expand Down Expand Up @@ -1151,19 +1180,42 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

/* @internal */
async sendDataPacket(packet: DataPacket, kind: DataPacket_Kind) {
const msg = packet.toBinary();

// make sure we do have a data connection
await this.ensurePublisherConnected(kind);

if (kind === DataPacket_Kind.RELIABLE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, thinking this.ensurePublisherConnection might race for parallel sendDataPacket s so I'm not sure if it's guaranteed to respect invocation order.

Copy link
Contributor Author

@cnderrauber cnderrauber Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no await between increase sequence and dc.Send so the messages in the wire are in order and we can guaranteed the message has been received in the order which is sent. The sendxxx methods are async so I think the developer needs to wait for the first call completed before the next call if the message order is matter?

Copy link
Contributor

@lukasIO lukasIO Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's right.
I just wonder what happens to our buffer logic here if the message order doesn't match the sequence number.
We simply pop and don't really look at the actual sequence number order in the buffer, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the message order in the buffer should be same as the sequence number. I assume the message will be in order since there is no async statements between increase the number and push to buffer, is it wrong in the typescript?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, yeah, I think your assumption is correct!

packet.sequence = this.reliableDataSequence;
this.reliableDataSequence += 1;
}
const msg = packet.toBinary();
const dc = this.dataChannelForKind(kind);
if (dc) {
if (kind === DataPacket_Kind.RELIABLE) {
this.reliableMessageBuffer.push({ data: msg, sequence: packet.sequence });
}

if (this.attemptingReconnect) {
return;
}

dc.send(msg);
}

this.updateAndEmitDCBufferStatus(kind);
}

private async resendReliableMessagesForResume(lastMessageSeq: number) {
await this.ensurePublisherConnected(DataPacket_Kind.RELIABLE);
const dc = this.dataChannelForKind(DataPacket_Kind.RELIABLE);
if (dc) {
this.reliableMessageBuffer.popToSequence(lastMessageSeq);
this.reliableMessageBuffer.getAll().forEach((msg) => {
dc.send(msg.data);
});
}
this.updateAndEmitDCBufferStatus(DataPacket_Kind.RELIABLE);
}

private updateAndEmitDCBufferStatus = (kind: DataPacket_Kind) => {
const status = this.isBufferStatusLow(kind);
if (typeof status !== 'undefined' && status !== this.dcBufferStatus.get(kind)) {
Expand All @@ -1175,6 +1227,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
private isBufferStatusLow = (kind: DataPacket_Kind): boolean | undefined => {
const dc = this.dataChannelForKind(kind);
if (dc) {
if (kind === DataPacket_Kind.RELIABLE) {
this.reliableMessageBuffer.alignBufferedAmount(dc.bufferedAmount);
}
return dc.bufferedAmount <= dc.bufferedAmountLowThreshold;
}
};
Expand Down Expand Up @@ -1409,6 +1464,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
publishTracks: getTrackPublicationInfo(localTracks),
dataChannels: this.dataChannelsInfo(),
trackSidsDisabled,
datachannelReceiveStates: this.reliableReceivedState.map((seq, sid) => {
return new DataChannelReceiveState({
publisherSid: sid,
lastSeq: seq,
});
}),
}),
);
}
Expand Down
52 changes: 52 additions & 0 deletions src/utils/dataPacketBuffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
export interface DataPacketItem {
data: Uint8Array;
sequence: number;
}

export class DataPacketBuffer {
private buffer: DataPacketItem[] = [];

private _totalSize = 0;

push(item: DataPacketItem) {
this.buffer.push(item);
this._totalSize += item.data.byteLength;
}

pop(): DataPacketItem | undefined {
const item = this.buffer.shift();
if (item) {
this._totalSize -= item.data.byteLength;
}
return item;
}

getAll(): DataPacketItem[] {
return this.buffer.slice();
}

popToSequence(sequence: number) {
while (this.buffer.length > 0) {
const first = this.buffer[0];
if (first.sequence <= sequence) {
this.pop();
} else {
break;
}
}
}

alignBufferedAmount(bufferedAmount: number) {
while (this.buffer.length > 0) {
const first = this.buffer[0];
if (this._totalSize - first.data.byteLength <= bufferedAmount) {
break;
}
this.pop();
}
}

get length(): number {
return this.buffer.length;
}
}
Loading
Loading