Skip to content

Commit

Permalink
Merge branch 'main' into hiroshi/prepare-connection
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie authored Sep 9, 2024
2 parents 4bb928f + 4ca5fa2 commit 02a6f47
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 80 deletions.
11 changes: 4 additions & 7 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,9 @@ extension Room {

} else if case .reconnect = connectResponse {
log("[Connect] Configuring transports with RECONNECT response...")
guard let subscriber = _state.subscriber, let publisher = _state.publisher else {
log("[Connect] Subscriber or Publisher is nil", .error)
return
}

try await subscriber.set(configuration: rtcConfiguration)
try await publisher.set(configuration: rtcConfiguration)
let (subscriber, publisher) = _state.read { ($0.subscriber, $0.publisher) }
try await subscriber?.set(configuration: rtcConfiguration)
try await publisher?.set(configuration: rtcConfiguration)
}
}
}
Expand Down Expand Up @@ -295,6 +291,7 @@ extension Room {
token,
connectOptions: _state.connectOptions,
reconnectMode: _state.isReconnectingWithMode,
participantSid: localParticipant.sid,
adaptiveStream: _state.roomOptions.adaptiveStream)
try Task.checkCancellation()

Expand Down
38 changes: 22 additions & 16 deletions Sources/LiveKit/Core/Room+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extension RTCPeerConnectionState {
}

