diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt index 6c20f0ce..f79e484e 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt @@ -16,7 +16,10 @@ package io.livekit.android.room +import io.livekit.android.room.util.PeerConnectionStateObservable +import io.livekit.android.util.FlowObservable import io.livekit.android.util.LKLog +import io.livekit.android.util.flowDelegate import io.livekit.android.webrtc.peerconnection.executeOnRTCThread import livekit.LivekitRtc import livekit.org.webrtc.CandidatePairChangeEvent @@ -31,9 +34,14 @@ import livekit.org.webrtc.SessionDescription internal class PublisherTransportObserver( private val engine: RTCEngine, private val client: SignalClient, -) : PeerConnection.Observer, PeerConnectionTransport.Listener { +) : PeerConnection.Observer, PeerConnectionTransport.Listener, PeerConnectionStateObservable { - var connectionChangeListener: ((newState: PeerConnection.PeerConnectionState) -> Unit)? = null + var connectionChangeListener: PeerConnectionStateListener? = null + + @FlowObservable + @get:FlowObservable + override var connectionState by flowDelegate(PeerConnection.PeerConnectionState.NEW) + private set override fun onIceCandidate(iceCandidate: IceCandidate?) { executeOnRTCThread { @@ -66,6 +74,7 @@ internal class PublisherTransportObserver( executeOnRTCThread { LKLog.v { "onConnection new state: $newState" } connectionChangeListener?.invoke(newState) + connectionState = newState } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 4f318f7f..3999bdb6 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -29,6 +29,7 @@ import io.livekit.android.room.track.TrackException import io.livekit.android.room.util.MediaConstraintKeys import io.livekit.android.room.util.createAnswer import io.livekit.android.room.util.setLocalDescription +import io.livekit.android.room.util.waitUntilConnected import io.livekit.android.util.CloseableCoroutineScope import io.livekit.android.util.Either import io.livekit.android.util.FlowObservable @@ -49,10 +50,12 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.yield import livekit.LivekitModels import livekit.LivekitModels.AudioTrackFeature @@ -66,6 +69,7 @@ import livekit.org.webrtc.MediaConstraints import livekit.org.webrtc.MediaStream import livekit.org.webrtc.MediaStreamTrack import livekit.org.webrtc.PeerConnection +import livekit.org.webrtc.PeerConnection.PeerConnectionState import livekit.org.webrtc.PeerConnection.RTCConfiguration import livekit.org.webrtc.RTCStatsCollectorCallback import livekit.org.webrtc.RTCStatsReport @@ -246,7 +250,7 @@ internal constructor( null, ) - val connectionStateListener: (PeerConnection.PeerConnectionState) -> Unit = { newState -> + val connectionStateListener: PeerConnectionStateListener = { newState -> LKLog.v { "onIceConnection new state: $newState" } if (newState.isConnected()) { connectionState = ConnectionState.CONNECTED @@ -528,31 +532,21 @@ internal constructor( } // wait until publisher ICE connected - val endTime = SystemClock.elapsedRealtime() + MAX_ICE_CONNECT_TIMEOUT_MS + var publisherWaitJob: Job? = null if (hasPublished) { - while (SystemClock.elapsedRealtime() < endTime) { - if (publisher?.isConnected() == true) { - LKLog.v { "publisher reconnected to ICE" } - break - } - delay(100) + publisherWaitJob = launch { + publisherObserver.waitUntilConnected() } } - ensureActive() - if (isClosed) { - LKLog.v { "RTCEngine closed, aborting reconnection" } - break + // wait until subscriber ICE connected + val subscriberWaitJob = launch { + subscriberObserver.waitUntilConnected() } - // wait until subscriber ICE connected - while (SystemClock.elapsedRealtime() < endTime) { - if (subscriber?.isConnected() == true) { - LKLog.v { "reconnected to ICE" } - connectionState = ConnectionState.CONNECTED - break - } - delay(100) + withTimeoutOrNull(MAX_ICE_CONNECT_TIMEOUT_MS.toLong()) { + listOfNotNull(publisherWaitJob, subscriberWaitJob) + .joinAll() } ensureActive() @@ -1160,3 +1154,5 @@ fun LivekitRtc.ICEServer.toWebrtc(): PeerConnection.IceServer = PeerConnection.I .setTlsAlpnProtocols(emptyList()) .setTlsEllipticCurves(emptyList()) .createIceServer() + +typealias PeerConnectionStateListener = (PeerConnectionState) -> Unit diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt index a967f50d..6c5ccc8a 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt @@ -16,7 +16,10 @@ package io.livekit.android.room +import io.livekit.android.room.util.PeerConnectionStateObservable +import io.livekit.android.util.FlowObservable import io.livekit.android.util.LKLog +import io.livekit.android.util.flowDelegate import io.livekit.android.webrtc.peerconnection.executeOnRTCThread import livekit.LivekitRtc import livekit.org.webrtc.CandidatePairChangeEvent @@ -34,10 +37,15 @@ import livekit.org.webrtc.RtpTransceiver class SubscriberTransportObserver( private val engine: RTCEngine, private val client: SignalClient, -) : PeerConnection.Observer { +) : PeerConnection.Observer, PeerConnectionStateObservable { var dataChannelListener: ((DataChannel) -> Unit)? = null - var connectionChangeListener: ((PeerConnection.PeerConnectionState) -> Unit)? = null + var connectionChangeListener: PeerConnectionStateListener? = null + + @FlowObservable + @get:FlowObservable + override var connectionState by flowDelegate(PeerConnection.PeerConnectionState.NEW) + private set override fun onIceCandidate(candidate: IceCandidate) { executeOnRTCThread { @@ -75,6 +83,7 @@ class SubscriberTransportObserver( executeOnRTCThread { LKLog.v { "onConnectionChange new state: $newState" } connectionChangeListener?.invoke(newState) + connectionState = newState } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/util/PeerConnectionStateObservable.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/util/PeerConnectionStateObservable.kt new file mode 100644 index 00000000..5ffd6533 --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/util/PeerConnectionStateObservable.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2024 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.util + +import io.livekit.android.util.FlowObservable +import io.livekit.android.util.flow +import io.livekit.android.webrtc.isConnected +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.takeWhile +import livekit.org.webrtc.PeerConnection.PeerConnectionState + +internal interface PeerConnectionStateObservable { + @FlowObservable + @get:FlowObservable + val connectionState: PeerConnectionState +} + +/** + * Waits until the connection state [PeerConnectionState.isConnected]. + */ +internal suspend fun PeerConnectionStateObservable.waitUntilConnected() { + this::connectionState.flow + .takeWhile { + !it.isConnected() + } + .collect() +}