Skip to content

Commit 08d6c60

Browse files
authored
Add ping pong heartbeat for signal connection (#377)
* wip * trigger onclose on ping timeout * changeset * address comments * log warning on ping timeout * fix merge * adjust log level * remove verbose logs * start ping interval only if timeout > 0
1 parent 8ec5c02 commit 08d6c60

File tree

5 files changed

+240
-9
lines changed

5 files changed

+240
-9
lines changed

.changeset/fresh-terms-train.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
Add ping pong heartbeat for signal connection

src/api/SignalClient.ts

+83-6
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,14 @@ export class SignalClient {
123123

124124
ws?: WebSocket;
125125

126+
private pingTimeout: ReturnType<typeof setTimeout> | undefined;
127+
128+
private pingTimeoutDuration: number | undefined;
129+
130+
private pingIntervalDuration: number | undefined;
131+
132+
private pingInterval: ReturnType<typeof setInterval> | undefined;
133+
126134
constructor(useJSON: boolean = false) {
127135
this.isConnected = false;
128136
this.isReconnecting = false;
@@ -155,6 +163,8 @@ export class SignalClient {
155163

156164
async reconnect(url: string, token: string): Promise<void> {
157165
this.isReconnecting = true;
166+
// clear ping interval and restart it once reconnected
167+
this.clearPingInterval();
158168
await this.connect(url, token, {
159169
reconnect: true,
160170
});
@@ -215,29 +225,41 @@ export class SignalClient {
215225
if (opts.reconnect) {
216226
// upon reconnection, there will not be additional handshake
217227
this.isConnected = true;
228+
// restart ping interval as it's cleared for reconnection
229+
this.startPingInterval();
218230
resolve();
219231
}
220232
};
221233

222234
ws.onmessage = async (ev: MessageEvent) => {
223235
// not considered connected until JoinResponse is received
224-
let msg: SignalResponse;
236+
let resp: SignalResponse;
225237
if (typeof ev.data === 'string') {
226238
const json = JSON.parse(ev.data);
227-
msg = SignalResponse.fromJSON(json);
239+
resp = SignalResponse.fromJSON(json);
228240
} else if (ev.data instanceof ArrayBuffer) {
229-
msg = SignalResponse.decode(new Uint8Array(ev.data));
241+
resp = SignalResponse.decode(new Uint8Array(ev.data));
230242
} else {
231243
log.error(`could not decode websocket message: ${typeof ev.data}`);
232244
return;
233245
}
234246

235247
if (!this.isConnected) {
236248
// handle join message only
237-
if (msg.message?.$case === 'join') {
249+
if (resp.message?.$case === 'join') {
238250
this.isConnected = true;
239251
abortSignal?.removeEventListener('abort', abortHandler);
240-
resolve(msg.message.join);
252+
this.pingTimeoutDuration = resp.message.join.pingTimeout;
253+
this.pingIntervalDuration = resp.message.join.pingInterval;
254+
255+
if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) {
256+
log.debug('ping config', {
257+
timeout: this.pingTimeoutDuration,
258+
interval: this.pingIntervalDuration,
259+
});
260+
this.startPingInterval();
261+
}
262+
resolve(resp.message.join);
241263
} else {
242264
reject(new ConnectionError('did not receive join response'));
243265
}
@@ -247,7 +269,7 @@ export class SignalClient {
247269
if (this.signalLatency) {
248270
await sleep(this.signalLatency);
249271
}
250-
this.handleSignalResponse(msg);
272+
this.handleSignalResponse(resp);
251273
};
252274

253275
ws.onclose = (ev: CloseEvent) => {
@@ -268,6 +290,7 @@ export class SignalClient {
268290
if (this.ws) this.ws.onclose = null;
269291
this.ws?.close();
270292
this.ws = undefined;
293+
this.clearPingInterval();
271294
}
272295

273296
// initial offer after joining
@@ -364,6 +387,13 @@ export class SignalClient {
364387
});
365388
}
366389

390+
sendPing() {
391+
this.sendRequest({
392+
$case: 'ping',
393+
ping: Date.now(),
394+
});
395+
}
396+
367397
async sendLeave() {
368398
await this.sendRequest({
369399
$case: 'leave',
@@ -475,6 +505,8 @@ export class SignalClient {
475505
if (this.onLocalTrackUnpublished) {
476506
this.onLocalTrackUnpublished(msg.trackUnpublished);
477507
}
508+
} else if (msg.$case === 'pong') {
509+
this.resetPingTimeout();
478510
} else {
479511
log.debug('unsupported message', msg);
480512
}
@@ -493,6 +525,51 @@ export class SignalClient {
493525
private handleWSError(ev: Event) {
494526
log.error('websocket error', ev);
495527
}
528+
529+
private resetPingTimeout() {
530+
this.clearPingTimeout();
531+
if (!this.pingTimeoutDuration) {
532+
log.warn('ping timeout duration not set');
533+
return;
534+
}
535+
this.pingTimeout = setTimeout(() => {
536+
log.warn(
537+
`ping timeout triggered. last pong received at: ${new Date(
538+
Date.now() - this.pingTimeoutDuration! * 1000,
539+
).toUTCString()}`,
540+
);
541+
if (this.onClose) {
542+
this.onClose('ping timeout');
543+
}
544+
}, this.pingTimeoutDuration * 1000);
545+
}
546+
547+
private clearPingTimeout() {
548+
if (this.pingTimeout) {
549+
clearTimeout(this.pingTimeout);
550+
}
551+
}
552+
553+
private startPingInterval() {
554+
this.clearPingInterval();
555+
this.resetPingTimeout();
556+
if (!this.pingIntervalDuration) {
557+
log.warn('ping interval duration not set');
558+
return;
559+
}
560+
log.debug('start ping interval');
561+
this.pingInterval = setInterval(() => {
562+
this.sendPing();
563+
}, this.pingIntervalDuration * 1000);
564+
}
565+
566+
private clearPingInterval() {
567+
log.debug('clearing ping interval');
568+
this.clearPingTimeout();
569+
if (this.pingInterval) {
570+
clearInterval(this.pingInterval);
571+
}
572+
}
496573
}
497574

498575
function fromProtoSessionDescription(sd: SessionDescription): RTCSessionDescriptionInit {

src/proto/livekit_models.ts

+69
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ export enum DisconnectReason {
226226
PARTICIPANT_REMOVED = 4,
227227
ROOM_DELETED = 5,
228228
STATE_MISMATCH = 6,
229+
JOIN_FAILURE = 7,
229230
UNRECOGNIZED = -1,
230231
}
231232

@@ -252,6 +253,9 @@ export function disconnectReasonFromJSON(object: any): DisconnectReason {
252253
case 6:
253254
case 'STATE_MISMATCH':
254255
return DisconnectReason.STATE_MISMATCH;
256+
case 7:
257+
case 'JOIN_FAILURE':
258+
return DisconnectReason.JOIN_FAILURE;
255259
case -1:
256260
case 'UNRECOGNIZED':
257261
default:
@@ -275,6 +279,8 @@ export function disconnectReasonToJSON(object: DisconnectReason): string {
275279
return 'ROOM_DELETED';
276280
case DisconnectReason.STATE_MISMATCH:
277281
return 'STATE_MISMATCH';
282+
case DisconnectReason.JOIN_FAILURE:
283+
return 'JOIN_FAILURE';
278284
case DisconnectReason.UNRECOGNIZED:
279285
default:
280286
return 'UNRECOGNIZED';
@@ -627,6 +633,11 @@ export interface RTPStats_GapHistogramEntry {
627633
value: number;
628634
}
629635

636+
export interface TimedVersion {
637+
unixMicro: number;
638+
ticks: number;
639+
}
640+
630641
function createBaseRoom(): Room {
631642
return {
632643
sid: '',
@@ -2654,6 +2665,64 @@ export const RTPStats_GapHistogramEntry = {
26542665
},
26552666
};
26562667

2668+
function createBaseTimedVersion(): TimedVersion {
2669+
return { unixMicro: 0, ticks: 0 };
2670+
}
2671+
2672+
export const TimedVersion = {
2673+
encode(message: TimedVersion, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
2674+
if (message.unixMicro !== 0) {
2675+
writer.uint32(8).int64(message.unixMicro);
2676+
}
2677+
if (message.ticks !== 0) {
2678+
writer.uint32(16).int32(message.ticks);
2679+
}
2680+
return writer;
2681+
},
2682+
2683+
decode(input: _m0.Reader | Uint8Array, length?: number): TimedVersion {
2684+
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
2685+
let end = length === undefined ? reader.len : reader.pos + length;
2686+
const message = createBaseTimedVersion();
2687+
while (reader.pos < end) {
2688+
const tag = reader.uint32();
2689+
switch (tag >>> 3) {
2690+
case 1:
2691+
message.unixMicro = longToNumber(reader.int64() as Long);
2692+
break;
2693+
case 2:
2694+
message.ticks = reader.int32();
2695+
break;
2696+
default:
2697+
reader.skipType(tag & 7);
2698+
break;
2699+
}
2700+
}
2701+
return message;
2702+
},
2703+
2704+
fromJSON(object: any): TimedVersion {
2705+
return {
2706+
unixMicro: isSet(object.unixMicro) ? Number(object.unixMicro) : 0,
2707+
ticks: isSet(object.ticks) ? Number(object.ticks) : 0,
2708+
};
2709+
},
2710+
2711+
toJSON(message: TimedVersion): unknown {
2712+
const obj: any = {};
2713+
message.unixMicro !== undefined && (obj.unixMicro = Math.round(message.unixMicro));
2714+
message.ticks !== undefined && (obj.ticks = Math.round(message.ticks));
2715+
return obj;
2716+
},
2717+
2718+
fromPartial<I extends Exact<DeepPartial<TimedVersion>, I>>(object: I): TimedVersion {
2719+
const message = createBaseTimedVersion();
2720+
message.unixMicro = object.unixMicro ?? 0;
2721+
message.ticks = object.ticks ?? 0;
2722+
return message;
2723+
},
2724+
};
2725+
26572726
declare var self: any | undefined;
26582727
declare var window: any | undefined;
26592728
declare var global: any | undefined;

0 commit comments

Comments
 (0)