Skip to content
1 change: 1 addition & 0 deletions .changes/reliable-data-channel
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="changed" "Improved reliability of the data channel"
183 changes: 160 additions & 23 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
private struct State {
var lossy: LKRTCDataChannel?
var reliable: LKRTCDataChannel?
var reliableDataSequence: UInt32 = 1
var reliableReceivedState: TTLDictionary<String, UInt32> = TTLDictionary(ttl: reliableReceivedStateTTL)

var isOpen: Bool {
guard let lossy, let reliable else { return false }
Expand All @@ -54,13 +56,59 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
case lossy, reliable
}

private struct BufferingState {
var queue: Deque<PublishDataRequest> = []
var amount: UInt64 = 0
private struct SendBuffer {
private var queue: Deque<PublishDataRequest> = []
var rtcAmount: UInt64 = 0

mutating func enqueue(_ request: PublishDataRequest) {
queue.append(request)
}

@discardableResult
mutating func dequeue() -> PublishDataRequest? {
guard !queue.isEmpty else { return nil }
return queue.removeFirst()
}

func canSend(threshold: UInt64) -> Bool {
rtcAmount <= threshold
}
}

private struct RetryBuffer {
private var queue: Deque<PublishDataRequest> = []
private var currentAmount: UInt64 = 0
private let minAmount: UInt64

init(minAmount: UInt64) {
self.minAmount = minAmount
}

func peek() -> PublishDataRequest? { queue.first }

mutating func enqueue(_ request: PublishDataRequest) {
queue.append(request)
currentAmount += UInt64(request.data.data.count)
}

@discardableResult
mutating func dequeue() -> PublishDataRequest? {
guard !queue.isEmpty else { return nil }
let first = queue.removeFirst()
currentAmount -= UInt64(first.data.data.count)
return first
}

mutating func trim(toAmount: UInt64) {
while currentAmount > toAmount + minAmount {
dequeue()
}
}
}

private struct PublishDataRequest: Sendable {
let data: LKRTCDataBuffer
let sequence: UInt32
let continuation: CheckedContinuation<Void, any Error>?
}

Expand All @@ -70,41 +118,60 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {

enum Detail: Sendable {
case publishData(PublishDataRequest)
case publishedData(PublishDataRequest)
case bufferedAmountChanged(UInt64)
case retryRequested(UInt32)
}
}

// MARK: - Event handling

private func handleEvents(
events: AsyncStream<ChannelEvent>
) async {
var lossyBuffering = BufferingState()
var reliableBuffering = BufferingState()
var lossyBuffer = SendBuffer()
var reliableBuffer = SendBuffer()

var reliableRetryBuffer = RetryBuffer(minAmount: Self.reliableRetryAmount)

for await event in events {
switch event.detail {
case let .publishData(request):
switch event.channelKind {
case .lossy: lossyBuffering.queue.append(request)
case .reliable: reliableBuffering.queue.append(request)
case .lossy: lossyBuffer.enqueue(request)
case .reliable: reliableBuffer.enqueue(request)
}
case let .publishedData(request):
switch event.channelKind {
case .lossy: ()
case .reliable: reliableRetryBuffer.enqueue(request)
}
case let .bufferedAmountChanged(amount):
switch event.channelKind {
case .lossy: updateBufferingState(state: &lossyBuffering, newAmount: amount)
case .reliable: updateBufferingState(state: &reliableBuffering, newAmount: amount)
case .lossy:
updateTarget(buffer: &lossyBuffer, newAmount: amount)
case .reliable:
updateTarget(buffer: &reliableBuffer, newAmount: amount)
reliableRetryBuffer.trim(toAmount: amount)
}
case let .retryRequested(lastSeq):
switch event.channelKind {
case .lossy: ()
case .reliable: retry(buffer: &reliableRetryBuffer, from: lastSeq)
}
}

switch event.channelKind {
case .lossy:
processSendQueue(
threshold: Self.lossyLowThreshold,
state: &lossyBuffering,
buffer: &lossyBuffer,
kind: .lossy
)
case .reliable:
processSendQueue(
threshold: Self.reliableLowThreshold,
state: &reliableBuffering,
buffer: &reliableBuffer,
kind: .reliable
)
}
Expand All @@ -120,14 +187,11 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {

private func processSendQueue(
threshold: UInt64,
state: inout BufferingState,
buffer: inout SendBuffer,
kind: ChannelKind
) {
while state.amount <= threshold {
guard !state.queue.isEmpty else { break }
let request = state.queue.removeFirst()

state.amount += UInt64(request.data.data.count)
while buffer.canSend(threshold: threshold), let request = buffer.dequeue() {
buffer.rtcAmount += UInt64(request.data.data.count)

guard let channel = channel(for: kind) else {
request.continuation?.resume(
Expand All @@ -142,21 +206,43 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
return
}
request.continuation?.resume()

let event = ChannelEvent(channelKind: kind, detail: .publishedData(request))
_state.eventContinuation?.yield(event)
}
}

private func updateBufferingState(
state: inout BufferingState,
// MARK: - Cache

private func updateTarget(
buffer: inout SendBuffer,
newAmount: UInt64
) {
guard state.amount >= newAmount else {
guard buffer.rtcAmount >= newAmount else {
log("Unexpected buffer size detected", .error)
state.amount = 0
buffer.rtcAmount = 0
return
}
state.amount -= newAmount
buffer.rtcAmount -= newAmount
}

private func retry(
buffer: inout RetryBuffer,
from lastSeq: UInt32
) {
if let first = buffer.peek(), first.sequence > lastSeq + 1 {
log("Wrong packet sequence while retrying: \(first.sequence) > \(lastSeq + 1), \(first.sequence - lastSeq - 1) packets missing", .warning)
}
while let request = buffer.dequeue() {
if request.sequence > lastSeq {
let event = ChannelEvent(channelKind: .reliable, detail: .publishData(PublishDataRequest(data: request.data, sequence: request.sequence, continuation: nil)))
_state.eventContinuation?.yield(event)
}
}
}

// MARK: - Init

init(delegate: DataChannelDelegate? = nil,
lossyChannel: LKRTCDataChannel? = nil,
reliableChannel: LKRTCDataChannel? = nil)
Expand Down Expand Up @@ -207,6 +293,8 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
let (lossy, reliable) = _state.mutate {
let result = ($0.lossy, $0.reliable)
$0.reliable = nil
$0.reliableDataSequence = 1
$0.reliableReceivedState.removeAll()
$0.lossy = nil
return result
}
Expand All @@ -217,6 +305,8 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
openCompleter.reset()
}

// MARK: - Send

func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) async throws {
try await send(dataPacket: .with {
$0.kind = kind // TODO: field is deprecated
Expand All @@ -225,12 +315,14 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
}

func send(dataPacket packet: Livekit_DataPacket) async throws {
let packet = withSequence(packet)
let serializedData = try packet.serializedData()
let rtcData = RTC.createDataBuffer(data: serializedData)

try await withCheckedThrowingContinuation { continuation in
let request = PublishDataRequest(
data: rtcData,
sequence: packet.sequence,
continuation: continuation
)
let event = ChannelEvent(
Expand All @@ -241,15 +333,48 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
}
}

private func withSequence(_ packet: Livekit_DataPacket) -> Livekit_DataPacket {
guard packet.kind == .reliable, packet.sequence == 0 else { return packet }
var packet = packet
_state.mutate {
packet.sequence = $0.reliableDataSequence
$0.reliableDataSequence += 1
}
return packet
}

func retryReliable(lastSequence: UInt32) {
let event = ChannelEvent(channelKind: .reliable, detail: .retryRequested(lastSequence))
_state.eventContinuation?.yield(event)
}

// MARK: - Sync state

func infos() -> [Livekit_DataChannelInfo] {
_state.read { [$0.lossy, $0.reliable] }
.compactMap { $0 }
.map { $0.toLKInfoType() }
}

func receiveStates() -> [Livekit_DataChannelReceiveState] {
_state.reliableReceivedState.map { sid, seq in
Livekit_DataChannelReceiveState.with {
$0.publisherSid = sid
$0.lastSeq = seq
}
}
}

// MARK: - Constants

private static let reliableLowThreshold: UInt64 = 2 * 1024 * 1024 // 2 MB
private static let lossyLowThreshold: UInt64 = reliableLowThreshold

// If rtc drains its buffer to 0, keep at least this amount of data for retry.
// Should be >= the full backpressure amount to avoid losing packets.
private static let reliableRetryAmount: UInt64 = .init(Double(reliableLowThreshold) * 1.25)
private static let reliableReceivedStateTTL: TimeInterval = 30

deinit {
_state.eventContinuation?.finish()
}
Expand All @@ -272,18 +397,30 @@ extension DataChannelPair: LKRTCDataChannelDelegate {
}
}

func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) {
func dataChannel(_ dataChannel: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) {
guard let dataPacket = try? Livekit_DataPacket(serializedBytes: buffer.data) else {
log("Could not decode data message", .error)
return
}

if dataChannel.kind == .reliable, dataPacket.sequence > 0, !dataPacket.participantSid.isEmpty {
if let lastSeq = _state.reliableReceivedState[dataPacket.participantSid], dataPacket.sequence <= lastSeq {
log("Ignoring duplicate/out-of-order reliable data message", .warning)
return
}
_state.mutate {
$0.reliableReceivedState[dataPacket.participantSid] = dataPacket.sequence
}
}

delegates.notify {
$0.dataChannel(self, didReceiveDataPacket: dataPacket)
}
}
}

// MARK: - Extensions

private extension DataChannelPair.ChannelKind {
init(_ packetKind: Livekit_DataPacket.Kind) {
guard case .lossy = packetKind else {
Expand Down
9 changes: 7 additions & 2 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ extension Room {
if let identity = localParticipant.identity?.stringValue {
packet.participantIdentity = identity
}
if let sid = localParticipant.sid?.stringValue {
packet.participantSid = sid
}

try await publisherDataChannel.send(dataPacket: packet)
}
Expand Down Expand Up @@ -190,11 +193,12 @@ extension Room {
try await publisherShouldNegotiate()
}

} else if case .reconnect = connectResponse {
} else if case let .reconnect(reconnectResponse) = connectResponse {
log("[Connect] Configuring transports with RECONNECT response...")
let (subscriber, publisher) = _state.read { ($0.subscriber, $0.publisher) }
try await subscriber?.set(configuration: rtcConfiguration)
try await publisher?.set(configuration: rtcConfiguration)
publisherDataChannel.retryReliable(lastSequence: reconnectResponse.lastMessageSeq)
}
}
}
Expand Down Expand Up @@ -455,7 +459,8 @@ extension Room {
offer: previousOffer?.toPBType(),
subscription: subscription,
publishTracks: localParticipant.publishedTracksInfo(),
dataChannels: publisherDataChannel.infos())
dataChannels: publisherDataChannel.infos(),
dataChannelReceiveStates: subscriberDataChannel.receiveStates())
}
}

Expand Down
4 changes: 3 additions & 1 deletion Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,8 @@ extension SignalClient {
offer: Livekit_SessionDescription?,
subscription: Livekit_UpdateSubscription,
publishTracks: [Livekit_TrackPublishedResponse]? = nil,
dataChannels: [Livekit_DataChannelInfo]? = nil) async throws
dataChannels: [Livekit_DataChannelInfo]? = nil,
dataChannelReceiveStates: [Livekit_DataChannelReceiveState]? = nil) async throws
{
let r = Livekit_SignalRequest.with {
$0.syncState = Livekit_SyncState.with {
Expand All @@ -541,6 +542,7 @@ extension SignalClient {
$0.subscription = subscription
$0.publishTracks = publishTracks ?? []
$0.dataChannels = dataChannels ?? []
$0.datachannelReceiveStates = dataChannelReceiveStates ?? []
}
}

Expand Down
Loading
Loading