diff --git a/.changeset/blue-cheetahs-shout.md b/.changeset/blue-cheetahs-shout.md new file mode 100644 index 0000000000..4d63d0cb7c --- /dev/null +++ b/.changeset/blue-cheetahs-shout.md @@ -0,0 +1,5 @@ +--- +'livekit-client': minor +--- + +Add new rtc path that defaults to single peer connection mode and falls back to legacy dual pc diff --git a/examples/demo/demo.ts b/examples/demo/demo.ts index c0dbd095a5..22f1a8d6ed 100644 --- a/examples/demo/demo.ts +++ b/examples/demo/demo.ts @@ -131,7 +131,6 @@ const appActions = { encryption: e2eeEnabled ? { keyProvider: state.e2eeKeyProvider, worker: new E2EEWorker() } : undefined, - singlePeerConnection: true, }; if ( roomOpts.publishDefaults?.videoCodec === 'av1' || diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index 0623a1ca27..04530c9271 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -75,7 +75,6 @@ export interface SignalOptions { maxRetries: number; e2eeEnabled: boolean; websocketTimeout: number; - singlePeerConnection: boolean; } type SignalMessage = SignalRequest['message']; @@ -246,12 +245,13 @@ export class SignalClient { token: string, opts: SignalOptions, abortSignal?: AbortSignal, + forceV0Path?: boolean, ): Promise { // during a full reconnect, we'd want to start the sequence even if currently // connected this.state = SignalConnectionState.CONNECTING; this.options = opts; - const res = await this.connect(url, token, opts, abortSignal); + const res = await this.connect(url, token, opts, abortSignal, forceV0Path); return res as JoinResponse; } @@ -286,16 +286,18 @@ export class SignalClient { token: string, opts: ConnectOpts, abortSignal?: AbortSignal, + /** setting this to true results in dual peer connection mode being used */ + forceV0Path?: boolean, ): Promise { const unlock = await this.connectionLock.lock(); this.connectOptions = opts; const clientInfo = getClientInfo(); - const params = opts.singlePeerConnection - ? createJoinRequestConnectionParams(token, clientInfo, opts) - : createConnectionParams(token, clientInfo, opts); - const rtcUrl = createRtcUrl(url, params); - const validateUrl = createValidateUrl(rtcUrl); + const params = forceV0Path + ? createConnectionParams(token, clientInfo, opts) + : createJoinRequestConnectionParams(token, clientInfo, opts); + const rtcUrl = createRtcUrl(url, params, forceV0Path).toString(); + const validateUrl = createValidateUrl(rtcUrl).toString(); return new Promise(async (resolve, reject) => { try { @@ -995,10 +997,22 @@ export class SignalClient { ): Promise { try { const resp = await fetch(validateUrl); - if (resp.status.toFixed(0).startsWith('4')) { - const msg = await resp.text(); - return ConnectionError.notAllowed(msg, resp.status); - } else if (reason instanceof ConnectionError) { + + switch (resp.status) { + case 404: + return ConnectionError.serviceNotFound( + 'v1 RTC path not found. Consider upgrading your LiveKit server version', + 'v0-rtc', + ); + case 401: + case 403: + const msg = await resp.text(); + return ConnectionError.notAllowed(msg, resp.status); + default: + break; + } + + if (reason instanceof ConnectionError) { return reason; } else { return ConnectionError.internal( diff --git a/src/api/utils.test.ts b/src/api/utils.test.ts index 495403b0e3..5ea02771d4 100644 --- a/src/api/utils.test.ts +++ b/src/api/utils.test.ts @@ -6,14 +6,14 @@ describe('createRtcUrl', () => { const url = 'wss://example.com'; const searchParams = new URLSearchParams(); const result = createRtcUrl(url, searchParams); - expect(result.toString()).toBe('wss://example.com/rtc'); + expect(result.toString()).toBe('wss://example.com/rtc/v1'); }); it('should create a basic RTC URL with http protocol', () => { const url = 'http://example.com'; const searchParams = new URLSearchParams(); const result = createRtcUrl(url, searchParams); - expect(result.toString()).toBe('ws://example.com/rtc'); + expect(result.toString()).toBe('ws://example.com/rtc/v1'); }); it('should handle search parameters', () => { @@ -25,7 +25,7 @@ describe('createRtcUrl', () => { const result = createRtcUrl(url, searchParams); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/rtc'); + expect(parsedResult.pathname).toBe('/rtc/v1'); expect(parsedResult.searchParams.get('token')).toBe('test-token'); expect(parsedResult.searchParams.get('room')).toBe('test-room'); }); @@ -36,7 +36,7 @@ describe('createRtcUrl', () => { const result = createRtcUrl(url, searchParams); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/rtc'); + expect(parsedResult.pathname).toBe('/rtc/v1'); }); it('should handle sub paths', () => { @@ -45,7 +45,7 @@ describe('createRtcUrl', () => { const result = createRtcUrl(url, searchParams); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/sub/path/rtc'); + expect(parsedResult.pathname).toBe('/sub/path/rtc/v1'); }); it('should handle sub paths with trailing slashes', () => { @@ -54,7 +54,7 @@ describe('createRtcUrl', () => { const result = createRtcUrl(url, searchParams); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/sub/path/rtc'); + expect(parsedResult.pathname).toBe('/sub/path/rtc/v1'); }); it('should handle sub paths with url params', () => { @@ -64,7 +64,7 @@ describe('createRtcUrl', () => { const result = createRtcUrl(url, searchParams); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/sub/path/rtc'); + expect(parsedResult.pathname).toBe('/sub/path/rtc/v1'); expect(parsedResult.searchParams.get('param')).toBe('value'); expect(parsedResult.searchParams.get('token')).toBe('test-token'); }); @@ -73,8 +73,8 @@ describe('createRtcUrl', () => { describe('createValidateUrl', () => { it('should create a basic validate URL', () => { const rtcUrl = createRtcUrl('wss://example.com', new URLSearchParams()); - const result = createValidateUrl(rtcUrl); - expect(result.toString()).toBe('https://example.com/rtc/validate'); + const result = createValidateUrl(rtcUrl.toString()); + expect(result.toString()).toBe('https://example.com/rtc/v1/validate'); }); it('should handle search parameters', () => { @@ -85,33 +85,41 @@ describe('createValidateUrl', () => { room: 'test-room', }), ); - const result = createValidateUrl(rtcUrl); + const result = createValidateUrl(rtcUrl.toString()); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/rtc/validate'); + expect(parsedResult.pathname).toBe('/rtc/v1/validate'); expect(parsedResult.searchParams.get('token')).toBe('test-token'); expect(parsedResult.searchParams.get('room')).toBe('test-room'); }); it('should handle ws protocol', () => { const rtcUrl = createRtcUrl('ws://example.com', new URLSearchParams()); - const result = createValidateUrl(rtcUrl); + const result = createValidateUrl(rtcUrl.toString()); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/rtc/validate'); + expect(parsedResult.pathname).toBe('/rtc/v1/validate'); }); it('should preserve the original path', () => { const rtcUrl = createRtcUrl('wss://example.com/some/path', new URLSearchParams()); - const result = createValidateUrl(rtcUrl); + const result = createValidateUrl(rtcUrl.toString()); const parsedResult = new URL(result); - expect(parsedResult.pathname).toBe('/some/path/rtc/validate'); + expect(parsedResult.pathname).toBe('/some/path/rtc/v1/validate'); }); it('should handle sub paths with trailing slashes', () => { const rtcUrl = createRtcUrl('wss://example.com/sub/path/', new URLSearchParams()); - const result = createValidateUrl(rtcUrl); + const result = createValidateUrl(rtcUrl.toString()); + + const parsedResult = new URL(result); + expect(parsedResult.pathname).toBe('/sub/path/rtc/v1/validate'); + }); + + it('should handle v0 paths', () => { + const rtcUrl = createRtcUrl('wss://example.com/sub/path/', new URLSearchParams(), true); + const result = createValidateUrl(rtcUrl.toString()); const parsedResult = new URL(result); expect(parsedResult.pathname).toBe('/sub/path/rtc/validate'); diff --git a/src/api/utils.ts b/src/api/utils.ts index a398aed3cf..1202ce3ecf 100644 --- a/src/api/utils.ts +++ b/src/api/utils.ts @@ -1,7 +1,16 @@ import { SignalResponse } from '@livekit/protocol'; import { toHttpUrl, toWebsocketUrl } from '../room/utils'; -export function createRtcUrl(url: string, searchParams: URLSearchParams) { +export function createRtcUrl(url: string, searchParams: URLSearchParams, useV0Path = false) { + const v0Url = createV0RtcUrl(url, searchParams); + if (useV0Path) { + return v0Url; + } else { + return appendUrlPath(v0Url, 'v1'); + } +} + +export function createV0RtcUrl(url: string, searchParams: URLSearchParams) { const urlObj = new URL(toWebsocketUrl(url)); searchParams.forEach((value, key) => { urlObj.searchParams.set(key, value); @@ -20,7 +29,7 @@ export function ensureTrailingSlash(path: string) { function appendUrlPath(urlObj: URL, path: string) { urlObj.pathname = `${ensureTrailingSlash(urlObj.pathname)}${path}`; - return urlObj.toString(); + return urlObj; } export function parseSignalResponse(value: ArrayBuffer | string) { diff --git a/src/connectionHelper/checks/turn.ts b/src/connectionHelper/checks/turn.ts index 1af92b1db2..98079db27a 100644 --- a/src/connectionHelper/checks/turn.ts +++ b/src/connectionHelper/checks/turn.ts @@ -15,13 +15,18 @@ export class TURNCheck extends Checker { (await new RegionUrlProvider(this.url, this.token).getNextBestRegionUrl()) ?? this.url; } const signalClient = new SignalClient(); - const joinRes = await signalClient.join(this.url, this.token, { - autoSubscribe: true, - maxRetries: 0, - e2eeEnabled: false, - websocketTimeout: 15_000, - singlePeerConnection: false, - }); + const joinRes = await signalClient.join( + this.url, + this.token, + { + autoSubscribe: true, + maxRetries: 0, + e2eeEnabled: false, + websocketTimeout: 15_000, + }, + undefined, + true, + ); let hasTLS = false; let hasTURN = false; diff --git a/src/connectionHelper/checks/websocket.ts b/src/connectionHelper/checks/websocket.ts index 8af1d326a4..773fce18aa 100644 --- a/src/connectionHelper/checks/websocket.ts +++ b/src/connectionHelper/checks/websocket.ts @@ -17,13 +17,18 @@ export class WebSocketCheck extends Checker { let signalClient = new SignalClient(); let joinRes: JoinResponse | undefined; try { - joinRes = await signalClient.join(this.url, this.token, { - autoSubscribe: true, - maxRetries: 0, - e2eeEnabled: false, - websocketTimeout: 15_000, - singlePeerConnection: false, - }); + joinRes = await signalClient.join( + this.url, + this.token, + { + autoSubscribe: true, + maxRetries: 0, + e2eeEnabled: false, + websocketTimeout: 15_000, + }, + undefined, + true, + ); } catch (e: any) { if (isCloud(new URL(this.url))) { this.appendMessage( @@ -32,13 +37,18 @@ export class WebSocketCheck extends Checker { const regionProvider = new RegionUrlProvider(this.url, this.token); const regionUrl = await regionProvider.getNextBestRegionUrl(); if (regionUrl) { - joinRes = await signalClient.join(regionUrl, this.token, { - autoSubscribe: true, - maxRetries: 0, - e2eeEnabled: false, - websocketTimeout: 15_000, - singlePeerConnection: false, - }); + joinRes = await signalClient.join( + regionUrl, + this.token, + { + autoSubscribe: true, + maxRetries: 0, + e2eeEnabled: false, + websocketTimeout: 15_000, + }, + undefined, + true, + ); this.appendMessage( `Fallback to region worked. To avoid initial connections failing, ensure you're calling room.prepareConnection() ahead of time`, ); diff --git a/src/options.ts b/src/options.ts index d28d67b95a..f562672220 100644 --- a/src/options.ts +++ b/src/options.ts @@ -101,9 +101,10 @@ export interface InternalRoomOptions { loggerName?: string; /** - * @experimental - * only supported on LiveKit Cloud - * and LiveKit OSS >= 1.9.2 + * will attempt to connect via single peer connection mode. + * falls back to dual peer connection mode if not available. + * + * @default true */ singlePeerConnection: boolean; } diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index b3ae7ae164..beece8e4d4 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -66,6 +66,12 @@ export class PCTransportManager { private loggerOptions: LoggerOptions; + private _mode: PCMode; + + get mode(): PCMode { + return this._mode; + } + constructor(rtcConfig: RTCConfiguration, mode: PCMode, loggerOptions: LoggerOptions) { this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.PCManager); this.loggerOptions = loggerOptions; @@ -73,6 +79,7 @@ export class PCTransportManager { this.isPublisherConnectionRequired = mode !== 'subscriber-primary'; this.isSubscriberConnectionRequired = mode === 'subscriber-primary'; this.publisher = new PCTransport(rtcConfig, loggerOptions); + this._mode = mode; if (mode !== 'publisher-only') { this.subscriber = new PCTransport(rtcConfig, loggerOptions); this.subscriber.onConnectionStateChange = this.updateState; diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 99313a0c34..25d1ed569a 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -266,6 +266,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit token: string, opts: SignalOptions, abortSignal?: AbortSignal, + /** setting this to true results in dual peer connection mode being used */ + forceV0Path?: boolean, ): Promise { this.url = url; this.token = token; @@ -275,13 +277,13 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.joinAttempts += 1; this.setupSignalClientCallbacks(); - const joinResponse = await this.client.join(url, token, opts, abortSignal); + const joinResponse = await this.client.join(url, token, opts, abortSignal, forceV0Path); this._isClosed = false; this.latestJoinResponse = joinResponse; this.subscriberPrimary = joinResponse.subscriberPrimary; if (!this.pcManager) { - await this.configure(joinResponse); + await this.configure(joinResponse, !forceV0Path); } // create offer @@ -305,6 +307,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (this.joinAttempts < this.maxJoinAttempts) { return this.join(url, token, opts, abortSignal); } + } else if (e.reason === ConnectionErrorReason.ServiceNotFound) { + this.log.warn(`Initial connection failed: ${e.message} – Retrying`); + return this.join(url, token, opts, abortSignal, true); } } throw e; @@ -440,7 +445,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.regionUrlProvider = provider; } - private async configure(joinResponse: JoinResponse) { + private async configure(joinResponse: JoinResponse, useSinglePeerConnection: boolean) { // already configured if (this.pcManager && this.pcManager.currentState !== PCTransportState.NEW) { return; @@ -452,7 +457,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.pcManager = new PCTransportManager( rtcConfig, - this.options.singlePeerConnection + useSinglePeerConnection ? 'publisher-only' : joinResponse.subscriberPrimary ? 'subscriber-primary' @@ -1593,32 +1598,34 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.sendSyncState( new SyncState({ - answer: this.options.singlePeerConnection - ? previousPublisherAnswer - ? toProtoSessionDescription({ - sdp: previousPublisherAnswer.sdp, - type: previousPublisherAnswer.type, - }) - : undefined - : previousSubscriberAnswer - ? toProtoSessionDescription({ - sdp: previousSubscriberAnswer.sdp, - type: previousSubscriberAnswer.type, - }) - : undefined, - offer: this.options.singlePeerConnection - ? previousPublisherOffer - ? toProtoSessionDescription({ - sdp: previousPublisherOffer.sdp, - type: previousPublisherOffer.type, - }) - : undefined - : previousSubscriberOffer - ? toProtoSessionDescription({ - sdp: previousSubscriberOffer.sdp, - type: previousSubscriberOffer.type, - }) - : undefined, + answer: + this.pcManager.mode === 'publisher-only' + ? previousPublisherAnswer + ? toProtoSessionDescription({ + sdp: previousPublisherAnswer.sdp, + type: previousPublisherAnswer.type, + }) + : undefined + : previousSubscriberAnswer + ? toProtoSessionDescription({ + sdp: previousSubscriberAnswer.sdp, + type: previousSubscriberAnswer.type, + }) + : undefined, + offer: + this.pcManager.mode === 'publisher-only' + ? previousPublisherOffer + ? toProtoSessionDescription({ + sdp: previousPublisherOffer.sdp, + type: previousPublisherOffer.type, + }) + : undefined + : previousSubscriberOffer + ? toProtoSessionDescription({ + sdp: previousSubscriberOffer.sdp, + type: previousSubscriberOffer.type, + }) + : undefined, subscription: new UpdateSubscription({ trackSids, subscribe: !autoSubscribe, diff --git a/src/room/Room.ts b/src/room/Room.ts index 9af84f0a86..d66f6cf3f0 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -792,9 +792,9 @@ class Room extends (EventEmitter as new () => TypedEmitter) maxRetries: connectOptions.maxRetries, e2eeEnabled: !!this.e2eeManager, websocketTimeout: connectOptions.websocketTimeout, - singlePeerConnection: roomOptions.singlePeerConnection, }, abortController.signal, + !roomOptions.singlePeerConnection, ); let serverInfo: Partial | undefined = joinResponse.serverInfo; diff --git a/src/room/defaults.ts b/src/room/defaults.ts index 9e35d104c0..68e95ffb42 100644 --- a/src/room/defaults.ts +++ b/src/room/defaults.ts @@ -42,7 +42,7 @@ export const roomOptionDefaults: InternalRoomOptions = { reconnectPolicy: new DefaultReconnectPolicy(), disconnectOnPageLeave: true, webAudioMix: false, - singlePeerConnection: false, + singlePeerConnection: true, } as const; export const roomConnectOptionDefaults: InternalRoomConnectOptions = { diff --git a/src/room/errors.ts b/src/room/errors.ts index e42fd0e783..4e1ba64bd3 100644 --- a/src/room/errors.ts +++ b/src/room/errors.ts @@ -26,6 +26,7 @@ export enum ConnectionErrorReason { LeaveRequest, Timeout, WebSocket, + ServiceNotFound, } type NotAllowed = { @@ -70,6 +71,12 @@ type WebSocket = { context?: string; }; +type ServiceNotFound = { + reason: ConnectionErrorReason.ServiceNotFound; + status: never; + context: string; +}; + type ConnectionErrorVariants = | NotAllowed | ConnectionTimeout @@ -77,7 +84,8 @@ type ConnectionErrorVariants = | InternalError | Cancelled | ServerUnreachable - | WebSocket; + | WebSocket + | ServiceNotFound; export class ConnectionError< Variant extends ConnectionErrorVariants = ConnectionErrorVariants, @@ -151,6 +159,15 @@ export class ConnectionError< static websocket(message: string, status?: number, reason?: string) { return new ConnectionError(message, ConnectionErrorReason.WebSocket, status, reason); } + + static serviceNotFound(message: string, serviceName: 'v0-rtc') { + return new ConnectionError( + message, + ConnectionErrorReason.ServiceNotFound, + undefined, + serviceName, + ); + } } export class DeviceUnsupportedError extends LivekitError {