Skip to content

Commit

Permalink
data channel thread safety optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie committed Dec 11, 2023
1 parent 7a371b6 commit be48ea9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 63 deletions.
93 changes: 45 additions & 48 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,82 +19,77 @@ import Foundation
@_implementationOnly import WebRTC

class DataChannelPair: NSObject, Loggable {
// MARK: - Public
// MARK: - Types

public typealias OnDataPacket = (_ dataPacket: Livekit_DataPacket) -> Void

public let target: Livekit_SignalTarget
public var onDataPacket: OnDataPacket?
// MARK: - Public

public private(set) var openCompleter = AsyncCompleter<Void>(label: "Data channel open", timeOut: .defaultPublisherDataChannelOpen)
public let openCompleter = AsyncCompleter<Void>(label: "Data channel open", timeOut: .defaultPublisherDataChannelOpen)
public var isOpen: Bool { _lock.sync { _isOpen } }

// MARK: - Private

private let _lock = UnfairLock()
private let _onDataPacket: OnDataPacket?
private var _reliableChannel: LKRTCDataChannel?
private var _lossyChannel: LKRTCDataChannel?

public var isOpen: Bool {
guard let reliable = _reliableChannel,
let lossy = _lossyChannel
else {
return false
}

private var _isOpen: Bool {
guard let reliable = _reliableChannel, let lossy = _lossyChannel else { return false }
return reliable.readyState == .open && lossy.readyState == .open
}

public init(target: Livekit_SignalTarget,
reliableChannel: LKRTCDataChannel? = nil,
lossyChannel: LKRTCDataChannel? = nil)
public init(reliableChannel: LKRTCDataChannel? = nil,
lossyChannel: LKRTCDataChannel? = nil,
onDataPacket: OnDataPacket? = nil)
{
self.target = target
_reliableChannel = reliableChannel
_lossyChannel = lossyChannel
_onDataPacket = onDataPacket
}

public func set(reliable channel: LKRTCDataChannel?) {
_reliableChannel = channel
channel?.delegate = self
_lock.sync {
_reliableChannel = channel
channel?.delegate = self

if isOpen {
openCompleter.resume(returning: ())
if _isOpen {
openCompleter.resume(returning: ())
}
}
}

public func set(lossy channel: LKRTCDataChannel?) {
_lossyChannel = channel
channel?.delegate = self
_lock.sync {
_lossyChannel = channel
channel?.delegate = self

if isOpen {
openCompleter.resume(returning: ())
if _isOpen {
openCompleter.resume(returning: ())
}
}
}

public func close() {
let reliable = _reliableChannel
let lossy = _lossyChannel

_reliableChannel = nil
_lossyChannel = nil
public func reset() {
_lock.sync {
let reliable = _reliableChannel
let lossy = _lossyChannel

openCompleter.reset()
_reliableChannel = nil
_lossyChannel = nil

// execute on .webRTC queue
DispatchQueue.liveKitWebRTC.sync {
reliable?.close()
lossy?.close()
}

openCompleter.reset()
}

public func send(userPacket: Livekit_UserPacket, reliability: Reliability) throws {
guard let reliableChannel = _reliableChannel,
let lossyChannel = _lossyChannel
else {
throw InternalError.state(message: "Data channel is nil")
guard isOpen else {
throw InternalError.state(message: "Data channel is not open")
}

// prepare the data

let packet = Livekit_DataPacket.with {
$0.kind = reliability.toPBType()
$0.user = userPacket
Expand All @@ -103,30 +98,32 @@ class DataChannelPair: NSObject, Loggable {
let serializedData = try packet.serializedData()
let rtcData = Engine.createDataBuffer(data: serializedData)

let result = { () -> Bool in
let result = _lock.sync {
switch reliability {
case .reliable: return reliableChannel.sendData(rtcData)
case .lossy: return lossyChannel.sendData(rtcData)
case .reliable: return _reliableChannel?.sendData(rtcData) ?? false
case .lossy: return _lossyChannel?.sendData(rtcData) ?? false
}
}()
}

guard result else {
throw InternalError.state(message: "sendData returned false")
}
}

public func infos() -> [Livekit_DataChannelInfo] {
[_lossyChannel, _reliableChannel]
.compactMap { $0 }
.map { $0.toLKInfoType() }
_lock.sync {
[_lossyChannel, _reliableChannel]
.compactMap { $0 }
.map { $0.toLKInfoType() }
}
}
}

// MARK: - RTCDataChannelDelegate

extension DataChannelPair: LKRTCDataChannelDelegate {
func dataChannelDidChangeState(_: LKRTCDataChannel) {
if isOpen {
if _lock.sync({ _isOpen }) {
openCompleter.resume(returning: ())
}
}
Expand All @@ -137,6 +134,6 @@ extension DataChannelPair: LKRTCDataChannelDelegate {
return
}

onDataPacket?(dataPacket)
_onDataPacket?(dataPacket)
}
}
27 changes: 12 additions & 15 deletions Sources/LiveKit/Core/Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,16 @@ class Engine: MulticastDelegate<EngineDelegate> {

// MARK: - DataChannels

public internal(set) var subscriberDC = DataChannelPair(target: .subscriber)
public internal(set) var publisherDC = DataChannelPair(target: .publisher)
lazy var subscriberDC: DataChannelPair = .init(onDataPacket: { [weak self] dataPacket in
guard let self else { return }
switch dataPacket.value {
case let .speaker(update): self.notify { $0.engine(self, didUpdate: update.speakers) }
case let .user(userPacket): self.notify { $0.engine(self, didReceive: userPacket) }
default: return
}
})

let publisherDC = DataChannelPair()

private var _blockProcessQueue = DispatchQueue(label: "LiveKitSDK.engine.pendingBlocks",
qos: .default)
Expand Down Expand Up @@ -114,17 +122,6 @@ class Engine: MulticastDelegate<EngineDelegate> {
}
}
}

subscriberDC.onDataPacket = { [weak self] (dataPacket: Livekit_DataPacket) in

guard let self else { return }

switch dataPacket.value {
case let .speaker(update): self.notify { $0.engine(self, didUpdate: update.speakers) }
case let .user(userPacket): self.notify { $0.engine(self, didReceive: userPacket) }
default: return
}
}
}

deinit {
Expand Down Expand Up @@ -181,8 +178,8 @@ class Engine: MulticastDelegate<EngineDelegate> {
// Resets state of transports
func cleanUpRTC() async {
// Close data channels
publisherDC.close()
subscriberDC.close()
publisherDC.reset()
subscriberDC.reset()

// Close transports
await publisher?.close()
Expand Down

0 comments on commit be48ea9

Please sign in to comment.