Skip to content

Commit 7af32ad

Browse files
lukasIOdavidzhao
andauthored
Fix stuck reconnect (#475)
* retry initial signal connection * changeset * move retry logic into rtcEngine * make maxJoinAttempts configurable * update changeset * typo * move maxRetries into connectOpts * wip * more logic in engine * remove comment * fix connect logic * don't reject reconnect promise * replace reconnectFuture with event driven logic * changeset * Update src/room/RTCEngine.ts Co-authored-by: David Zhao <[email protected]> * change naming Co-authored-by: David Zhao <[email protected]>
1 parent 676ecd3 commit 7af32ad

File tree

4 files changed

+138
-105
lines changed

4 files changed

+138
-105
lines changed

.changeset/clean-rings-hug.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
Fix reconnection attempts potentially getting stuck

src/api/SignalClient.ts

+20-16
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,6 @@ export class SignalClient {
192192

193193
return new Promise<JoinResponse | void>((resolve, reject) => {
194194
const abortHandler = () => {
195-
ws.close();
196195
this.close();
197196
reject(new ConnectionError('room connection has been cancelled'));
198197
};
@@ -202,12 +201,14 @@ export class SignalClient {
202201
}
203202
abortSignal?.addEventListener('abort', abortHandler);
204203
log.debug(`connecting to ${url + params}`);
205-
this.ws = undefined;
206-
const ws = new WebSocket(url + params);
207-
ws.binaryType = 'arraybuffer';
204+
if (this.ws) {
205+
this.close();
206+
}
207+
this.ws = new WebSocket(url + params);
208+
this.ws.binaryType = 'arraybuffer';
208209

209-
ws.onerror = async (ev: Event) => {
210-
if (!this.ws) {
210+
this.ws.onerror = async (ev: Event) => {
211+
if (!this.isConnected) {
211212
try {
212213
const resp = await fetch(`http${url.substring(2)}/validate${params}`);
213214
if (!resp.ok) {
@@ -236,8 +237,7 @@ export class SignalClient {
236237
this.handleWSError(ev);
237238
};
238239

239-
ws.onopen = () => {
240-
this.ws = ws;
240+
this.ws.onopen = () => {
241241
if (opts.reconnect) {
242242
// upon reconnection, there will not be additional handshake
243243
this.isConnected = true;
@@ -247,7 +247,7 @@ export class SignalClient {
247247
}
248248
};
249249

250-
ws.onmessage = async (ev: MessageEvent) => {
250+
this.ws.onmessage = async (ev: MessageEvent) => {
251251
// not considered connected until JoinResponse is received
252252
let resp: SignalResponse;
253253
if (typeof ev.data === 'string') {
@@ -288,23 +288,27 @@ export class SignalClient {
288288
this.handleSignalResponse(resp);
289289
};
290290

291-
ws.onclose = (ev: CloseEvent) => {
292-
if (!this.isConnected || this.ws !== ws) return;
291+
this.ws.onclose = (ev: CloseEvent) => {
292+
if (!this.isConnected) return;
293293

294294
log.debug(`websocket connection closed: ${ev.reason}`);
295295
this.isConnected = false;
296-
if (this.onClose) this.onClose(ev.reason);
297-
if (this.ws === ws) {
298-
this.ws = undefined;
296+
if (this.onClose) {
297+
this.onClose(ev.reason);
299298
}
299+
this.ws = undefined;
300300
};
301301
});
302302
}
303303

304304
close() {
305305
this.isConnected = false;
306-
if (this.ws) this.ws.onclose = null;
307-
this.ws?.close();
306+
if (this.ws) {
307+
this.ws.onclose = null;
308+
this.ws.onmessage = null;
309+
this.ws.onopen = null;
310+
this.ws.close();
311+
}
308312
this.ws = undefined;
309313
this.clearPingInterval();
310314
}

src/room/RTCEngine.ts

+94-55
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
129129
this.client = new SignalClient();
130130
this.client.signalLatency = this.options.expSignalLatency;
131131
this.reconnectPolicy = this.options.reconnectPolicy;
132+
this.registerOnLineListener();
132133
}
133134

134135
async join(
@@ -174,8 +175,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
174175

175176
close() {
176177
this._isClosed = true;
177-
178178
this.removeAllListeners();
179+
this.deregisterOnLineListener();
180+
this.clearPendingReconnect();
179181
if (this.publisher && this.publisher.pc.signalingState !== 'closed') {
180182
this.publisher.pc.getSenders().forEach((sender) => {
181183
try {
@@ -684,66 +686,74 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
684686
if (this.reconnectTimeout) {
685687
clearTimeout(this.reconnectTimeout);
686688
}
687-
this.reconnectTimeout = setTimeout(async () => {
688-
if (this._isClosed) {
689-
return;
689+
this.reconnectTimeout = setTimeout(() => this.attemptReconnect(signalEvents), delay);
690+
};
691+
692+
private async attemptReconnect(signalEvents: boolean = false) {
693+
if (this._isClosed) {
694+
return;
695+
}
696+
// guard for attempting reconnection multiple times while one attempt is still not finished
697+
if (this.attemptingReconnect) {
698+
return;
699+
}
700+
if (
701+
this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED ||
702+
// signaling state could change to closed due to hardware sleep
703+
// those connections cannot be resumed
704+
(this.primaryPC?.signalingState ?? 'closed') === 'closed'
705+
) {
706+
this.fullReconnectOnNext = true;
707+
}
708+
709+
try {
710+
this.attemptingReconnect = true;
711+
if (this.fullReconnectOnNext) {
712+
await this.restartConnection(signalEvents);
713+
} else {
714+
await this.resumeConnection(signalEvents);
690715
}
691-
// guard for attempting reconnection multiple times while one attempt is still not finished
692-
if (this.attemptingReconnect) {
693-
return;
716+
this.reconnectAttempts = 0;
717+
this.fullReconnectOnNext = false;
718+
if (this.reconnectTimeout) {
719+
clearTimeout(this.reconnectTimeout);
694720
}
695-
if (
696-
this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED ||
697-
// signaling state could change to closed due to hardware sleep
698-
// those connections cannot be resumed
699-
(this.primaryPC?.signalingState ?? 'closed') === 'closed'
700-
) {
701-
this.fullReconnectOnNext = true;
721+
} catch (e) {
722+
this.reconnectAttempts += 1;
723+
let reconnectRequired = false;
724+
let recoverable = true;
725+
let requireSignalEvents = false;
726+
if (e instanceof UnexpectedConnectionState) {
727+
log.debug('received unrecoverable error', { error: e });
728+
// unrecoverable
729+
recoverable = false;
730+
} else if (!(e instanceof SignalReconnectError)) {
731+
// cannot resume
732+
reconnectRequired = true;
702733
}
703734

704-
try {
705-
this.attemptingReconnect = true;
706-
if (this.fullReconnectOnNext) {
707-
await this.restartConnection(signalEvents);
708-
} else {
709-
await this.resumeConnection(signalEvents);
710-
}
711-
this.reconnectAttempts = 0;
712-
this.fullReconnectOnNext = false;
713-
if (this.reconnectTimeout) {
714-
clearTimeout(this.reconnectTimeout);
715-
}
716-
} catch (e) {
717-
this.reconnectAttempts += 1;
718-
let reconnectRequired = false;
719-
let recoverable = true;
720-
let requireSignalEvents = false;
721-
if (e instanceof UnexpectedConnectionState) {
722-
log.debug('received unrecoverable error', { error: e });
723-
// unrecoverable
724-
recoverable = false;
725-
} else if (!(e instanceof SignalReconnectError)) {
726-
// cannot resume
727-
reconnectRequired = true;
728-
}
729-
730-
// when we flip from resume to reconnect
731-
// we need to fire the right reconnecting events
732-
if (reconnectRequired && !this.fullReconnectOnNext) {
733-
this.fullReconnectOnNext = true;
734-
requireSignalEvents = true;
735-
}
735+
// when we flip from resume to reconnect
736+
// we need to fire the right reconnecting events
737+
if (reconnectRequired && !this.fullReconnectOnNext) {
738+
this.fullReconnectOnNext = true;
739+
requireSignalEvents = true;
740+
}
736741

737-
if (recoverable) {
738-
this.handleDisconnect('reconnect', requireSignalEvents);
739-
} else {
740-
disconnect(Date.now() - this.reconnectStart);
741-
}
742-
} finally {
743-
this.attemptingReconnect = false;
742+
if (recoverable) {
743+
this.handleDisconnect('reconnect', requireSignalEvents);
744+
} else {
745+
log.info(
746+
`could not recover connection after ${this.reconnectAttempts} attempts, ${
747+
Date.now() - this.reconnectStart
748+
}ms. giving up`,
749+
);
750+
this.emit(EngineEvent.Disconnected);
751+
this.close();
744752
}
745-
}, delay);
746-
};
753+
} finally {
754+
this.attemptingReconnect = false;
755+
}
756+
}
747757

748758
private getNextRetryDelay(context: ReconnectContext) {
749759
try {
@@ -956,6 +966,35 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
956966
}
957967
}
958968
}
969+
970+
private clearPendingReconnect() {
971+
if (this.reconnectTimeout) {
972+
clearTimeout(this.reconnectTimeout);
973+
}
974+
this.reconnectAttempts = 0;
975+
}
976+
977+
private handleBrowserOnLine = () => {
978+
// in case the engine is currently reconnecting, attempt a reconnect immediately after the browser state has changed to 'onLine'
979+
if (this.client.isReconnecting) {
980+
if (this.reconnectTimeout) {
981+
clearTimeout(this.reconnectTimeout);
982+
}
983+
this.attemptReconnect(true);
984+
}
985+
};
986+
987+
private registerOnLineListener() {
988+
if (isWeb()) {
989+
window.addEventListener('online', this.handleBrowserOnLine);
990+
}
991+
}
992+
993+
private deregisterOnLineListener() {
994+
if (isWeb()) {
995+
window.removeEventListener('online', this.handleBrowserOnLine);
996+
}
997+
}
959998
}
960999

9611000
async function getConnectedAddress(pc: RTCPeerConnection): Promise<string | undefined> {

0 commit comments

Comments
 (0)