extension Room: TransportDelegate {
func transport(_ transport: Transport, didUpdateState pcState: RTCPeerConnectionState) async {
func transport(_ transport: Transport, didUpdateState pcState: RTCPeerConnectionState) {
log("target: \(transport.target), connectionState: \(pcState.description)")

// primary connected
Expand All @@ -57,25 +57,29 @@ extension Room: TransportDelegate {
if _state.connectionState == .connected {
// Attempt re-connect if primary or publisher transport failed
if transport.isPrimary || (_state.hasPublished && transport.target == .publisher), pcState.isDisconnected {
do {
try await startReconnect(reason: .transport)
} catch {
log("Failed calling startReconnect, error: \(error)", .error)
Task {
do {
try await startReconnect(reason: .transport)
} catch {
log("Failed calling startReconnect, error: \(error)", .error)
}
}
}
}
}

func transport(_ transport: Transport, didGenerateIceCandidate iceCandidate: LKRTCIceCandidate) async {
do {
log("sending iceCandidate")
try await signalClient.sendCandidate(candidate: iceCandidate, target: transport.target)
} catch {
log("Failed to send iceCandidate, error: \(error)", .error)
func transport(_ transport: Transport, didGenerateIceCandidate iceCandidate: LKRTCIceCandidate) {
Task {
do {
log("sending iceCandidate")
try await signalClient.sendCandidate(candidate: iceCandidate, target: transport.target)
} catch {
log("Failed to send iceCandidate, error: \(error)", .error)
}
}
}

func transport(_ transport: Transport, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) async {
func transport(_ transport: Transport, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) {
guard !streams.isEmpty else {
log("Received onTrack with no streams!", .warning)
return
Expand All @@ -95,13 +99,15 @@ extension Room: TransportDelegate {
}
}

func transport(_ transport: Transport, didRemoveTrack track: LKRTCMediaStreamTrack) async {
func transport(_ transport: Transport, didRemoveTrack track: LKRTCMediaStreamTrack) {
if transport.target == .subscriber {
await engine(self, didRemoveTrack: track)
Task {
await engine(self, didRemoveTrack: track)
}
}
}

func transport(_ transport: Transport, didOpenDataChannel dataChannel: LKRTCDataChannel) async {
func transport(_ transport: Transport, didOpenDataChannel dataChannel: LKRTCDataChannel) {
log("Server opened data channel \(dataChannel.label)(\(dataChannel.readyState))")

if _state.isSubscriberPrimary, transport.target == .subscriber {
Expand All @@ -113,5 +119,5 @@ extension Room: TransportDelegate {
}
}

func transportShouldNegotiate(_: Transport) async {}
func transportShouldNegotiate(_: Transport) {}
}
4 changes: 3 additions & 1 deletion Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ actor SignalClient: Loggable {
_ token: String,
connectOptions: ConnectOptions? = nil,
reconnectMode: ReconnectMode? = nil,
participantSid: Participant.Sid? = nil,
adaptiveStream: Bool) async throws -> ConnectResponse
{
await cleanUp()
Expand All @@ -126,6 +127,7 @@ actor SignalClient: Loggable {
token,
connectOptions: connectOptions,
reconnectMode: reconnectMode,
participantSid: participantSid,
adaptiveStream: adaptiveStream)

if reconnectMode != nil {
Expand All @@ -148,7 +150,6 @@ actor SignalClient: Loggable {
} catch {
await self.cleanUp(withError: error)
}
self.log("Did exit WebSocket message loop...")
}

let connectResponse = try await _connectResponseCompleter.wait()
Expand Down Expand Up @@ -179,6 +180,7 @@ actor SignalClient: Loggable {
let validateUrl = try Utils.buildUrl(url,
token,
connectOptions: connectOptions,
participantSid: participantSid,
adaptiveStream: adaptiveStream,
validate: true)

Expand Down
16 changes: 8 additions & 8 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ actor Transport: NSObject, Loggable {

// MARK: - Private

private let _delegate = AsyncSerialDelegate<TransportDelegate>()
private let _delegate = MulticastDelegate<TransportDelegate>(label: "TransportDelegate")
private let _debounce = Debounce(delay: 0.1)

private var _reNegotiate: Bool = false
Expand Down Expand Up @@ -93,7 +93,7 @@ actor Transport: NSObject, Loggable {
log()

_pc.delegate = self
_delegate.set(delegate: delegate)
_delegate.add(delegate: delegate)
}

deinit {
Expand Down Expand Up @@ -218,16 +218,16 @@ extension Transport {
extension Transport: LKRTCPeerConnectionDelegate {
nonisolated func peerConnection(_: LKRTCPeerConnection, didChange state: RTCPeerConnectionState) {
log("[Connect] Transport(\(target)) did update state: \(state.description)")
_delegate.notifyDetached { await $0.transport(self, didUpdateState: state) }
_delegate.notify { $0.transport(self, didUpdateState: state) }
}

nonisolated func peerConnection(_: LKRTCPeerConnection, didGenerate candidate: LKRTCIceCandidate) {
_delegate.notifyDetached { await $0.transport(self, didGenerateIceCandidate: candidate) }
_delegate.notify { $0.transport(self, didGenerateIceCandidate: candidate) }
}

nonisolated func peerConnectionShouldNegotiate(_: LKRTCPeerConnection) {
log("ShouldNegotiate for \(target)")
_delegate.notifyDetached { await $0.transportShouldNegotiate(self) }
_delegate.notify { $0.transportShouldNegotiate(self) }
}

nonisolated func peerConnection(_: LKRTCPeerConnection, didAdd rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) {
Expand All @@ -237,7 +237,7 @@ extension Transport: LKRTCPeerConnectionDelegate {
}

log("type: \(type(of: track)), track.id: \(track.trackId), streams: \(streams.map { "Stream(hash: \($0.hash), id: \($0.streamId), videoTracks: \($0.videoTracks.count), audioTracks: \($0.audioTracks.count))" })")
_delegate.notifyDetached { await $0.transport(self, didAddTrack: track, rtpReceiver: rtpReceiver, streams: streams) }
_delegate.notify { $0.transport(self, didAddTrack: track, rtpReceiver: rtpReceiver, streams: streams) }
}

nonisolated func peerConnection(_: LKRTCPeerConnection, didRemove rtpReceiver: LKRTCRtpReceiver) {
Expand All @@ -247,12 +247,12 @@ extension Transport: LKRTCPeerConnectionDelegate {
}

log("didRemove track: \(track.trackId)")
_delegate.notifyDetached { await $0.transport(self, didRemoveTrack: track) }
_delegate.notify { $0.transport(self, didRemoveTrack: track) }
}

nonisolated func peerConnection(_: LKRTCPeerConnection, didOpen dataChannel: LKRTCDataChannel) {
log("Received data channel \(dataChannel.label) for \(target)")
_delegate.notifyDetached { await $0.transport(self, didOpenDataChannel: dataChannel) }
_delegate.notify { $0.transport(self, didOpenDataChannel: dataChannel) }
}

nonisolated func peerConnection(_: LKRTCPeerConnection, didChange _: RTCIceConnectionState) {}
Expand Down
12 changes: 6 additions & 6 deletions Sources/LiveKit/Protocols/TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ internal import LiveKitWebRTC
#endif

protocol TransportDelegate: AnyObject {
func transport(_ transport: Transport, didUpdateState state: RTCPeerConnectionState) async
func transport(_ transport: Transport, didGenerateIceCandidate iceCandidate: LKRTCIceCandidate) async
func transport(_ transport: Transport, didOpenDataChannel dataChannel: LKRTCDataChannel) async
func transport(_ transport: Transport, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) async
func transport(_ transport: Transport, didRemoveTrack track: LKRTCMediaStreamTrack) async
func transportShouldNegotiate(_ transport: Transport) async
func transport(_ transport: Transport, didUpdateState state: RTCPeerConnectionState)
func transport(_ transport: Transport, didGenerateIceCandidate iceCandidate: LKRTCIceCandidate)
func transport(_ transport: Transport, didOpenDataChannel dataChannel: LKRTCDataChannel)
func transport(_ transport: Transport, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream])
func transport(_ transport: Transport, didRemoveTrack track: LKRTCMediaStreamTrack)
func transportShouldNegotiate(_ transport: Transport)
}
8 changes: 4 additions & 4 deletions Sources/LiveKit/Support/AsyncCompleter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ class AsyncCompleter<T>: Loggable {
// Already resolved...
if case let .success(value) = result {
// resume(returning:) already called
log("\(label) returning value...")
log("\(label) returning existing value")
return value
} else if case let .failure(error) = result {
// resume(throwing:) already called
log("\(label) throwing error...")
log("\(label) throwing existing error")
throw error
}
}
Expand All @@ -166,7 +166,7 @@ class AsyncCompleter<T>: Loggable {
// Create time-out block
let timeoutBlock = DispatchWorkItem { [weak self] in
guard let self else { return }
self.log("Wait \(entryId) timedOut")
self.log("\(self.label) id: \(entryId) timed out")
self._lock.sync {
if let entry = self._entries[entryId] {
entry.timeout()
Expand All @@ -182,7 +182,7 @@ class AsyncCompleter<T>: Loggable {
// Store entry
_entries[entryId] = WaitEntry(continuation: continuation, timeoutBlock: timeoutBlock)

log("\(label) waiting \(computedTimeout) with id: \(entryId)")
log("\(label) id: \(entryId) waiting for \(computedTimeout)")
}
}
} onCancel: {
Expand Down
1 change: 0 additions & 1 deletion Sources/LiveKit/Support/AsyncTimer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class AsyncTimer: Loggable {
$0.isStarted = false
$0.task?.cancel()
}
log(nil, .debug)
}

func cancel() {
Expand Down
13 changes: 12 additions & 1 deletion Sources/LiveKit/Support/SerialRunnerActor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,26 @@ actor SerialRunnerActor<Value: Sendable> {

func run(block: @Sendable @escaping () async throws -> Value) async throws -> Value {
let task = Task { [previousTask] in
let _ = try? await previousTask?.value
// Wait for the previous task to complete, but cancel it if needed
if let previousTask, !Task.isCancelled {
// If previous task is still running, wait for it
_ = try? await previousTask.value
}

// Check for cancellation before running the block
try Task.checkCancellation()

// Run the new block
return try await block()
}

previousTask = task

return try await withTaskCancellationHandler {
// Await the current task's result
try await task.value
} onCancel: {
// Ensure the task is canceled when requested
task.cancel()
}
}
Expand Down
19 changes: 17 additions & 2 deletions Sources/LiveKit/Support/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ typealias DebouncFunc = () -> Void
enum OS {
case macOS
case iOS
case visionOS
case tvOS
}

extension OS: CustomStringConvertible {
var description: String {
switch self {
case .macOS: return "macOS"
case .iOS: return "iOS"
case .visionOS: return "visionOS"
case .tvOS: return "tvOS"
}
}
}
Expand All @@ -56,10 +60,14 @@ class Utils {

/// Returns current OS.
static func os() -> OS {
#if os(iOS) || os(visionOS) || os(tvOS)
#if os(iOS)
.iOS
#elseif os(macOS)
.macOS
#elseif os(visionOS)
.visionOS
#elseif os(tvOS)
.tvOS
#endif
}

Expand Down Expand Up @@ -132,6 +140,7 @@ class Utils {
_ token: String,
connectOptions: ConnectOptions? = nil,
reconnectMode: ReconnectMode? = nil,
participantSid: Participant.Sid? = nil,
adaptiveStream: Bool,
validate: Bool = false,
forceSecure: Bool = false
Expand Down Expand Up @@ -190,7 +199,13 @@ class Utils {
}

// only for quick-reconnect
queryItems.append(URLQueryItem(name: "reconnect", value: reconnectMode == .quick ? "1" : "0"))
if reconnectMode == .quick {
queryItems.append(URLQueryItem(name: "reconnect", value: "1"))
if let sid = participantSid {
queryItems.append(URLQueryItem(name: "sid", value: sid.stringValue))
}
}

queryItems.append(URLQueryItem(name: "auto_subscribe", value: connectOptions.autoSubscribe ? "1" : "0"))
queryItems.append(URLQueryItem(name: "adaptive_stream", value: adaptiveStream ? "1" : "0"))

Expand Down
6 changes: 2 additions & 4 deletions Sources/LiveKit/Track/AudioManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,16 @@ public class AudioManager: Loggable {
}

func trackDidStart(_ type: Type) {
// async mutation
_state.mutate { state in
if type == .local { state.localTracksCount += 1 }
if type == .remote { state.remoteTracksCount += 1 }
}
}

func trackDidStop(_ type: Type) {
// async mutation
_state.mutate { state in
if type == .local { state.localTracksCount -= 1 }
if type == .remote { state.remoteTracksCount -= 1 }
if type == .local { state.localTracksCount = max(state.localTracksCount - 1, 0) }
if type == .remote { state.remoteTracksCount = max(state.remoteTracksCount - 1, 0) }
}
}

Expand Down
28 changes: 10 additions & 18 deletions Sources/LiveKit/Track/Local/LocalAudioTrack.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,31 +73,23 @@ public class LocalAudioTrack: Track, LocalTrack, AudioTrack {
captureOptions: options)
}

@discardableResult
override func onPublish() async throws -> Bool {
let didPublish = try await super.onPublish()
if didPublish {
AudioManager.shared.trackDidStart(.local)
}
return didPublish
}

@discardableResult
override func onUnpublish() async throws -> Bool {
let didUnpublish = try await super.onUnpublish()
if didUnpublish {
AudioManager.shared.trackDidStop(.local)
}
return didUnpublish
}

public func mute() async throws {
try await super._mute()
}

public func unmute() async throws {
try await super._unmute()
}

// MARK: - Internal

override func startCapture() async throws {
AudioManager.shared.trackDidStart(.local)
}

override func stopCapture() async throws {
AudioManager.shared.trackDidStop(.local)
}
}

public extension LocalAudioTrack {
Expand Down
Loading

0 comments on commit 02a6f47

Please sign in to comment.