From 356fefc16b55eba0562ca4806c4685876715667f Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 12 Jul 2024 02:20:56 +0900 Subject: [PATCH 01/11] RegionUrlProvider --- Sources/LiveKit/Errors.swift | 4 + .../LiveKit/Support/RegionUrlProvider.swift | 128 ++++++++++++++++++ Sources/LiveKit/Support/Utils.swift | 25 ++++ 3 files changed, 157 insertions(+) create mode 100644 Sources/LiveKit/Support/RegionUrlProvider.swift diff --git a/Sources/LiveKit/Errors.swift b/Sources/LiveKit/Errors.swift index beb9e5960..44554589f 100644 --- a/Sources/LiveKit/Errors.swift +++ b/Sources/LiveKit/Errors.swift @@ -50,6 +50,10 @@ public enum LiveKitErrorType: Int { case captureFormatNotFound = 702 case unableToResolveFPSRange = 703 case capturerDimensionsNotResolved = 704 + + // LiveKit Cloud + case onlyForCloud = 901 + case regionUrlProvider = 902 } extension LiveKitErrorType: CustomStringConvertible { diff --git a/Sources/LiveKit/Support/RegionUrlProvider.swift b/Sources/LiveKit/Support/RegionUrlProvider.swift new file mode 100644 index 000000000..6c58ceb10 --- /dev/null +++ b/Sources/LiveKit/Support/RegionUrlProvider.swift @@ -0,0 +1,128 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +class RegionUrlProvider: Loggable { + static let settingsCacheTime: TimeInterval = 3000 + + private struct State { + var serverUrl: URL + var token: String + var regionSettings: Livekit_RegionSettings? + var regionSettingsUpdated: Date? + var attemptedRegions: [Livekit_RegionInfo] = [] + } + + private let _state: StateSync + + public var serverUrl: URL { + _state.mutate { $0.serverUrl } + } + + init(url: String, token: String) { + let serverUrl = URL(string: url)! + _state = StateSync(State(serverUrl: serverUrl, token: token)) + } + + func set(regionSettings: Livekit_RegionSettings) { + _state.mutate { + $0.regionSettings = regionSettings + $0.regionSettingsUpdated = Date() + } + } + + func set(token: String) { + _state.mutate { $0.token = token } + } + + func resetAttempts() { + _state.mutate { + $0.attemptedRegions = [] + } + } + + func shouldRequestRegionSettings() -> Bool { + _state.read { + guard $0.regionSettings != nil, let regionSettingsUpdated = $0.regionSettingsUpdated else { + return true + } + + let interval = Date().timeIntervalSince(regionSettingsUpdated) + return interval > Self.settingsCacheTime + } + } + + func nextBestRegionUrl() async throws -> URL? { + if shouldRequestRegionSettings() { + try await requestRegionSettings() + } + + let (regionSettings, attemptedRegions) = _state.read { ($0.regionSettings, $0.attemptedRegions) } + + guard let regionSettings else { + return nil + } + + let remainingRegions = regionSettings.regions.filter { region in + !attemptedRegions.contains { $0.url == region.url } + } + + guard let selectedRegion = remainingRegions.first else { + return nil + } + + _state.mutate { + $0.attemptedRegions.append(selectedRegion) + } + + return URL(string: selectedRegion.url) + } + + func requestRegionSettings() async throws { + let (serverUrl, token) = _state.read { ($0.serverUrl, $0.token) } + + // Ensure url is for cloud. + guard serverUrl.isCloud() else { + throw LiveKitError(.onlyForCloud) + } + + var request = URLRequest(url: serverUrl.regionSettingsUrl()) + request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization") + + let (data, response) = try await URLSession.shared.data(for: request) + // Response must be a HTTPURLResponse. + guard let httpResponse = response as? HTTPURLResponse else { + throw LiveKitError(.regionUrlProvider, message: "Failed to fetch region settings") + } + + // Check the status code. + guard httpResponse.isStatusCodeOK else { + throw LiveKitError(.regionUrlProvider, message: "Failed to fetch region settings with status code: \(httpResponse.statusCode)") + } + + do { + // Try to parse the JSON data. + let regionSettings = try Livekit_RegionSettings(jsonUTF8Data: data) + _state.mutate { + $0.regionSettings = regionSettings + $0.regionSettingsUpdated = Date() + } + } catch { + throw LiveKitError(.regionUrlProvider, message: "Failed to parse region settings with error: \(error)") + } + } +} diff --git a/Sources/LiveKit/Support/Utils.swift b/Sources/LiveKit/Support/Utils.swift index 95d12626e..0bb4abdd1 100644 --- a/Sources/LiveKit/Support/Utils.swift +++ b/Sources/LiveKit/Support/Utils.swift @@ -263,3 +263,28 @@ extension MutableCollection { } } } + +extension URL { + /// Checks whether the URL is a LiveKit Cloud URL. + func isCloud() -> Bool { + guard let host else { return false } + return host.hasSuffix(".livekit.cloud") || host.hasSuffix(".livekit.run") + } + + func cloudConfigUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") + components.path = "/settings" + return components.url! + } + + func regionSettingsUrl() -> URL { + cloudConfigUrl().appendingPathComponent("/regions") + } +} + +extension HTTPURLResponse { + var isStatusCodeOK: Bool { + (200 ... 299).contains(statusCode) + } +} From eba68999f09f7b4dd6ec6e288b76321e1778d4ae Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 12 Jul 2024 02:38:57 +0900 Subject: [PATCH 02/11] Tests --- .../LiveKitTests/RegionUrlProviderTests.swift | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 Tests/LiveKitTests/RegionUrlProviderTests.swift diff --git a/Tests/LiveKitTests/RegionUrlProviderTests.swift b/Tests/LiveKitTests/RegionUrlProviderTests.swift new file mode 100644 index 000000000..5fd0edd63 --- /dev/null +++ b/Tests/LiveKitTests/RegionUrlProviderTests.swift @@ -0,0 +1,68 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@testable import LiveKit +import XCTest + +class RegionUrlProviderTests: XCTestCase { + func testResolveUrl() async throws { + // Test data. + let testRegionSettings = Livekit_RegionSettings.with { + $0.regions.append(Livekit_RegionInfo.with { + $0.region = "otokyo1a" + $0.url = "https://example.otokyo1a.production.livekit.cloud" + $0.distance = 32838 + }) + $0.regions.append(Livekit_RegionInfo.with { + $0.region = "dblr1a" + $0.url = "https://example.dblr1a.production.livekit.cloud" + $0.distance = 6_660_301 + }) + $0.regions.append(Livekit_RegionInfo.with { + $0.region = "dsyd1a" + $0.url = "https://example.dsyd1a.production.livekit.cloud" + $0.distance = 7_823_582 + }) + } + + let provider = RegionUrlProvider(url: "wss://test.livekit.cloud", token: "") + + // See if request should be initiated. + XCTAssert(provider.shouldRequestRegionSettings(), "Should require to request region settings") + + // Set test data. + provider.set(regionSettings: testRegionSettings) + + // See if request is not required to be initiated. + XCTAssert(!provider.shouldRequestRegionSettings(), "Should require to request region settings") + + let attempt1 = try await provider.nextBestRegionUrl() + print("Next url: \(String(describing: attempt1))") + XCTAssert(attempt1?.absoluteString == testRegionSettings.regions[0].url) + + let attempt2 = try await provider.nextBestRegionUrl() + print("Next url: \(String(describing: attempt2))") + XCTAssert(attempt2?.absoluteString == testRegionSettings.regions[1].url) + + let attempt3 = try await provider.nextBestRegionUrl() + print("Next url: \(String(describing: attempt3))") + XCTAssert(attempt3?.absoluteString == testRegionSettings.regions[2].url) + + let attempt4 = try await provider.nextBestRegionUrl() + print("Next url: \(String(describing: attempt4))") + XCTAssert(attempt4 == nil) + } +} From 652b87f6c9eaff91e09b870791ad9b00e094ddc7 Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 12 Jul 2024 02:48:48 +0900 Subject: [PATCH 03/11] Test cache interval --- .../LiveKit/Support/RegionUrlProvider.swift | 9 +++++--- .../LiveKitTests/RegionUrlProviderTests.swift | 9 +++++++- Tests/LiveKitTests/Support/Utils.swift | 22 +++++++++++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 Tests/LiveKitTests/Support/Utils.swift diff --git a/Sources/LiveKit/Support/RegionUrlProvider.swift b/Sources/LiveKit/Support/RegionUrlProvider.swift index 6c58ceb10..d8fa71bde 100644 --- a/Sources/LiveKit/Support/RegionUrlProvider.swift +++ b/Sources/LiveKit/Support/RegionUrlProvider.swift @@ -17,7 +17,8 @@ import Foundation class RegionUrlProvider: Loggable { - static let settingsCacheTime: TimeInterval = 3000 + // Default + static let defaultCacheInterval: TimeInterval = 3000 private struct State { var serverUrl: URL @@ -28,13 +29,15 @@ class RegionUrlProvider: Loggable { } private let _state: StateSync + public let cacheInterval: TimeInterval public var serverUrl: URL { _state.mutate { $0.serverUrl } } - init(url: String, token: String) { + init(url: String, token: String, cacheInterval: TimeInterval = defaultCacheInterval) { let serverUrl = URL(string: url)! + self.cacheInterval = cacheInterval _state = StateSync(State(serverUrl: serverUrl, token: token)) } @@ -62,7 +65,7 @@ class RegionUrlProvider: Loggable { } let interval = Date().timeIntervalSince(regionSettingsUpdated) - return interval > Self.settingsCacheTime + return interval > cacheInterval } } diff --git a/Tests/LiveKitTests/RegionUrlProviderTests.swift b/Tests/LiveKitTests/RegionUrlProviderTests.swift index 5fd0edd63..beb7f765c 100644 --- a/Tests/LiveKitTests/RegionUrlProviderTests.swift +++ b/Tests/LiveKitTests/RegionUrlProviderTests.swift @@ -19,6 +19,7 @@ import XCTest class RegionUrlProviderTests: XCTestCase { func testResolveUrl() async throws { + let testCacheInterval: TimeInterval = 3 // Test data. let testRegionSettings = Livekit_RegionSettings.with { $0.regions.append(Livekit_RegionInfo.with { @@ -38,7 +39,7 @@ class RegionUrlProviderTests: XCTestCase { }) } - let provider = RegionUrlProvider(url: "wss://test.livekit.cloud", token: "") + let provider = RegionUrlProvider(url: "wss://test.livekit.cloud", token: "", cacheInterval: testCacheInterval) // See if request should be initiated. XCTAssert(provider.shouldRequestRegionSettings(), "Should require to request region settings") @@ -64,5 +65,11 @@ class RegionUrlProviderTests: XCTestCase { let attempt4 = try await provider.nextBestRegionUrl() print("Next url: \(String(describing: attempt4))") XCTAssert(attempt4 == nil) + + // Simulate cache time elapse. + await asyncSleep(for: testCacheInterval) + + // After cache time elapsed, should require to request region settings again. + XCTAssert(provider.shouldRequestRegionSettings(), "Should require to request region settings") } } diff --git a/Tests/LiveKitTests/Support/Utils.swift b/Tests/LiveKitTests/Support/Utils.swift new file mode 100644 index 000000000..b25cffbdc --- /dev/null +++ b/Tests/LiveKitTests/Support/Utils.swift @@ -0,0 +1,22 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +func asyncSleep(for duration: TimeInterval) async { + let nanoseconds = UInt64(duration * Double(NSEC_PER_SEC)) + try? await Task.sleep(nanoseconds: nanoseconds) +} From 14c0ffe91569f98ba3fcb80a5a3ff4d787e0e173 Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 12 Jul 2024 02:55:46 +0900 Subject: [PATCH 04/11] Return socket url by default --- Sources/LiveKit/Support/RegionUrlProvider.swift | 2 +- Sources/LiveKit/Support/Utils.swift | 12 ++++++++++++ Tests/LiveKitTests/RegionUrlProviderTests.swift | 6 +++--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/Sources/LiveKit/Support/RegionUrlProvider.swift b/Sources/LiveKit/Support/RegionUrlProvider.swift index d8fa71bde..de3141c56 100644 --- a/Sources/LiveKit/Support/RegionUrlProvider.swift +++ b/Sources/LiveKit/Support/RegionUrlProvider.swift @@ -92,7 +92,7 @@ class RegionUrlProvider: Loggable { $0.attemptedRegions.append(selectedRegion) } - return URL(string: selectedRegion.url) + return URL(string: selectedRegion.url)?.toSocketUrl() } func requestRegionSettings() async throws { diff --git a/Sources/LiveKit/Support/Utils.swift b/Sources/LiveKit/Support/Utils.swift index 0bb4abdd1..517b1a373 100644 --- a/Sources/LiveKit/Support/Utils.swift +++ b/Sources/LiveKit/Support/Utils.swift @@ -281,6 +281,18 @@ extension URL { func regionSettingsUrl() -> URL { cloudConfigUrl().appendingPathComponent("/regions") } + + func toSocketUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "http", with: "ws") + return components.url! + } + + func toHTTPUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") + return components.url! + } } extension HTTPURLResponse { diff --git a/Tests/LiveKitTests/RegionUrlProviderTests.swift b/Tests/LiveKitTests/RegionUrlProviderTests.swift index beb7f765c..6b6388fdb 100644 --- a/Tests/LiveKitTests/RegionUrlProviderTests.swift +++ b/Tests/LiveKitTests/RegionUrlProviderTests.swift @@ -52,15 +52,15 @@ class RegionUrlProviderTests: XCTestCase { let attempt1 = try await provider.nextBestRegionUrl() print("Next url: \(String(describing: attempt1))") - XCTAssert(attempt1?.absoluteString == testRegionSettings.regions[0].url) + XCTAssert(attempt1 == URL(string: testRegionSettings.regions[0].url)?.toSocketUrl()) let attempt2 = try await provider.nextBestRegionUrl() print("Next url: \(String(describing: attempt2))") - XCTAssert(attempt2?.absoluteString == testRegionSettings.regions[1].url) + XCTAssert(attempt2 == URL(string: testRegionSettings.regions[1].url)?.toSocketUrl()) let attempt3 = try await provider.nextBestRegionUrl() print("Next url: \(String(describing: attempt3))") - XCTAssert(attempt3?.absoluteString == testRegionSettings.regions[2].url) + XCTAssert(attempt3 == URL(string: testRegionSettings.regions[2].url)?.toSocketUrl()) let attempt4 = try await provider.nextBestRegionUrl() print("Next url: \(String(describing: attempt4))") From 69cff3e7ff6bd739564abb4f1cce7d41d5149c09 Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Mon, 19 Aug 2024 18:29:43 +0900 Subject: [PATCH 05/11] Fix compile --- Sources/LiveKit/Support/Utils.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/LiveKit/Support/Utils.swift b/Sources/LiveKit/Support/Utils.swift index ec678f9fa..59eba2512 100644 --- a/Sources/LiveKit/Support/Utils.swift +++ b/Sources/LiveKit/Support/Utils.swift @@ -299,6 +299,8 @@ extension HTTPURLResponse { var isStatusCodeOK: Bool { (200 ... 299).contains(statusCode) } +} + func computeAttributesDiff(oldValues: [String: String], newValues: [String: String]) -> [String: String] { let allKeys = Set(oldValues.keys).union(newValues.keys) var diff = [String: String]() From 0799efe5bd285e2222f8e9ae5b3a18a8f7f459fb Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 27 Aug 2024 14:08:57 +0900 Subject: [PATCH 06/11] Change to category --- .../Room+Region.swift} | 113 +++++++----------- Sources/LiveKit/Core/Room.swift | 22 ++-- Sources/LiveKit/Extensions/URL.swift | 29 +++++ Sources/LiveKit/Support/Utils.swift | 31 ----- Sources/LiveKit/Types/RegionInfo.swift | 58 +++++++++ 5 files changed, 147 insertions(+), 106 deletions(-) rename Sources/LiveKit/{Support/RegionUrlProvider.swift => Core/Room+Region.swift} (51%) create mode 100644 Sources/LiveKit/Types/RegionInfo.swift diff --git a/Sources/LiveKit/Support/RegionUrlProvider.swift b/Sources/LiveKit/Core/Room+Region.swift similarity index 51% rename from Sources/LiveKit/Support/RegionUrlProvider.swift rename to Sources/LiveKit/Core/Room+Region.swift index de3141c56..a20b32d02 100644 --- a/Sources/LiveKit/Support/RegionUrlProvider.swift +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -16,96 +16,66 @@ import Foundation -class RegionUrlProvider: Loggable { - // Default - static let defaultCacheInterval: TimeInterval = 3000 +// MARK: - Room+Region - private struct State { - var serverUrl: URL - var token: String - var regionSettings: Livekit_RegionSettings? - var regionSettingsUpdated: Date? - var attemptedRegions: [Livekit_RegionInfo] = [] - } +extension Room { + static let defaultCacheInterval: TimeInterval = 3000 - private let _state: StateSync - public let cacheInterval: TimeInterval + func resolveNextBestRegionUrl() async throws -> URL { + if shouldRequestRegionSettings() { + try await requestRegionSettings() + } - public var serverUrl: URL { - _state.mutate { $0.serverUrl } - } + let (allRegions, failedRegions) = _state.read { ($0.allRegions, $0.failedRegions) } - init(url: String, token: String, cacheInterval: TimeInterval = defaultCacheInterval) { - let serverUrl = URL(string: url)! - self.cacheInterval = cacheInterval - _state = StateSync(State(serverUrl: serverUrl, token: token)) - } + let remainingRegions = allRegions.filter { region in + !failedRegions.contains { $0 == region } + } - func set(regionSettings: Livekit_RegionSettings) { - _state.mutate { - $0.regionSettings = regionSettings - $0.regionSettingsUpdated = Date() + guard let selectedRegion = remainingRegions.first else { + throw LiveKitError(.regionUrlProvider, message: "No more remaining regions.") } - } - func set(token: String) { - _state.mutate { $0.token = token } - } +// _state.mutate { +// $0.allRegions.append(selectedRegion) +// } - func resetAttempts() { - _state.mutate { - $0.attemptedRegions = [] - } + let result = selectedRegion.url.toSocketUrl() + log("[Region] Resolved region url: \(String(describing: result))") + + return result } - func shouldRequestRegionSettings() -> Bool { - _state.read { - guard $0.regionSettings != nil, let regionSettingsUpdated = $0.regionSettingsUpdated else { - return true - } + // MARK: - Private + private func shouldRequestRegionSettings() -> Bool { + _state.read { + guard !$0.allRegions.isEmpty, let regionSettingsUpdated = $0.regionDataUpdated else { return true } let interval = Date().timeIntervalSince(regionSettingsUpdated) - return interval > cacheInterval + return interval > Self.defaultCacheInterval } } - func nextBestRegionUrl() async throws -> URL? { - if shouldRequestRegionSettings() { - try await requestRegionSettings() - } - - let (regionSettings, attemptedRegions) = _state.read { ($0.regionSettings, $0.attemptedRegions) } - - guard let regionSettings else { - return nil - } - - let remainingRegions = regionSettings.regions.filter { region in - !attemptedRegions.contains { $0.url == region.url } - } - - guard let selectedRegion = remainingRegions.first else { - return nil - } + private func requestRegionSettings() async throws { + let (serverUrl, token) = _state.read { ($0.url, $0.token) } - _state.mutate { - $0.attemptedRegions.append(selectedRegion) + guard let serverUrl, let token else { + throw LiveKitError(.invalidState) } - return URL(string: selectedRegion.url)?.toSocketUrl() - } - - func requestRegionSettings() async throws { - let (serverUrl, token) = _state.read { ($0.serverUrl, $0.token) } - // Ensure url is for cloud. guard serverUrl.isCloud() else { throw LiveKitError(.onlyForCloud) } - var request = URLRequest(url: serverUrl.regionSettingsUrl()) + // Make a request which ignores cache. + var request = URLRequest(url: serverUrl.regionSettingsUrl(), + cachePolicy: .reloadIgnoringLocalAndRemoteCacheData) + request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization") + log("[Region] Requesting region settings...") + let (data, response) = try await URLSession.shared.data(for: request) // Response must be a HTTPURLResponse. guard let httpResponse = response as? HTTPURLResponse else { @@ -114,15 +84,24 @@ class RegionUrlProvider: Loggable { // Check the status code. guard httpResponse.isStatusCodeOK else { + log("[Region] Failed to fetch region settings, error: \(String(describing: httpResponse))", .error) throw LiveKitError(.regionUrlProvider, message: "Failed to fetch region settings with status code: \(httpResponse.statusCode)") } do { // Try to parse the JSON data. let regionSettings = try Livekit_RegionSettings(jsonUTF8Data: data) + let allRegions = regionSettings.regions.compactMap { $0.toLKType() } + + if allRegions.isEmpty { + throw LiveKitError(.regionUrlProvider, message: "Fetched region data is empty.") + } + + log("[Region] all regions: \(String(describing: allRegions))") + _state.mutate { - $0.regionSettings = regionSettings - $0.regionSettingsUpdated = Date() + $0.allRegions = allRegions + $0.regionDataUpdated = Date() } } catch { throw LiveKitError(.regionUrlProvider, message: "Failed to parse region settings with error: \(error)") diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 49ee6d95d..4a98a2db1 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -151,6 +151,11 @@ public class Room: NSObject, ObservableObject, Loggable { // Agents var transcriptionReceivedTimes: [String: Date] = [:] + // Region + var regionDataUpdated: Date? + var allRegions: [RegionInfo] = [] + var failedRegions: [RegionInfo] = [] + @discardableResult mutating func updateRemoteParticipant(info: Livekit_ParticipantInfo, room: Room) -> RemoteParticipant { let identity = Participant.Identity(from: info.identity) @@ -315,10 +320,16 @@ public class Room: NSObject, ObservableObject, Loggable { try Task.checkCancellation() - _state.mutate { $0.connectionState = .connecting } + _state.mutate { + $0.url = url + $0.token = token + $0.connectionState = .connecting + } do { - try await fullConnectSequence(url, token) + let regionUrl = try await resolveNextBestRegionUrl() + + try await fullConnectSequence(regionUrl, token) // Connect sequence successful log("Connect sequence completed") @@ -326,12 +337,7 @@ public class Room: NSObject, ObservableObject, Loggable { // Final check if cancelled, don't fire connected events try Task.checkCancellation() - // update internal vars (only if connect succeeded) - _state.mutate { - $0.url = url - $0.token = token - $0.connectionState = .connected - } + _state.mutate { $0.connectionState = .connected } } catch { await cleanUp(withError: error) diff --git a/Sources/LiveKit/Extensions/URL.swift b/Sources/LiveKit/Extensions/URL.swift index 18873b440..fec23f486 100644 --- a/Sources/LiveKit/Extensions/URL.swift +++ b/Sources/LiveKit/Extensions/URL.swift @@ -28,4 +28,33 @@ extension URL { var isSecure: Bool { scheme == "https" || scheme == "wss" } + + /// Checks whether the URL is a LiveKit Cloud URL. + func isCloud() -> Bool { + guard let host else { return false } + return host.hasSuffix(".livekit.cloud") || host.hasSuffix(".livekit.run") + } + + func cloudConfigUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") + components.path = "/settings" + return components.url! + } + + func regionSettingsUrl() -> URL { + cloudConfigUrl().appendingPathComponent("/regions") + } + + func toSocketUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "http", with: "ws") + return components.url! + } + + func toHTTPUrl() -> URL { + var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! + components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") + return components.url! + } } diff --git a/Sources/LiveKit/Support/Utils.swift b/Sources/LiveKit/Support/Utils.swift index 5ea5603ad..156934200 100644 --- a/Sources/LiveKit/Support/Utils.swift +++ b/Sources/LiveKit/Support/Utils.swift @@ -268,37 +268,6 @@ extension MutableCollection { } } -extension URL { - /// Checks whether the URL is a LiveKit Cloud URL. - func isCloud() -> Bool { - guard let host else { return false } - return host.hasSuffix(".livekit.cloud") || host.hasSuffix(".livekit.run") - } - - func cloudConfigUrl() -> URL { - var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! - components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") - components.path = "/settings" - return components.url! - } - - func regionSettingsUrl() -> URL { - cloudConfigUrl().appendingPathComponent("/regions") - } - - func toSocketUrl() -> URL { - var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! - components.scheme = scheme?.replacingOccurrences(of: "http", with: "ws") - return components.url! - } - - func toHTTPUrl() -> URL { - var components = URLComponents(url: self, resolvingAgainstBaseURL: false)! - components.scheme = scheme?.replacingOccurrences(of: "ws", with: "http") - return components.url! - } -} - extension HTTPURLResponse { var isStatusCodeOK: Bool { (200 ... 299).contains(statusCode) diff --git a/Sources/LiveKit/Types/RegionInfo.swift b/Sources/LiveKit/Types/RegionInfo.swift new file mode 100644 index 000000000..a983bf18d --- /dev/null +++ b/Sources/LiveKit/Types/RegionInfo.swift @@ -0,0 +1,58 @@ +/* + * Copyright 2024 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +@objc +public class RegionInfo: NSObject { + let regionId: String + let url: URL + let distance: Int64 + + init?(region: String, url: String, distance: Int64) { + guard let url = URL(string: url) else { return nil } + regionId = region + self.url = url + self.distance = distance + } + + // MARK: - Equal + + override public func isEqual(_ object: Any?) -> Bool { + guard let other = object as? Self else { return false } + return regionId == other.regionId + } + + override public var hash: Int { + var hasher = Hasher() + hasher.combine(regionId) + return hasher.finalize() + } + + // + + override public var description: String { + "RegionInfo(id: \(regionId), url: \(url), distance: \(distance))" + } +} + +extension Livekit_RegionInfo { + func toLKType() -> RegionInfo? { + RegionInfo(region: region, + url: url, + distance: distance) + } +} From 812a294e59da7d7fa710950a07ea50d142b9d5d3 Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Tue, 27 Aug 2024 14:55:41 +0900 Subject: [PATCH 07/11] Optimize --- Sources/LiveKit/Core/Room+Region.swift | 53 +++++++++++++------------- Sources/LiveKit/Core/Room.swift | 46 +++++++++++++--------- 2 files changed, 54 insertions(+), 45 deletions(-) diff --git a/Sources/LiveKit/Core/Room+Region.swift b/Sources/LiveKit/Core/Room+Region.swift index a20b32d02..f826277e8 100644 --- a/Sources/LiveKit/Core/Room+Region.swift +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -21,41 +21,29 @@ import Foundation extension Room { static let defaultCacheInterval: TimeInterval = 3000 - func resolveNextBestRegionUrl() async throws -> URL { - if shouldRequestRegionSettings() { - try await requestRegionSettings() - } - - let (allRegions, failedRegions) = _state.read { ($0.allRegions, $0.failedRegions) } + func resolveBestRegion() async throws -> RegionInfo { + try await requestRegionSettings() - let remainingRegions = allRegions.filter { region in - !failedRegions.contains { $0 == region } - } + let sortedByDistance = _regionState.remaining.sorted { $0.distance < $1.distance } + log("[Region] Remaining regions: \(String(describing: sortedByDistance))") - guard let selectedRegion = remainingRegions.first else { + guard let selectedRegion = sortedByDistance.first else { throw LiveKitError(.regionUrlProvider, message: "No more remaining regions.") } -// _state.mutate { -// $0.allRegions.append(selectedRegion) -// } - - let result = selectedRegion.url.toSocketUrl() - log("[Region] Resolved region url: \(String(describing: result))") + log("[Region] Resolved region: \(String(describing: selectedRegion))") - return result + return selectedRegion } - // MARK: - Private - - private func shouldRequestRegionSettings() -> Bool { - _state.read { - guard !$0.allRegions.isEmpty, let regionSettingsUpdated = $0.regionDataUpdated else { return true } - let interval = Date().timeIntervalSince(regionSettingsUpdated) - return interval > Self.defaultCacheInterval + func add(failedRegion region: RegionInfo) { + _regionState.mutate { + $0.remaining.remove(region) } } + // MARK: - Private + private func requestRegionSettings() async throws { let (serverUrl, token) = _state.read { ($0.url, $0.token) } @@ -63,6 +51,15 @@ extension Room { throw LiveKitError(.invalidState) } + let shouldRequestRegionSettings = _regionState.read { + guard serverUrl == $0.url, let regionSettingsUpdated = $0.lastRequested else { return true } + let interval = Date().timeIntervalSince(regionSettingsUpdated) + log("[Region] Interval: \(String(describing: interval))") + return interval > Self.defaultCacheInterval + } + + guard shouldRequestRegionSettings else { return } + // Ensure url is for cloud. guard serverUrl.isCloud() else { throw LiveKitError(.onlyForCloud) @@ -99,9 +96,11 @@ extension Room { log("[Region] all regions: \(String(describing: allRegions))") - _state.mutate { - $0.allRegions = allRegions - $0.regionDataUpdated = Date() + _regionState.mutate { + $0.url = serverUrl + $0.all = Set(allRegions) + $0.remaining = Set(allRegions) + $0.lastRequested = Date() } } catch { throw LiveKitError(.regionUrlProvider, message: "Failed to parse region settings with error: \(error)") diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 4a98a2db1..7d20b74ad 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -151,11 +151,6 @@ public class Room: NSObject, ObservableObject, Loggable { // Agents var transcriptionReceivedTimes: [String: Date] = [:] - // Region - var regionDataUpdated: Date? - var allRegions: [RegionInfo] = [] - var failedRegions: [RegionInfo] = [] - @discardableResult mutating func updateRemoteParticipant(info: Livekit_ParticipantInfo, room: Room) -> RemoteParticipant { let identity = Participant.Identity(from: info.identity) @@ -173,7 +168,16 @@ public class Room: NSObject, ObservableObject, Loggable { } } + struct RegionState { + // Region + var url: URL? + var lastRequested: Date? + var all: Set = [] + var remaining: Set = [] + } + let _state: StateSync + let _regionState = StateSync(RegionState()) private let _sidCompleter = AsyncCompleter(label: "sid", defaultTimeout: .resolveSid) @@ -327,22 +331,28 @@ public class Room: NSObject, ObservableObject, Loggable { } do { - let regionUrl = try await resolveNextBestRegionUrl() - - try await fullConnectSequence(regionUrl, token) - - // Connect sequence successful - log("Connect sequence completed") - - // Final check if cancelled, don't fire connected events - try Task.checkCancellation() - - _state.mutate { $0.connectionState = .connected } + while true { + let region = try await resolveBestRegion() + do { + try await fullConnectSequence(region.url, token) + // Connect sequence successful + log("Connect sequence completed") + // Final check if cancelled, don't fire connected events + try Task.checkCancellation() + _state.mutate { $0.connectionState = .connected } + break // Exit loop on successful connection + } catch { + log("Connect failed with region: \(region)") + add(failedRegion: region) + // Prepare for next connect attempt. + await cleanUp(isFullReconnect: true) + } + } } catch { + log("Failed to resolve a region or connect: \(error)") await cleanUp(withError: error) - // Re-throw error - throw error + throw error // Re-throw the original error } log("Connected to \(String(describing: self))", .info) From 9761f24bfb518298c5835afb6b31a75a77289bb6 Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 28 Aug 2024 16:58:31 +0900 Subject: [PATCH 08/11] Remove sort --- Sources/LiveKit/Core/Room+Region.swift | 11 ++++------- Sources/LiveKit/Core/Room.swift | 4 ++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/Sources/LiveKit/Core/Room+Region.swift b/Sources/LiveKit/Core/Room+Region.swift index f826277e8..62318843a 100644 --- a/Sources/LiveKit/Core/Room+Region.swift +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -24,10 +24,7 @@ extension Room { func resolveBestRegion() async throws -> RegionInfo { try await requestRegionSettings() - let sortedByDistance = _regionState.remaining.sorted { $0.distance < $1.distance } - log("[Region] Remaining regions: \(String(describing: sortedByDistance))") - - guard let selectedRegion = sortedByDistance.first else { + guard let selectedRegion = _regionState.remaining.first else { throw LiveKitError(.regionUrlProvider, message: "No more remaining regions.") } @@ -38,7 +35,7 @@ extension Room { func add(failedRegion region: RegionInfo) { _regionState.mutate { - $0.remaining.remove(region) + $0.remaining.removeAll { $0 == region } } } @@ -98,8 +95,8 @@ extension Room { _regionState.mutate { $0.url = serverUrl - $0.all = Set(allRegions) - $0.remaining = Set(allRegions) + $0.all = allRegions + $0.remaining = allRegions $0.lastRequested = Date() } } catch { diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 7d20b74ad..29691c399 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -172,8 +172,8 @@ public class Room: NSObject, ObservableObject, Loggable { // Region var url: URL? var lastRequested: Date? - var all: Set = [] - var remaining: Set = [] + var all: [RegionInfo] = [] + var remaining: [RegionInfo] = [] } let _state: StateSync From 59faa1f0bb2ba9c565e88fcc49120f77821bcdca Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 4 Sep 2024 02:16:55 +0900 Subject: [PATCH 09/11] Improvements --- Sources/LiveKit/Core/Room+Engine.swift | 42 +++++++++++++++++--- Sources/LiveKit/Core/Room+Region.swift | 18 ++++++--- Sources/LiveKit/Core/Room.swift | 55 ++++++++++++++++++++------ Sources/LiveKit/Extensions/URL.swift | 2 +- 4 files changed, 91 insertions(+), 26 deletions(-) diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 29be75949..1a7118bd0 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -266,7 +266,7 @@ extension Room { throw LiveKitError(.invalidState) } - guard let url = _state.url, let token = _state.token else { + guard let url = _state.providedUrl, let token = _state.token else { log("[Connect] Url or token is nil", .error) throw LiveKitError(.invalidState) } @@ -333,16 +333,46 @@ extension Room { $0.connectionState = .reconnecting } - await cleanUp(isFullReconnect: true) + let (providedUrl, connectedUrl, token) = _state.read { ($0.providedUrl, $0.connectedUrl, $0.token) } - guard let url = _state.url, - let token = _state.token - else { + guard let providedUrl, let connectedUrl, let token else { log("[Connect] Url or token is nil") throw LiveKitError(.invalidState) } - try await fullConnectSequence(url, token) + var nextUrl = connectedUrl + var nextRegion: RegionInfo? + + while true { + do { + // Prepare for next connect attempt. + await cleanUp(isFullReconnect: true) + + try await fullConnectSequence(nextUrl, token) + _state.mutate { $0.connectedUrl = nextUrl } + // Exit loop on successful connection + break + } catch { + // Re-throw if is cancel. + if error is CancellationError { + throw error + } + + if let region = nextRegion { + nextRegion = nil + log("Connect failed with region: \(region)") + add(failedRegion: region) + } + + try Task.checkCancellation() + + if providedUrl.isCloud { + let region = try await resolveBestRegion() + nextUrl = region.url + nextRegion = region + } + } + } } do { diff --git a/Sources/LiveKit/Core/Room+Region.swift b/Sources/LiveKit/Core/Room+Region.swift index 62318843a..dec587520 100644 --- a/Sources/LiveKit/Core/Room+Region.swift +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -39,15 +39,26 @@ extension Room { } } + public func prepareRegionSettings() { + Task.detached { + try await self.requestRegionSettings() + } + } + // MARK: - Private private func requestRegionSettings() async throws { - let (serverUrl, token) = _state.read { ($0.url, $0.token) } + let (serverUrl, token) = _state.read { ($0.providedUrl, $0.token) } guard let serverUrl, let token else { throw LiveKitError(.invalidState) } + // Ensure url is for cloud. + guard serverUrl.isCloud else { + throw LiveKitError(.onlyForCloud) + } + let shouldRequestRegionSettings = _regionState.read { guard serverUrl == $0.url, let regionSettingsUpdated = $0.lastRequested else { return true } let interval = Date().timeIntervalSince(regionSettingsUpdated) @@ -57,11 +68,6 @@ extension Room { guard shouldRequestRegionSettings else { return } - // Ensure url is for cloud. - guard serverUrl.isCloud() else { - throw LiveKitError(.onlyForCloud) - } - // Make a request which ignores cache. var request = URLRequest(url: serverUrl.regionSettingsUrl(), cachePolicy: .reloadIgnoringLocalAndRemoteCacheData) diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 29691c399..3ae2cc123 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -77,7 +77,7 @@ public class Room: NSObject, ObservableObject, Loggable { // expose engine's vars @objc - public var url: String? { _state.url?.absoluteString } + public var url: String? { _state.providedUrl?.absoluteString } @objc public var token: String? { _state.token } @@ -134,8 +134,10 @@ public class Room: NSObject, ObservableObject, Loggable { var serverInfo: Livekit_ServerInfo? // Engine - var url: URL? + var providedUrl: URL? + var connectedUrl: URL? var token: String? + // preferred reconnect mode which will be used only for next attempt var nextReconnectMode: ReconnectMode? var isReconnectingWithMode: ReconnectMode? @@ -287,12 +289,12 @@ public class Room: NSObject, ObservableObject, Loggable { } @objc - public func connect(url: String, + public func connect(url urlString: String, token: String, connectOptions: ConnectOptions? = nil, roomOptions: RoomOptions? = nil) async throws { - guard let url = URL(string: url), url.isValidForConnect else { + guard let baseUrl = URL(string: urlString), baseUrl.isValidForConnect else { log("URL parse failed", .error) throw LiveKitError(.failedToParseUrl) } @@ -325,28 +327,54 @@ public class Room: NSObject, ObservableObject, Loggable { try Task.checkCancellation() _state.mutate { - $0.url = url + $0.providedUrl = baseUrl $0.token = token $0.connectionState = .connecting } + if baseUrl.isCloud { + prepareRegionSettings() + } + + var nextUrl = baseUrl + var nextRegion: RegionInfo? + do { while true { - let region = try await resolveBestRegion() - do { - try await fullConnectSequence(region.url, token) + try await fullConnectSequence(nextUrl, token) // Connect sequence successful log("Connect sequence completed") // Final check if cancelled, don't fire connected events try Task.checkCancellation() - _state.mutate { $0.connectionState = .connected } - break // Exit loop on successful connection + + _state.mutate { + $0.connectedUrl = nextUrl + $0.connectionState = .connected + } + // Exit loop on successful connection + break } catch { - log("Connect failed with region: \(region)") - add(failedRegion: region) + // Re-throw if is cancel. + if error is CancellationError { + throw error + } + + if let region = nextRegion { + nextRegion = nil + log("Connect failed with region: \(region)") + add(failedRegion: region) + } + + try Task.checkCancellation() // Prepare for next connect attempt. await cleanUp(isFullReconnect: true) + + if baseUrl.isCloud { + let region = try await resolveBestRegion() + nextUrl = region.url + nextRegion = region + } } } } catch { @@ -402,7 +430,8 @@ extension Room { $0 = isFullReconnect ? State( connectOptions: $0.connectOptions, roomOptions: $0.roomOptions, - url: $0.url, + providedUrl: $0.providedUrl, + connectedUrl: $0.connectedUrl, token: $0.token, nextReconnectMode: $0.nextReconnectMode, isReconnectingWithMode: $0.isReconnectingWithMode, diff --git a/Sources/LiveKit/Extensions/URL.swift b/Sources/LiveKit/Extensions/URL.swift index fec23f486..a72596dae 100644 --- a/Sources/LiveKit/Extensions/URL.swift +++ b/Sources/LiveKit/Extensions/URL.swift @@ -30,7 +30,7 @@ extension URL { } /// Checks whether the URL is a LiveKit Cloud URL. - func isCloud() -> Bool { + var isCloud: Bool { guard let host else { return false } return host.hasSuffix(".livekit.cloud") || host.hasSuffix(".livekit.run") } From d886ed6fc1f90730a0f7f5dc788c266baceb7e65 Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 4 Sep 2024 10:35:23 +0900 Subject: [PATCH 10/11] Prepare --- Sources/LiveKit/Core/Room+Engine.swift | 4 +- Sources/LiveKit/Core/Room+Region.swift | 74 +++++++++++++++++++------- Sources/LiveKit/Core/Room.swift | 27 ++++++---- 3 files changed, 75 insertions(+), 30 deletions(-) diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 1a7118bd0..a47fb3574 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -361,13 +361,13 @@ extension Room { if let region = nextRegion { nextRegion = nil log("Connect failed with region: \(region)") - add(failedRegion: region) + regionManager(addFailedRegion: region) } try Task.checkCancellation() if providedUrl.isCloud { - let region = try await resolveBestRegion() + let region = try await regionManagerResolveBest() nextUrl = region.url nextRegion = region } diff --git a/Sources/LiveKit/Core/Room+Region.swift b/Sources/LiveKit/Core/Room+Region.swift index dec587520..5d846fb16 100644 --- a/Sources/LiveKit/Core/Room+Region.swift +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -21,8 +21,42 @@ import Foundation extension Room { static let defaultCacheInterval: TimeInterval = 3000 - func resolveBestRegion() async throws -> RegionInfo { - try await requestRegionSettings() + // MARK: - Public + + // prepareConnection should be called as soon as the page is loaded, in order + // to speed up the connection attempt. + // + // With LiveKit Cloud, it will also determine the best edge data center for + // the current client to connect to if a token is provided. + public func prepareConnection(url providedUrlString: String, token: String) { + // Must be in disconnected state. + guard _state.connectionState == .disconnected else { + log("Room is not in disconnected state", .info) + return + } + + guard let providedUrl = URL(string: providedUrlString), providedUrl.isValidForConnect else { + log("URL parse failed", .error) + return + } + + guard providedUrl.isCloud else { + log("Provided url is not a livekit cloud url", .warning) + return + } + + _state.mutate { + $0.providedUrl = providedUrl + $0.token = token + } + + regionManagerPrepareRegionSettings() + } + + // MARK: - Internal + + func regionManagerResolveBest() async throws -> RegionInfo { + try await regionManagerRequestSettings() guard let selectedRegion = _regionState.remaining.first else { throw LiveKitError(.regionUrlProvider, message: "No more remaining regions.") @@ -33,43 +67,47 @@ extension Room { return selectedRegion } - func add(failedRegion region: RegionInfo) { + func regionManager(addFailedRegion region: RegionInfo) { _regionState.mutate { $0.remaining.removeAll { $0 == region } } } - public func prepareRegionSettings() { + func regionManagerPrepareRegionSettings() { Task.detached { - try await self.requestRegionSettings() + try await self.regionManagerRequestSettings() + } + } + + func regionManager(shouldRequestSettingsForUrl providedUrl: URL) -> Bool { + guard providedUrl.isCloud else { return false } + return _regionState.read { + guard providedUrl == $0.url, let regionSettingsUpdated = $0.lastRequested else { return true } + let interval = Date().timeIntervalSince(regionSettingsUpdated) + return interval > Self.defaultCacheInterval } } // MARK: - Private - private func requestRegionSettings() async throws { - let (serverUrl, token) = _state.read { ($0.providedUrl, $0.token) } + private func regionManagerRequestSettings() async throws { + let (providedUrl, token) = _state.read { ($0.providedUrl, $0.token) } - guard let serverUrl, let token else { + guard let providedUrl, let token else { throw LiveKitError(.invalidState) } // Ensure url is for cloud. - guard serverUrl.isCloud else { + guard providedUrl.isCloud else { throw LiveKitError(.onlyForCloud) } - let shouldRequestRegionSettings = _regionState.read { - guard serverUrl == $0.url, let regionSettingsUpdated = $0.lastRequested else { return true } - let interval = Date().timeIntervalSince(regionSettingsUpdated) - log("[Region] Interval: \(String(describing: interval))") - return interval > Self.defaultCacheInterval + guard regionManager(shouldRequestSettingsForUrl: providedUrl) else { + return } - guard shouldRequestRegionSettings else { return } - // Make a request which ignores cache. - var request = URLRequest(url: serverUrl.regionSettingsUrl(), + var request = URLRequest(url: providedUrl.regionSettingsUrl(), cachePolicy: .reloadIgnoringLocalAndRemoteCacheData) request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization") @@ -100,7 +138,7 @@ extension Room { log("[Region] all regions: \(String(describing: allRegions))") _regionState.mutate { - $0.url = serverUrl + $0.url = providedUrl $0.all = allRegions $0.remaining = allRegions $0.lastRequested = Date() diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 3ae2cc123..547c9bdf7 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -294,7 +294,7 @@ public class Room: NSObject, ObservableObject, Loggable { connectOptions: ConnectOptions? = nil, roomOptions: RoomOptions? = nil) async throws { - guard let baseUrl = URL(string: urlString), baseUrl.isValidForConnect else { + guard let providedUrl = URL(string: urlString), providedUrl.isValidForConnect else { log("URL parse failed", .error) throw LiveKitError(.failedToParseUrl) } @@ -327,18 +327,25 @@ public class Room: NSObject, ObservableObject, Loggable { try Task.checkCancellation() _state.mutate { - $0.providedUrl = baseUrl + $0.providedUrl = providedUrl $0.token = token $0.connectionState = .connecting } - if baseUrl.isCloud { - prepareRegionSettings() - } - - var nextUrl = baseUrl + var nextUrl = providedUrl var nextRegion: RegionInfo? + if providedUrl.isCloud { + if regionManager(shouldRequestSettingsForUrl: providedUrl) { + regionManagerPrepareRegionSettings() + } else { + // If region info already available, use it instead of provided url. + let region = try await regionManagerResolveBest() + nextUrl = region.url + nextRegion = region + } + } + do { while true { do { @@ -363,15 +370,15 @@ public class Room: NSObject, ObservableObject, Loggable { if let region = nextRegion { nextRegion = nil log("Connect failed with region: \(region)") - add(failedRegion: region) + regionManager(addFailedRegion: region) } try Task.checkCancellation() // Prepare for next connect attempt. await cleanUp(isFullReconnect: true) - if baseUrl.isCloud { - let region = try await resolveBestRegion() + if providedUrl.isCloud { + let region = try await regionManagerResolveBest() nextUrl = region.url nextRegion = region } From 4bb928f006ac0dd34957dbfc6c9c9d9d36c595c6 Mon Sep 17 00:00:00 2001 From: hiroshihorie <548776+hiroshihorie@users.noreply.github.com> Date: Wed, 4 Sep 2024 10:58:19 +0900 Subject: [PATCH 11/11] Update tests --- Sources/LiveKit/Core/Room+Region.swift | 4 +- .../LiveKitTests/RegionUrlProviderTests.swift | 78 +++++++++++-------- 2 files changed, 49 insertions(+), 33 deletions(-) diff --git a/Sources/LiveKit/Core/Room+Region.swift b/Sources/LiveKit/Core/Room+Region.swift index 5d846fb16..a0747ebd9 100644 --- a/Sources/LiveKit/Core/Room+Region.swift +++ b/Sources/LiveKit/Core/Room+Region.swift @@ -19,7 +19,7 @@ import Foundation // MARK: - Room+Region extension Room { - static let defaultCacheInterval: TimeInterval = 3000 + static let regionManagerCacheInterval: TimeInterval = 3000 // MARK: - Public @@ -84,7 +84,7 @@ extension Room { return _regionState.read { guard providedUrl == $0.url, let regionSettingsUpdated = $0.lastRequested else { return true } let interval = Date().timeIntervalSince(regionSettingsUpdated) - return interval > Self.defaultCacheInterval + return interval > Self.regionManagerCacheInterval } } diff --git a/Tests/LiveKitTests/RegionUrlProviderTests.swift b/Tests/LiveKitTests/RegionUrlProviderTests.swift index 6b6388fdb..24749beb9 100644 --- a/Tests/LiveKitTests/RegionUrlProviderTests.swift +++ b/Tests/LiveKitTests/RegionUrlProviderTests.swift @@ -19,57 +19,73 @@ import XCTest class RegionUrlProviderTests: XCTestCase { func testResolveUrl() async throws { + let room = Room() + let testCacheInterval: TimeInterval = 3 // Test data. - let testRegionSettings = Livekit_RegionSettings.with { - $0.regions.append(Livekit_RegionInfo.with { - $0.region = "otokyo1a" - $0.url = "https://example.otokyo1a.production.livekit.cloud" - $0.distance = 32838 - }) - $0.regions.append(Livekit_RegionInfo.with { - $0.region = "dblr1a" - $0.url = "https://example.dblr1a.production.livekit.cloud" - $0.distance = 6_660_301 - }) - $0.regions.append(Livekit_RegionInfo.with { - $0.region = "dsyd1a" - $0.url = "https://example.dsyd1a.production.livekit.cloud" - $0.distance = 7_823_582 - }) - } + let testRegionSettings = [Livekit_RegionInfo.with { + $0.region = "otokyo1a" + $0.url = "https://example.otokyo1a.production.livekit.cloud" + $0.distance = 32838 + }, + Livekit_RegionInfo.with { + $0.region = "dblr1a" + $0.url = "https://example.dblr1a.production.livekit.cloud" + $0.distance = 6_660_301 + }, + Livekit_RegionInfo.with { + $0.region = "dsyd1a" + $0.url = "https://example.dsyd1a.production.livekit.cloud" + $0.distance = 7_823_582 + }].map { $0.toLKType() }.compactMap { $0 } - let provider = RegionUrlProvider(url: "wss://test.livekit.cloud", token: "", cacheInterval: testCacheInterval) + let providedUrl = URL(string: "https://example.livekit.cloud")! // See if request should be initiated. - XCTAssert(provider.shouldRequestRegionSettings(), "Should require to request region settings") + XCTAssert(room.regionManager(shouldRequestSettingsForUrl: providedUrl), "Should require to request region settings") // Set test data. - provider.set(regionSettings: testRegionSettings) + room._state.mutate { + $0.providedUrl = providedUrl + $0.token = "" + } + + room._regionState.mutate { + $0.url = providedUrl + $0.all = testRegionSettings + $0.remaining = testRegionSettings + $0.lastRequested = Date() + } // See if request is not required to be initiated. - XCTAssert(!provider.shouldRequestRegionSettings(), "Should require to request region settings") + XCTAssert(!room.regionManager(shouldRequestSettingsForUrl: providedUrl), "Should require to request region settings") - let attempt1 = try await provider.nextBestRegionUrl() + let attempt1 = try await room.regionManagerResolveBest() print("Next url: \(String(describing: attempt1))") - XCTAssert(attempt1 == URL(string: testRegionSettings.regions[0].url)?.toSocketUrl()) + XCTAssert(attempt1.url == testRegionSettings[0].url) + room.regionManager(addFailedRegion: attempt1) - let attempt2 = try await provider.nextBestRegionUrl() + let attempt2 = try await room.regionManagerResolveBest() print("Next url: \(String(describing: attempt2))") - XCTAssert(attempt2 == URL(string: testRegionSettings.regions[1].url)?.toSocketUrl()) + XCTAssert(attempt2.url == testRegionSettings[1].url) + room.regionManager(addFailedRegion: attempt2) - let attempt3 = try await provider.nextBestRegionUrl() + let attempt3 = try await room.regionManagerResolveBest() print("Next url: \(String(describing: attempt3))") - XCTAssert(attempt3 == URL(string: testRegionSettings.regions[2].url)?.toSocketUrl()) + XCTAssert(attempt3.url == testRegionSettings[2].url) + room.regionManager(addFailedRegion: attempt3) - let attempt4 = try await provider.nextBestRegionUrl() - print("Next url: \(String(describing: attempt4))") + // No more regions + let attempt4 = try? await room.regionManagerResolveBest() XCTAssert(attempt4 == nil) // Simulate cache time elapse. - await asyncSleep(for: testCacheInterval) + room._regionState.mutate { + // Roll back time. + $0.lastRequested = Date().addingTimeInterval(-Room.regionManagerCacheInterval) + } // After cache time elapsed, should require to request region settings again. - XCTAssert(provider.shouldRequestRegionSettings(), "Should require to request region settings") + XCTAssert(room.regionManager(shouldRequestSettingsForUrl: providedUrl), "Should require to request region settings") } }