Skip to content

Commit

Permalink
Actor pattern for SignalClient's request / response queue (livekit#269
Browse files Browse the repository at this point in the history
)

* impl

* doc
  • Loading branch information
hiroshihorie authored Oct 31, 2023
1 parent 33b32f8 commit ec42d30
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 79 deletions.
6 changes: 3 additions & 3 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ internal extension Room {
)
}

engine.signalClient.cleanUp(reason: reason)

return engine.cleanUpRTC().then(on: queue) {
return promise(from: engine.signalClient.cleanUp, param1: reason).then(on: queue) {
self.engine.cleanUpRTC()
}.then(on: queue) {
self.cleanUpParticipants()
}.then(on: queue) {
// reset state
Expand Down
105 changes: 30 additions & 75 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,9 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

// MARK: - Private

private enum QueueState {
case resumed
case suspended
}

// queue to store requests while reconnecting
private var requestQueue = [Livekit_SignalRequest]()
private var responseQueue = [Livekit_SignalResponse]()

private let requestDispatchQueue = DispatchQueue(label: "LiveKitSDK.signalClient.requestQueue", qos: .default)
private let responseDispatchQueue = DispatchQueue(label: "LiveKitSDK.signalClient.responseQueue", qos: .default)

private var responseQueueState: QueueState = .resumed
// Queue to store requests while reconnecting
private let _requestQueue = AsyncQueueActor<Livekit_SignalRequest>()
private var _responseQueue = AsyncQueueActor<Livekit_SignalResponse>()

private var _webSocket: WebSocket?
private var latestJoinResponse: Livekit_JoinResponse?
Expand Down Expand Up @@ -89,7 +79,7 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
reconnectMode: ReconnectMode? = nil,
adaptiveStream: Bool) async throws {

cleanUp()
await cleanUp()

log("reconnectMode: \(String(describing: reconnectMode))")

Expand Down Expand Up @@ -123,14 +113,17 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
self.onWebSocketMessage(message: message)
}
} catch {
//
self.cleanUp(reason: .networkError(error))
await self.cleanUp(reason: .networkError(error))
}
self.log("Did exit WebSocket message loop...")
}
} catch let error {

defer { cleanUp(reason: .networkError(error)) }
defer {
Task {
await cleanUp(reason: .networkError(error))
}
}

// Skip validation if reconnect mode
if reconnectMode != nil { throw error }
Expand All @@ -152,7 +145,7 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
}
}

func cleanUp(reason: DisconnectReason? = nil) {
func cleanUp(reason: DisconnectReason? = nil) async {

log("reason: \(String(describing: reason))")

Expand Down Expand Up @@ -180,16 +173,8 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
$0 = State()
}

requestDispatchQueue.async { [weak self] in
guard let self = self else { return }
self.requestQueue = []
}

responseDispatchQueue.async { [weak self] in
guard let self = self else { return }
self.responseQueue = []
self.responseQueueState = .resumed
}
await _requestQueue.clear()
await _responseQueue.clear()
}

func resumeCompleter(forAddTrackRequest trackCid: String, trackInfo: Livekit_TrackInfo) {
Expand Down Expand Up @@ -224,12 +209,9 @@ private extension SignalClient {
// send request or enqueue while reconnecting
func sendRequest(_ request: Livekit_SignalRequest, enqueueIfReconnecting: Bool = true) async throws {

// on: requestDispatchQueue

guard !(_state.connectionState.isReconnecting && request.canEnqueue() && enqueueIfReconnecting) else {
log("queuing request while reconnecting, request: \(request)")
requestQueue.append(request)
// success
log("Queuing request while reconnecting, request: \(request)")
await _requestQueue.enqueue(request)
return
}

Expand Down Expand Up @@ -267,17 +249,12 @@ private extension SignalClient {
return
}

responseDispatchQueue.async {
if case .suspended = self.responseQueueState {
self.log("Enqueueing response: \(response)")
self.responseQueue.append(response)
} else {
self.onSignalResponse(response)
}
Task {
await _responseQueue.enqueue(response) { await processSignalResponse($0) }
}
}

func onSignalResponse(_ response: Livekit_SignalResponse) {
func processSignalResponse(_ response: Livekit_SignalResponse) async {

guard case .connected = connectionState else {
log("Not connected", .warning)
Expand All @@ -291,7 +268,7 @@ private extension SignalClient {

switch message {
case .join(let joinResponse):
responseQueueState = .suspended
await _responseQueue.suspend()
latestJoinResponse = joinResponse
restartPingTimer()
notify { $0.signalClient(self, didReceive: joinResponse) }
Expand Down Expand Up @@ -370,23 +347,9 @@ internal extension SignalClient {

func resumeResponseQueue() async throws {

// on: responseDispatchQueue

defer { responseQueueState = .resumed }

// Quickly return if no queued requests
guard !responseQueue.isEmpty else {
self.log("No queued response")
return
await _responseQueue.resume { response in
await processSignalResponse(response)
}

// Run responses in sequence
for response in responseQueue {
onSignalResponse(response)
}

// Clear the queue
responseQueue = []
}
}

Expand All @@ -396,25 +359,13 @@ internal extension SignalClient {

func sendQueuedRequests() async throws {

// on: requestDispatchQueue

// Return if no queued requests
guard !requestQueue.isEmpty else {
log("No queued requests")
return
}

// Send requests in sequential order
for request in requestQueue {
await _requestQueue.resume { element in
do {
try await sendRequest(request, enqueueIfReconnecting: false)
try await sendRequest(element, enqueueIfReconnecting: false)
} catch let error {
log("Failed to send queued request \(request) with error: \(error)", .error)
log("Failed to send queued request \(element) with error: \(error)", .error)
}
}

// Clear the queue
requestQueue = []
}

func sendOffer(offer: LKRTCSessionDescription) async throws {
Expand Down Expand Up @@ -623,7 +574,9 @@ internal extension SignalClient {

defer {
if shouldDisconnect {
cleanUp(reason: .networkError(NetworkError.disconnected(message: "Simulate scenario")))
Task {
await cleanUp(reason: .networkError(NetworkError.disconnected(message: "Simulate scenario")))
}
}
}

Expand Down Expand Up @@ -658,7 +611,9 @@ private extension SignalClient {
timer.handler = { [weak self] in
guard let self = self else { return }
self.log("ping/pong timed out", .error)
self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut()))
Task {
await self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut()))
}
}
timer.resume()
return timer
Expand Down
1 change: 0 additions & 1 deletion Sources/LiveKit/Support/AsyncCompleter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

import Foundation
import Promises

internal enum AsyncCompleterError: LiveKitError {
case timedOut
Expand Down
61 changes: 61 additions & 0 deletions Sources/LiveKit/Support/AsyncQueueActor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

internal actor AsyncQueueActor<T> {

public enum State {
case resumed
case suspended
}

public private(set) var state: State = .resumed
private var queue = [T]()

/// Mark as `.suspended`.
func suspend() {
state = .suspended
}

func enqueue(_ value: T) {
queue.append(value)
}

/// Only enqueue if `.suspended` state, otherwise process immediately.
func enqueue(_ value: T, ifResumed process: (T) async -> Void) async {
if case .suspended = state {
queue.append(value)
} else {
await process(value)
}
}

func clear() {
queue.removeAll()
state = .resumed
}

/// Mark as `.resumed` and process each element with an async `block`.
func resume(_ block: (T) async -> Void) async {
state = .resumed
if queue.isEmpty { return }
for element in queue {
await block(element)
}
queue.removeAll()
}
}

0 comments on commit ec42d30

Please sign in to comment.