diff --git a/.changeset/quick-turkeys-post.md b/.changeset/quick-turkeys-post.md new file mode 100644 index 0000000000..3f27a040e4 --- /dev/null +++ b/.changeset/quick-turkeys-post.md @@ -0,0 +1,5 @@ +--- +'livekit-client': patch +--- + +Improve e2e reliablility of data channel diff --git a/package.json b/package.json index daa1a27b79..c763a28542 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ }, "dependencies": { "@livekit/mutex": "1.1.1", - "@livekit/protocol": "1.38.0", + "@livekit/protocol": "1.39.1", "events": "^3.3.0", "loglevel": "^1.9.2", "sdp-transform": "^2.15.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f995cc9de2..5bbeee8a4f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12,8 +12,8 @@ importers: specifier: 1.1.1 version: 1.1.1 '@livekit/protocol': - specifier: 1.38.0 - version: 1.38.0 + specifier: 1.39.1 + version: 1.39.1 '@types/dom-mediacapture-record': specifier: ^1 version: 1.0.22 @@ -696,9 +696,6 @@ packages: resolution: {integrity: sha512-+EzkxvLNfiUeKMgy/3luqfsCWFRXLb7U6wNQTk60tovuckwB15B191tJWvpp4HjiQWdJkCxO3Wbvc6jlk3Xb2Q==} engines: {node: '>=6.9.0'} - '@bufbuild/protobuf@1.10.0': - resolution: {integrity: sha512-QDdVFLoN93Zjg36NoQPZfsVH9tZew7wKDKyV5qRdj8ntT4wQCOradQjRaTdwMhWUYsgKsvCINKKm87FdEk96Ag==} - '@bufbuild/protobuf@1.10.1': resolution: {integrity: sha512-wJ8ReQbHxsAfXhrf9ixl0aYbZorRuOWpBNzm8pL8ftmSxQx/wnJD5Eg861NwJU/czy2VXFIebCeZnZrI9rktIQ==} @@ -1010,8 +1007,8 @@ packages: '@livekit/mutex@1.1.1': resolution: {integrity: sha512-EsshAucklmpuUAfkABPxJNhzj9v2sG7JuzFDL4ML1oJQSV14sqrpTYnsaOudMAw9yOaW53NU3QQTlUQoRs4czw==} - '@livekit/protocol@1.38.0': - resolution: {integrity: sha512-XX6ulvsE1XCN18LVf3ydHN7Ri1Z1M1P5dQdjnm5nVDsSqUL12Vbo/4RKcRlCEXAg2qB62mKjcaVLXVwkfXggkg==} + '@livekit/protocol@1.39.1': + resolution: {integrity: sha512-LK3JFUOjpb4UeNacvqcdhdrn7AxkklyojtOHH589z3LjXGkSM0ZTGskmT1fKD3mJskFgJ9Dlw1RbmdzLKXhEhw==} '@manypkg/find-root@1.1.0': resolution: {integrity: sha512-mki5uBvhHzO8kYYix/WRy2WX8S3B5wdVSc9D6KcU5lQNglP2yt58/VfLuAK49glRXChosY8ap2oJ1qgma3GUVA==} @@ -3188,8 +3185,8 @@ packages: engines: {node: '>=14.17'} hasBin: true - typescript@5.9.0-dev.20250523: - resolution: {integrity: sha512-igDelnnCrlPHezUmn1QseV7BBn7bITue651a+X+JSKkBDwrvyy5vqd1G6dvM3vrc4PlInB5F6jQDp/23kg9ZEA==} + typescript@5.9.0-dev.20250610: + resolution: {integrity: sha512-jvHWZeBl3xNkveo2FMtA6ZHFb8D5CjNVU5qbI/yzpkvUzEFcqwDhKK9FaprSxupsb/V2cdMXsV0u2DMnrsfD9A==} engines: {node: '>=14.17'} hasBin: true @@ -4134,8 +4131,6 @@ snapshots: '@babel/helper-string-parser': 7.27.1 '@babel/helper-validator-identifier': 7.27.1 - '@bufbuild/protobuf@1.10.0': {} - '@bufbuild/protobuf@1.10.1': {} '@bufbuild/protoc-gen-es@1.10.1(@bufbuild/protobuf@1.10.1)': @@ -4475,9 +4470,9 @@ snapshots: '@livekit/mutex@1.1.1': {} - '@livekit/protocol@1.38.0': + '@livekit/protocol@1.39.1': dependencies: - '@bufbuild/protobuf': 1.10.0 + '@bufbuild/protobuf': 1.10.1 '@manypkg/find-root@1.1.0': dependencies: @@ -5279,7 +5274,7 @@ snapshots: dependencies: semver: 7.6.0 shelljs: 0.8.5 - typescript: 5.9.0-dev.20250523 + typescript: 5.9.0-dev.20250610 electron-to-chromium@1.5.4: {} @@ -6783,7 +6778,7 @@ snapshots: typescript@5.8.3: {} - typescript@5.9.0-dev.20250523: {} + typescript@5.9.0-dev.20250610: {} uc.micro@2.1.0: {} diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index ebc05a877a..891237bb86 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -252,7 +252,10 @@ export class SignalClient { sid, reconnectReason: reason, }); - return res; + if (res instanceof ReconnectResponse) { + return res; + } + return; } private connect( diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 06bac286fa..2589804e8e 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -5,6 +5,7 @@ import { ClientConfiguration, type ConnectionQualityUpdate, DataChannelInfo, + DataChannelReceiveState, DataPacket, DataPacket_Kind, DisconnectReason, @@ -44,6 +45,8 @@ import { } from '../api/SignalClient'; import log, { LoggerNames, getLogger } from '../logger'; import type { InternalRoomOptions } from '../options'; +import { DataPacketBuffer } from '../utils/dataPacketBuffer'; +import { TTLMap } from '../utils/ttlmap'; import PCTransport, { PCEvents } from './PCTransport'; import { PCTransportManager, PCTransportState } from './PCTransportManager'; import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy'; @@ -81,6 +84,7 @@ const lossyDataChannel = '_lossy'; const reliableDataChannel = '_reliable'; const minReconnectWait = 2 * 1000; const leaveReconnect = 'leave-reconnect'; +const reliabeReceiveStateTTL = 30_000; enum PCState { New, @@ -178,6 +182,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private publisherConnectionPromise: Promise | undefined; + private reliableDataSequence: number = 1; + + private reliableMessageBuffer = new DataPacketBuffer(); + + private reliableReceivedState: TTLMap = new TTLMap(reliabeReceiveStateTTL); + constructor(private options: InternalRoomOptions) { super(); this.log = getLogger(options.loggerName ?? LoggerNames.Engine); @@ -310,6 +320,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.lossyDCSub = undefined; this.reliableDC = undefined; this.reliableDCSub = undefined; + this.reliableMessageBuffer = new DataPacketBuffer(); + this.reliableDataSequence = 1; + this.reliableReceivedState.clear(); } async cleanupClient() { @@ -677,6 +690,15 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } const dp = DataPacket.fromBinary(new Uint8Array(buffer)); + if (dp.sequence > 0 && dp.participantSid !== '') { + const lastSeq = this.reliableReceivedState.get(dp.participantSid); + if (lastSeq && dp.sequence <= lastSeq) { + // ignore duplicate or out-of-order packets in reliable channel + return; + } + this.reliableReceivedState.set(dp.participantSid, dp.sequence); + } + if (dp.value?.case === 'speaker') { // dispatch speaker updates this.emit(EngineEvent.ActiveSpeakersUpdate, dp.value.value.speakers); @@ -1033,6 +1055,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (res) { const rtcConfig = this.makeRTCConfiguration(res); this.pcManager.updateConfiguration(rtcConfig); + if (this.latestJoinResponse) { + this.latestJoinResponse.serverInfo = res.serverInfo; + } } else { this.log.warn('Did not receive reconnect response', this.logContext); } @@ -1059,6 +1084,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.createDataChannels(); } + if (res?.lastMessageSeq) { + this.resendReliableMessagesForResume(res.lastMessageSeq); + } + // resume success this.emit(EngineEvent.Resumed); } @@ -1151,19 +1180,42 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /* @internal */ async sendDataPacket(packet: DataPacket, kind: DataPacket_Kind) { - const msg = packet.toBinary(); - // make sure we do have a data connection await this.ensurePublisherConnected(kind); + if (kind === DataPacket_Kind.RELIABLE) { + packet.sequence = this.reliableDataSequence; + this.reliableDataSequence += 1; + } + const msg = packet.toBinary(); const dc = this.dataChannelForKind(kind); if (dc) { + if (kind === DataPacket_Kind.RELIABLE) { + this.reliableMessageBuffer.push({ data: msg, sequence: packet.sequence }); + } + + if (this.attemptingReconnect) { + return; + } + dc.send(msg); } this.updateAndEmitDCBufferStatus(kind); } + private async resendReliableMessagesForResume(lastMessageSeq: number) { + await this.ensurePublisherConnected(DataPacket_Kind.RELIABLE); + const dc = this.dataChannelForKind(DataPacket_Kind.RELIABLE); + if (dc) { + this.reliableMessageBuffer.popToSequence(lastMessageSeq); + this.reliableMessageBuffer.getAll().forEach((msg) => { + dc.send(msg.data); + }); + } + this.updateAndEmitDCBufferStatus(DataPacket_Kind.RELIABLE); + } + private updateAndEmitDCBufferStatus = (kind: DataPacket_Kind) => { const status = this.isBufferStatusLow(kind); if (typeof status !== 'undefined' && status !== this.dcBufferStatus.get(kind)) { @@ -1175,6 +1227,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private isBufferStatusLow = (kind: DataPacket_Kind): boolean | undefined => { const dc = this.dataChannelForKind(kind); if (dc) { + if (kind === DataPacket_Kind.RELIABLE) { + this.reliableMessageBuffer.alignBufferedAmount(dc.bufferedAmount); + } return dc.bufferedAmount <= dc.bufferedAmountLowThreshold; } }; @@ -1409,6 +1464,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit publishTracks: getTrackPublicationInfo(localTracks), dataChannels: this.dataChannelsInfo(), trackSidsDisabled, + datachannelReceiveStates: this.reliableReceivedState.map((seq, sid) => { + return new DataChannelReceiveState({ + publisherSid: sid, + lastSeq: seq, + }); + }), }), ); } diff --git a/src/utils/dataPacketBuffer.ts b/src/utils/dataPacketBuffer.ts new file mode 100644 index 0000000000..9d41877d66 --- /dev/null +++ b/src/utils/dataPacketBuffer.ts @@ -0,0 +1,52 @@ +export interface DataPacketItem { + data: Uint8Array; + sequence: number; +} + +export class DataPacketBuffer { + private buffer: DataPacketItem[] = []; + + private _totalSize = 0; + + push(item: DataPacketItem) { + this.buffer.push(item); + this._totalSize += item.data.byteLength; + } + + pop(): DataPacketItem | undefined { + const item = this.buffer.shift(); + if (item) { + this._totalSize -= item.data.byteLength; + } + return item; + } + + getAll(): DataPacketItem[] { + return this.buffer.slice(); + } + + popToSequence(sequence: number) { + while (this.buffer.length > 0) { + const first = this.buffer[0]; + if (first.sequence <= sequence) { + this.pop(); + } else { + break; + } + } + } + + alignBufferedAmount(bufferedAmount: number) { + while (this.buffer.length > 0) { + const first = this.buffer[0]; + if (this._totalSize - first.data.byteLength <= bufferedAmount) { + break; + } + this.pop(); + } + } + + get length(): number { + return this.buffer.length; + } +} diff --git a/src/utils/ttlmap.ts b/src/utils/ttlmap.ts new file mode 100644 index 0000000000..0045b4750c --- /dev/null +++ b/src/utils/ttlmap.ts @@ -0,0 +1,96 @@ +export class TTLMap { + private _map = new Map(); + + private ttl: number; + + private _lastCleanup = 0; + + /** + * @param ttl ttl of the key (ms) + */ + constructor(ttl: number) { + this.ttl = ttl; + } + + set(key: K, value: V) { + const now = Date.now(); + if (now - this._lastCleanup > this.ttl / 2) { + this.cleanup(); + } + const expiresAt = now + this.ttl; + this._map.set(key, { value, expiresAt }); + return this; + } + + get(key: K): V | undefined { + const entry = this._map.get(key); + if (!entry) return undefined; + if (entry.expiresAt < Date.now()) { + this._map.delete(key); + return undefined; + } + return entry.value; + } + + has(key: K): boolean { + const entry = this._map.get(key); + if (!entry) return false; + if (entry.expiresAt < Date.now()) { + this._map.delete(key); + return false; + } + return true; + } + + delete(key: K): boolean { + return this._map.delete(key); + } + + clear() { + this._map.clear(); + } + + cleanup() { + const now = Date.now(); + for (const [key, entry] of this._map.entries()) { + if (entry.expiresAt < now) { + this._map.delete(key); + } + } + this._lastCleanup = now; + } + + get size() { + this.cleanup(); + return this._map.size; + } + + forEach(callback: (value: V, key: K, map: Map) => void) { + this.cleanup(); + for (const [key, entry] of this._map.entries()) { + if (entry.expiresAt >= Date.now()) { + callback(entry.value, key, this.asValueMap()); + } + } + } + + map(callback: (value: V, key: K, map: Map) => U): U[] { + this.cleanup(); + const result: U[] = []; + const valueMap = this.asValueMap(); + for (const [key, value] of valueMap.entries()) { + result.push(callback(value, key, valueMap)); + } + return result; + } + + private asValueMap(): Map { + const result = new Map(); + for (const [key, entry] of this._map.entries()) { + if (entry.expiresAt >= Date.now()) { + result.set(key, entry.value); + } + } + return result; + } +}