Skip to content

Commit

Permalink
fix: use actor for DataChannelPair
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie committed Dec 11, 2023
1 parent 7d8a661 commit 0ad9342
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@ import Foundation

@_implementationOnly import WebRTC

class DataChannelPair: NSObject, Loggable {
actor DataChannelPairActor: NSObject, Loggable {
// MARK: - Types

public typealias OnDataPacket = (_ dataPacket: Livekit_DataPacket) -> Void

// MARK: - Public

public let openCompleter = AsyncCompleter<Void>(label: "Data channel open", timeOut: .defaultPublisherDataChannelOpen)
public var isOpen: Bool { _lock.sync { _isOpen } }

public var isOpen: Bool {
guard let reliable = _reliableChannel, let lossy = _lossyChannel else { return false }
return reliable.readyState == .open && lossy.readyState == .open
}

// MARK: - Private

private let _lock = UnfairLock()
private let _onDataPacket: OnDataPacket?
private var _reliableChannel: LKRTCDataChannel?
private var _lossyChannel: LKRTCDataChannel?
private var _isOpen: Bool {
guard let reliable = _reliableChannel, let lossy = _lossyChannel else { return false }
return reliable.readyState == .open && lossy.readyState == .open
}

public init(reliableChannel: LKRTCDataChannel? = nil,
lossyChannel: LKRTCDataChannel? = nil,
Expand All @@ -49,38 +48,28 @@ class DataChannelPair: NSObject, Loggable {
}

public func set(reliable channel: LKRTCDataChannel?) {
_lock.sync {
_reliableChannel = channel
channel?.delegate = self
_reliableChannel = channel
channel?.delegate = self

if _isOpen {
openCompleter.resume(returning: ())
}
if isOpen {
openCompleter.resume(returning: ())
}
}

public func set(lossy channel: LKRTCDataChannel?) {
_lock.sync {
_lossyChannel = channel
channel?.delegate = self
_lossyChannel = channel
channel?.delegate = self

if _isOpen {
openCompleter.resume(returning: ())
}
if isOpen {
openCompleter.resume(returning: ())
}
}

public func reset() {
_lock.sync {
let reliable = _reliableChannel
let lossy = _lossyChannel

_reliableChannel = nil
_lossyChannel = nil

reliable?.close()
lossy?.close()
}
_reliableChannel?.close()
_lossyChannel?.close()
_reliableChannel = nil
_lossyChannel = nil

openCompleter.reset()
}
Expand All @@ -98,37 +87,32 @@ class DataChannelPair: NSObject, Loggable {
let serializedData = try packet.serializedData()
let rtcData = Engine.createDataBuffer(data: serializedData)

let result = _lock.sync {
switch reliability {
case .reliable: return _reliableChannel?.sendData(rtcData) ?? false
case .lossy: return _lossyChannel?.sendData(rtcData) ?? false
}
}

guard result else {
throw InternalError.state(message: "sendData returned false")
let channel = (reliability == .reliable) ? _reliableChannel : _lossyChannel
guard let sendDataResult = channel?.sendData(rtcData), sendDataResult else {
throw InternalError.state(message: "sendData failed")
}
}

public func infos() -> [Livekit_DataChannelInfo] {
_lock.sync {
[_lossyChannel, _reliableChannel]
.compactMap { $0 }
.map { $0.toLKInfoType() }
}
[_lossyChannel, _reliableChannel]
.compactMap { $0 }
.map { $0.toLKInfoType() }
}
}

// MARK: - RTCDataChannelDelegate

extension DataChannelPair: LKRTCDataChannelDelegate {
func dataChannelDidChangeState(_: LKRTCDataChannel) {
if _lock.sync({ _isOpen }) {
openCompleter.resume(returning: ())
extension DataChannelPairActor: LKRTCDataChannelDelegate {
nonisolated func dataChannelDidChangeState(_: LKRTCDataChannel) {
Task {
if await isOpen {
openCompleter.resume(returning: ())
}
}
}

func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) {
nonisolated func dataChannel(_: LKRTCDataChannel, didReceiveMessageWith buffer: LKRTCDataBuffer) {
log("dataChannel(didReceiveMessageWith:)")
guard let dataPacket = try? Livekit_DataPacket(contiguousBytes: buffer.data) else {
log("could not decode data message", .error)
return
Expand Down
12 changes: 7 additions & 5 deletions Sources/LiveKit/Core/Engine+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ extension Engine: TransportDelegate {
func transport(_ transport: Transport, didOpen dataChannel: LKRTCDataChannel) {
log("Server opened data channel \(dataChannel.label)(\(dataChannel.readyState))")

if subscriberPrimary, transport.target == .subscriber {
switch dataChannel.label {
case LKRTCDataChannel.labels.reliable: subscriberDC.set(reliable: dataChannel)
case LKRTCDataChannel.labels.lossy: subscriberDC.set(lossy: dataChannel)
default: log("Unknown data channel label \(dataChannel.label)", .warning)
Task {
if subscriberPrimary, transport.target == .subscriber {
switch dataChannel.label {
case LKRTCDataChannel.labels.reliable: await subscriberDataChannel.set(reliable: dataChannel)
case LKRTCDataChannel.labels.lossy: await subscriberDataChannel.set(lossy: dataChannel)
default: log("Unknown data channel label \(dataChannel.label)", .warning)
}
}
}
}
Expand Down
29 changes: 15 additions & 14 deletions Sources/LiveKit/Core/Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Engine: MulticastDelegate<EngineDelegate> {

// MARK: - DataChannels

lazy var subscriberDC: DataChannelPair = .init(onDataPacket: { [weak self] dataPacket in
lazy var subscriberDataChannel: DataChannelPairActor = .init(onDataPacket: { [weak self] dataPacket in
guard let self else { return }
switch dataPacket.value {
case let .speaker(update): self.notify { $0.engine(self, didUpdate: update.speakers) }
Expand All @@ -73,7 +73,7 @@ class Engine: MulticastDelegate<EngineDelegate> {
}
})

let publisherDC = DataChannelPair()
let publisherDataChannel = DataChannelPairActor()

private var _blockProcessQueue = DispatchQueue(label: "LiveKitSDK.engine.pendingBlocks",
qos: .default)
Expand Down Expand Up @@ -178,8 +178,8 @@ class Engine: MulticastDelegate<EngineDelegate> {
// Resets state of transports
func cleanUpRTC() async {
// Close data channels
publisherDC.reset()
subscriberDC.reset()
await publisherDataChannel.reset()
await subscriberDataChannel.reset()

// Close transports
await publisher?.close()
Expand Down Expand Up @@ -211,17 +211,18 @@ class Engine: MulticastDelegate<EngineDelegate> {
}

try await publisherTransportConnectedCompleter.wait()
try await publisherDC.openCompleter.wait()
try await publisherDataChannel.openCompleter.wait()
}

try await ensurePublisherConnected()

// At this point publisher should be .connected and dc should be .open
assert(publisher?.isConnected ?? false, "publisher is not .connected")
assert(publisherDC.isOpen, "publisher data channel is not .open")
let dataChannelIsOpen = await publisherDataChannel.isOpen
assert(dataChannelIsOpen, "publisher data channel is not .open")

// Should return true if successful
try publisherDC.send(userPacket: userPacket, reliability: reliability)
try await publisherDataChannel.send(userPacket: userPacket, reliability: reliability)
}
}

Expand Down Expand Up @@ -275,17 +276,17 @@ extension Engine {

// data over pub channel for backwards compatibility

let publisherReliableDC = publisher.dataChannel(for: LKRTCDataChannel.labels.reliable,
let reliableDataChannel = publisher.dataChannel(for: LKRTCDataChannel.labels.reliable,
configuration: Engine.createDataChannelConfiguration())

let publisherLossyDC = publisher.dataChannel(for: LKRTCDataChannel.labels.lossy,
let lossyDataChannel = publisher.dataChannel(for: LKRTCDataChannel.labels.lossy,
configuration: Engine.createDataChannelConfiguration(maxRetransmits: 0))

publisherDC.set(reliable: publisherReliableDC)
publisherDC.set(lossy: publisherLossyDC)
await publisherDataChannel.set(reliable: reliableDataChannel)
await publisherDataChannel.set(lossy: lossyDataChannel)

log("dataChannel.\(String(describing: publisherReliableDC?.label)) : \(String(describing: publisherReliableDC?.channelId))")
log("dataChannel.\(String(describing: publisherLossyDC?.label)) : \(String(describing: publisherLossyDC?.channelId))")
log("dataChannel.\(String(describing: reliableDataChannel?.label)) : \(String(describing: reliableDataChannel?.channelId))")
log("dataChannel.\(String(describing: lossyDataChannel?.label)) : \(String(describing: lossyDataChannel?.channelId))")

if !subscriberPrimary {
// lazy negotiation for protocol v3+
Expand Down Expand Up @@ -508,7 +509,7 @@ extension Engine {
try await signalClient.sendSyncState(answer: previousAnswer.toPBType(),
offer: previousOffer?.toPBType(),
subscription: subscription, publishTracks: room.localParticipant.publishedTracksInfo(),
dataChannels: publisherDC.infos())
dataChannels: publisherDataChannel.infos())
}
}

Expand Down

0 comments on commit 0ad9342

Please sign in to comment.