diff --git a/src/main.tsx b/src/main.tsx index e6a102c6a..e41aaff8d 100644 --- a/src/main.tsx +++ b/src/main.tsx @@ -24,6 +24,7 @@ import { App } from "./App"; import { init as initRageshake } from "./settings/rageshake"; import { Initializer } from "./initializer"; import { AppViewModel } from "./state/AppViewModel"; +import { globalScope } from "./state/ObservableScope"; window.setLKLogLevel = setLKLogLevel; @@ -61,7 +62,7 @@ Initializer.initBeforeReact() .then(() => { root.render( - , + , , ); }) diff --git a/src/reactions/ReactionsReader.test.tsx b/src/reactions/ReactionsReader.test.tsx index 76ea45be1..dd82a718c 100644 --- a/src/reactions/ReactionsReader.test.tsx +++ b/src/reactions/ReactionsReader.test.tsx @@ -23,7 +23,7 @@ import { localRtcMember, } from "../utils/test-fixtures"; import { getBasicRTCSession } from "../utils/test-viewmodel"; -import { withTestScheduler } from "../utils/test"; +import { testScope, withTestScheduler } from "../utils/test"; import { ElementCallReactionEventType, ReactionSet } from "."; afterEach(() => { @@ -37,6 +37,7 @@ test("handles a hand raised reaction", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("ab", { @@ -85,6 +86,7 @@ test("handles a redaction", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("abc", { @@ -148,6 +150,7 @@ test("handles waiting for event decryption", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("abc", { @@ -217,6 +220,7 @@ test("hands rejecting events without a proper membership", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { const { raisedHands$ } = new ReactionsReader( + testScope(), rtcSession.asMockedSession(), ); schedule("ab", { @@ -261,7 +265,10 @@ test("handles a reaction", () => { withTestScheduler(({ schedule, time, expectObservable }) => { renderHook(() => { - const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); + const { reactions$ } = new ReactionsReader( + testScope(), + rtcSession.asMockedSession(), + ); schedule(`abc`, { a: () => {}, b: () => { @@ -317,7 +324,10 @@ test("ignores bad reaction events", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { - const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); + const { reactions$ } = new ReactionsReader( + testScope(), + rtcSession.asMockedSession(), + ); schedule("ab", { a: () => {}, b: () => { @@ -439,7 +449,10 @@ test("that reactions cannot be spammed", () => { withTestScheduler(({ schedule, expectObservable }) => { renderHook(() => { - const { reactions$ } = new ReactionsReader(rtcSession.asMockedSession()); + const { reactions$ } = new ReactionsReader( + testScope(), + rtcSession.asMockedSession(), + ); schedule("abcd", { a: () => {}, b: () => { diff --git a/src/reactions/ReactionsReader.ts b/src/reactions/ReactionsReader.ts index 9e30eff5f..c1f78b51f 100644 --- a/src/reactions/ReactionsReader.ts +++ b/src/reactions/ReactionsReader.ts @@ -18,7 +18,7 @@ import { EventType, RoomEvent as MatrixRoomEvent, } from "matrix-js-sdk"; -import { BehaviorSubject, delay, type Subscription } from "rxjs"; +import { BehaviorSubject, delay } from "rxjs"; import { ElementCallReactionEventType, @@ -28,6 +28,7 @@ import { type RaisedHandInfo, type ReactionInfo, } from "."; +import { type ObservableScope } from "../state/ObservableScope"; export const REACTION_ACTIVE_TIME_MS = 3000; @@ -54,12 +55,13 @@ export class ReactionsReader { */ public readonly reactions$ = this.reactionsSubject$.asObservable(); - private readonly reactionsSub: Subscription; - - public constructor(private readonly rtcSession: MatrixRTCSession) { + public constructor( + private readonly scope: ObservableScope, + private readonly rtcSession: MatrixRTCSession, + ) { // Hide reactions after a given time. - this.reactionsSub = this.reactionsSubject$ - .pipe(delay(REACTION_ACTIVE_TIME_MS)) + this.reactionsSubject$ + .pipe(delay(REACTION_ACTIVE_TIME_MS), this.scope.bind()) .subscribe((reactions) => { const date = new Date(); const nextEntries = Object.fromEntries( @@ -71,15 +73,38 @@ export class ReactionsReader { this.reactionsSubject$.next(nextEntries); }); + // TODO: Convert this class to the functional reactive style and get rid of + // all this manual setup and teardown for event listeners + this.rtcSession.room.on(MatrixRoomEvent.Timeline, this.handleReactionEvent); + this.scope.onEnd(() => + this.rtcSession.room.off( + MatrixRoomEvent.Timeline, + this.handleReactionEvent, + ), + ); + this.rtcSession.room.on( MatrixRoomEvent.Redaction, this.handleReactionEvent, ); + this.scope.onEnd(() => + this.rtcSession.room.off( + MatrixRoomEvent.Redaction, + this.handleReactionEvent, + ), + ); + this.rtcSession.room.client.on( MatrixEventEvent.Decrypted, this.handleReactionEvent, ); + this.scope.onEnd(() => + this.rtcSession.room.client.off( + MatrixEventEvent.Decrypted, + this.handleReactionEvent, + ), + ); // We listen for a local echo to get the real event ID, as timeline events // may still be sending. @@ -87,11 +112,23 @@ export class ReactionsReader { MatrixRoomEvent.LocalEchoUpdated, this.handleReactionEvent, ); + this.scope.onEnd(() => + this.rtcSession.room.off( + MatrixRoomEvent.LocalEchoUpdated, + this.handleReactionEvent, + ), + ); - rtcSession.on( + this.rtcSession.on( MatrixRTCSessionEvent.MembershipsChanged, this.onMembershipsChanged, ); + this.scope.onEnd(() => + this.rtcSession.off( + MatrixRTCSessionEvent.MembershipsChanged, + this.onMembershipsChanged, + ), + ); // Run this once to ensure we have fetched the state from the call. this.onMembershipsChanged([]); @@ -309,31 +346,4 @@ export class ReactionsReader { this.removeRaisedHand(targetUser); } }; - - /** - * Stop listening for events. - */ - public destroy(): void { - this.rtcSession.off( - MatrixRTCSessionEvent.MembershipsChanged, - this.onMembershipsChanged, - ); - this.rtcSession.room.off( - MatrixRoomEvent.Timeline, - this.handleReactionEvent, - ); - this.rtcSession.room.off( - MatrixRoomEvent.Redaction, - this.handleReactionEvent, - ); - this.rtcSession.room.client.off( - MatrixEventEvent.Decrypted, - this.handleReactionEvent, - ); - this.rtcSession.room.off( - MatrixRoomEvent.LocalEchoUpdated, - this.handleReactionEvent, - ); - this.reactionsSub.unsubscribe(); - } } diff --git a/src/room/InCallView.tsx b/src/room/InCallView.tsx index 6b07daccb..bca7c9640 100644 --- a/src/room/InCallView.tsx +++ b/src/room/InCallView.tsx @@ -110,6 +110,7 @@ import ringtoneMp3 from "../sound/ringtone.mp3?url"; import ringtoneOgg from "../sound/ringtone.ogg?url"; import { useTrackProcessorObservable$ } from "../livekit/TrackProcessorContext.tsx"; import { type Layout } from "../state/layout-types.ts"; +import { ObservableScope } from "../state/ObservableScope.ts"; const maxTapDurationMs = 400; @@ -129,8 +130,10 @@ export const ActiveCall: FC = (props) => { const trackProcessorState$ = useTrackProcessorObservable$(); useEffect(() => { - const reactionsReader = new ReactionsReader(props.rtcSession); + const scope = new ObservableScope(); + const reactionsReader = new ReactionsReader(scope, props.rtcSession); const vm = new CallViewModel( + scope, props.rtcSession, props.matrixRoom, mediaDevices, @@ -146,11 +149,9 @@ export const ActiveCall: FC = (props) => { ); setVm(vm); - const sub = vm.leave$.subscribe(props.onLeft); + vm.leave$.pipe(scope.bind()).subscribe(props.onLeft); return (): void => { - vm.destroy(); - sub.unsubscribe(); - reactionsReader.destroy(); + scope.end(); }; }, [ props.rtcSession, diff --git a/src/room/MuteStates.test.tsx b/src/room/MuteStates.test.tsx index d34f4d391..530b5050f 100644 --- a/src/room/MuteStates.test.tsx +++ b/src/room/MuteStates.test.tsx @@ -108,9 +108,7 @@ function mockMediaDevices( throw new Error("Unimplemented"); } }); - const scope = new ObservableScope(); - onTestFinished(() => scope.end()); - return new MediaDevices(scope); + return new MediaDevices(testScope()); } describe("useMuteStates VITE_PACKAGE='full' (SPA) mode", () => { diff --git a/src/state/AppViewModel.ts b/src/state/AppViewModel.ts index 5f65c226c..7ad91e9dc 100644 --- a/src/state/AppViewModel.ts +++ b/src/state/AppViewModel.ts @@ -6,14 +6,16 @@ Please see LICENSE in the repository root for full details. */ import { MediaDevices } from "./MediaDevices"; -import { ViewModel } from "./ViewModel"; +import { type ObservableScope } from "./ObservableScope"; /** * The top-level state holder for the application. */ -export class AppViewModel extends ViewModel { +export class AppViewModel { public readonly mediaDevices = new MediaDevices(this.scope); // TODO: Move more application logic here. The CallViewModel, at the very // least, ought to be accessible from this object. + + public constructor(private readonly scope: ObservableScope) {} } diff --git a/src/state/CallViewModel.test.ts b/src/state/CallViewModel.test.ts index 52d13ca4c..035f545a3 100644 --- a/src/state/CallViewModel.test.ts +++ b/src/state/CallViewModel.test.ts @@ -60,6 +60,7 @@ import { mockMediaDevices, mockMuteStates, mockConfig, + testScope, } from "../utils/test"; import { ECAddonConnectionState, @@ -89,7 +90,6 @@ import { localRtcMember, localRtcMemberDevice2, } from "../utils/test-fixtures"; -import { ObservableScope } from "./ObservableScope"; import { MediaDevices } from "./MediaDevices"; import { getValue } from "../utils/observable"; import { type Behavior, constant } from "./Behavior"; @@ -347,6 +347,7 @@ function withCallViewModel( const reactions$ = new BehaviorSubject>({}); const vm = new CallViewModel( + testScope(), rtcSession.asMockedSession(), room, mediaDevices, @@ -361,7 +362,6 @@ function withCallViewModel( ); onTestFinished(() => { - vm!.destroy(); participantsSpy!.mockRestore(); mediaSpy!.mockRestore(); eventsSpy!.mockRestore(); @@ -402,6 +402,7 @@ test("test missing RTC config error", async () => { vi.spyOn(AutoDiscovery, "getRawClientConfig").mockResolvedValue({}); const callVM = new CallViewModel( + testScope(), fakeRtcSession.asMockedSession(), matrixRoom, mockMediaDevices({}), @@ -1630,9 +1631,7 @@ test("audio output changes when toggling earpiece mode", () => { getUrlParams.mockReturnValue({ controlledAudioDevices: true }); vi.mocked(ComponentsCore.createMediaDeviceObserver).mockReturnValue(of([])); - const scope = new ObservableScope(); - onTestFinished(() => scope.end()); - const devices = new MediaDevices(scope); + const devices = new MediaDevices(testScope()); window.controls.setAvailableAudioDevices([ { id: "speaker", name: "Speaker", isSpeaker: true }, diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 6b046b28b..0868b4749 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -73,7 +73,6 @@ import { } from "matrix-js-sdk/lib/matrixrtc"; import { type IWidgetApiRequest } from "matrix-widget-api"; -import { ViewModel } from "./ViewModel"; import { LocalUserMediaViewModel, type MediaViewModel, @@ -84,7 +83,7 @@ import { import { accumulate, and$, - finalizeValue, + generateKeyed$, pauseWhen, } from "../utils/observable"; import { @@ -117,11 +116,7 @@ import { } from "../rtcSessionHelpers"; import { E2eeType } from "../e2ee/e2eeType"; import { MatrixKeyProvider } from "../e2ee/matrixKeyProvider"; -import { - type Connection, - type ConnectionOpts, - RemoteConnection, -} from "./Connection"; +import { type Connection, RemoteConnection } from "./Connection"; import { type MuteStates } from "./MuteStates"; import { getUrlParams } from "../UrlParams"; import { type ProcessorState } from "../livekit/TrackProcessorContext"; @@ -176,7 +171,15 @@ interface LayoutScanState { type MediaItem = UserMedia | ScreenShare; -export class CallViewModel extends ViewModel { +/** + * A view model providing all the application logic needed to show the in-call + * UI (may eventually be expanded to cover the lobby and feedback screens in the + * future). + */ +// Throughout this class and related code we must distinguish between MatrixRTC +// state and LiveKit state. We use the common terminology of room "members", RTC +// "memberships", and LiveKit "participants". +export class CallViewModel { private readonly urlParams = getUrlParams(); private readonly livekitAlias = getLivekitAlias(this.matrixRTCSession); @@ -370,26 +373,36 @@ export class CallViewModel extends ViewModel { */ private readonly localConnection$: Behavior | null> = this.scope.behavior( - this.localTransport$.pipe( - map( - (transport) => - transport && - mapAsync(transport, (transport) => { - const opts: ConnectionOpts = { - transport, - client: this.matrixRTCSession.room.client, - scope: this.scope, - remoteTransports$: this.remoteTransports$, - }; - return new PublishConnection( - opts, - this.mediaDevices, - this.muteStates, - this.e2eeLivekitOptions(), - this.scope.behavior(this.trackProcessorState$), - ); - }), - ), + generateKeyed$< + Async | null, + PublishConnection, + Async | null + >( + this.localTransport$, + (transport, createOrGet) => + transport && + mapAsync(transport, (transport) => + createOrGet( + // Stable key that uniquely idenifies the transport + JSON.stringify({ + url: transport.livekit_service_url, + alias: transport.livekit_alias, + }), + (scope) => + new PublishConnection( + { + transport, + client: this.matrixRoom.client, + scope, + remoteTransports$: this.remoteTransports$, + }, + this.mediaDevices, + this.muteStates, + this.e2eeLivekitOptions(), + this.scope.behavior(this.trackProcessorState$), + ), + ), + ), ), ); @@ -416,61 +429,47 @@ export class CallViewModel extends ViewModel { * is *distinct* from the local transport. */ private readonly remoteConnections$ = this.scope.behavior( - this.transports$.pipe( - accumulate(new Map(), (prev, transports) => { - const next = new Map(); + generateKeyed$( + this.transports$, + (transports, createOrGet) => { + const connections: Connection[] = []; // Until the local transport becomes ready we have no idea which // transports will actually need a dedicated remote connection if (transports?.local.state === "ready") { - const oldestMembership = this.matrixRTCSession.getOldestMembership(); + // TODO: Handle custom transport.livekit_alias values here const localServiceUrl = transports.local.value.livekit_service_url; const remoteServiceUrls = new Set( - transports.remote.flatMap(({ membership, transport }) => { - const t = membership.getTransport(oldestMembership ?? membership); - return t && - isLivekitTransport(t) && - t.livekit_service_url !== localServiceUrl - ? [t.livekit_service_url] - : []; - }), + transports.remote.map( + ({ transport }) => transport.livekit_service_url, + ), ); + remoteServiceUrls.delete(localServiceUrl); - for (const remoteServiceUrl of remoteServiceUrls) { - let nextConnection = prev.get(remoteServiceUrl); - if (!nextConnection) { - logger.log( - "SFU remoteConnections$ construct new connection: ", + for (const remoteServiceUrl of remoteServiceUrls) + connections.push( + createOrGet( remoteServiceUrl, - ); - - const args: ConnectionOpts = { - transport: { - type: "livekit", - livekit_service_url: remoteServiceUrl, - livekit_alias: this.livekitAlias, - }, - client: this.matrixRTCSession.room.client, - scope: this.scope, - remoteTransports$: this.remoteTransports$, - }; - nextConnection = new RemoteConnection( - args, - this.e2eeLivekitOptions(), - ); - } else { - logger.log( - "SFU remoteConnections$ use prev connection: ", - remoteServiceUrl, - ); - } - next.set(remoteServiceUrl, nextConnection); - } + (scope) => + new RemoteConnection( + { + transport: { + type: "livekit", + livekit_service_url: remoteServiceUrl, + livekit_alias: this.livekitAlias, + }, + client: this.matrixRoom.client, + scope, + remoteTransports$: this.remoteTransports$, + }, + this.e2eeLivekitOptions(), + ), + ), + ); } - return next; - }), - map((transports) => [...transports.values()]), + return connections; + }, ), ); @@ -755,80 +754,78 @@ export class CallViewModel extends ViewModel { ); /** - * List of MediaItems that we want to display + * List of MediaItems that we want to have tiles for. */ private readonly mediaItems$ = this.scope.behavior( - combineLatest([this.participantsByRoom$, duplicateTiles.value$]).pipe( - scan((prevItems, [participantsByRoom, duplicateTiles]) => { - const newItems: Map = new Map( - function* (this: CallViewModel): Iterable<[string, MediaItem]> { - for (const { - livekitRoom, - participants, - url, - } of participantsByRoom) { - for (const { id, participant, member } of participants) { - for (let i = 0; i < 1 + duplicateTiles; i++) { - const mediaId = `${id}:${i}`; - const prevMedia = prevItems.get(mediaId); - if (prevMedia instanceof UserMedia) - prevMedia.updateParticipant(participant); - - yield [ + generateKeyed$< + [typeof this.participantsByRoom$.value, number], + MediaItem, + MediaItem[] + >( + // Generate a collection of MediaItems from the list of expected (whether + // present or missing) LiveKit participants. + combineLatest([this.participantsByRoom$, duplicateTiles.value$]), + ([participantsByRoom, duplicateTiles], createOrGet) => { + const items: MediaItem[] = []; + + for (const { livekitRoom, participants, url } of participantsByRoom) { + for (const { id, participant, member } of participants) { + for (let i = 0; i < 1 + duplicateTiles; i++) { + const mediaId = `${id}:${i}`; + const item = createOrGet( + mediaId, + (scope) => + // We create UserMedia with or without a participant. + // This will be the initial value of a BehaviourSubject. + // Once a participant appears we will update the BehaviourSubject. (see below) + new UserMedia( + scope, mediaId, - // We create UserMedia with or without a participant. - // This will be the initial value of a BehaviourSubject. - // Once a participant appears we will update the BehaviourSubject. (see above) - prevMedia ?? - new UserMedia( - mediaId, + member, + participant, + this.options.encryptionSystem, + livekitRoom, + url, + this.mediaDevices, + this.pretendToBeDisconnected$, + this.memberDisplaynames$.pipe( + map((m) => m.get(id) ?? "[👻]"), + ), + this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)), + this.reactions$.pipe(map((v) => v[id] ?? undefined)), + ), + ); + items.push(item); + (item as UserMedia).updateParticipant(participant); + + if (participant?.isScreenShareEnabled) { + const screenShareId = `${mediaId}:screen-share`; + items.push( + createOrGet( + screenShareId, + (scope) => + new ScreenShare( + scope, + screenShareId, member, participant, this.options.encryptionSystem, livekitRoom, url, - this.mediaDevices, this.pretendToBeDisconnected$, this.memberDisplaynames$.pipe( map((m) => m.get(id) ?? "[👻]"), ), - this.handsRaised$.pipe(map((v) => v[id]?.time ?? null)), - this.reactions$.pipe(map((v) => v[id] ?? undefined)), ), - ]; - - if (participant?.isScreenShareEnabled) { - const screenShareId = `${mediaId}:screen-share`; - yield [ - screenShareId, - prevItems.get(screenShareId) ?? - new ScreenShare( - screenShareId, - member, - participant, - this.options.encryptionSystem, - livekitRoom, - url, - this.pretendToBeDisconnected$, - this.memberDisplaynames$.pipe( - map((m) => m.get(id) ?? "[👻]"), - ), - ), - ]; - } - } + ), + ); } } - }.bind(this)(), - ); + } + } - for (const [id, t] of prevItems) if (!newItems.has(id)) t.destroy(); - return newItems; - }, new Map()), - map((mediaItems) => [...mediaItems.values()]), - finalizeValue((ts) => { - for (const t of ts) t.destroy(); - }), + return items; + }, ), ); @@ -1739,6 +1736,7 @@ export class CallViewModel extends ViewModel { : null; public constructor( + private readonly scope: ObservableScope, // A call is permanently tied to a single Matrix room private readonly matrixRTCSession: MatrixRTCSession, private readonly matrixRoom: MatrixRoom, @@ -1753,8 +1751,6 @@ export class CallViewModel extends ViewModel { >, private readonly trackProcessorState$: Observable, ) { - super(); - // Start and stop local and remote connections as needed this.connectionInstructions$ .pipe(this.scope.bind()) diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 7b044e1d5..17a5c4c79 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -21,6 +21,7 @@ import { type CallMembership, type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; +import { logger } from "matrix-js-sdk/lib/logger"; import { BehaviorSubject, combineLatest, type Observable } from "rxjs"; import { @@ -218,6 +219,9 @@ export class Connection { public readonly livekitRoom: LivekitRoom, opts: ConnectionOpts, ) { + logger.log( + `[Connection] Creating new connection to ${opts.transport.livekit_service_url} ${opts.transport.livekit_alias}`, + ); const { transport, client, scope, remoteTransports$ } = opts; this.transport = transport; diff --git a/src/state/MediaViewModel.test.ts b/src/state/MediaViewModel.test.ts index 8b186658f..61fa2d8c0 100644 --- a/src/state/MediaViewModel.test.ts +++ b/src/state/MediaViewModel.test.ts @@ -17,8 +17,8 @@ import { mockLocalParticipant, mockMediaDevices, mockRtcMembership, - withLocalMedia, - withRemoteMedia, + createLocalMedia, + createRemoteMedia, withTestScheduler, } from "../utils/test"; import { getValue } from "../utils/observable"; @@ -42,92 +42,89 @@ vi.mock("../Platform", () => ({ const rtcMembership = mockRtcMembership("@alice:example.org", "AAAA"); -test("control a participant's volume", async () => { +test("control a participant's volume", () => { const setVolumeSpy = vi.fn(); - await withRemoteMedia(rtcMembership, {}, { setVolume: setVolumeSpy }, (vm) => - withTestScheduler(({ expectObservable, schedule }) => { - schedule("-ab---c---d|", { - a() { - // Try muting by toggling - vm.toggleLocallyMuted(); - expect(setVolumeSpy).toHaveBeenLastCalledWith(0); - }, - b() { - // Try unmuting by dragging the slider back up - vm.setLocalVolume(0.6); - vm.setLocalVolume(0.8); - vm.commitLocalVolume(); - expect(setVolumeSpy).toHaveBeenCalledWith(0.6); - expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8); - }, - c() { - // Try muting by dragging the slider back down - vm.setLocalVolume(0.2); - vm.setLocalVolume(0); - vm.commitLocalVolume(); - expect(setVolumeSpy).toHaveBeenCalledWith(0.2); - expect(setVolumeSpy).toHaveBeenLastCalledWith(0); - }, - d() { - // Try unmuting by toggling - vm.toggleLocallyMuted(); - // The volume should return to the last non-zero committed volume - expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8); - }, - }); - expectObservable(vm.localVolume$).toBe("ab(cd)(ef)g", { - a: 1, - b: 0, - c: 0.6, - d: 0.8, - e: 0.2, - f: 0, - g: 0.8, - }); - }), - ); + const vm = createRemoteMedia(rtcMembership, {}, { setVolume: setVolumeSpy }); + withTestScheduler(({ expectObservable, schedule }) => { + schedule("-ab---c---d|", { + a() { + // Try muting by toggling + vm.toggleLocallyMuted(); + expect(setVolumeSpy).toHaveBeenLastCalledWith(0); + }, + b() { + // Try unmuting by dragging the slider back up + vm.setLocalVolume(0.6); + vm.setLocalVolume(0.8); + vm.commitLocalVolume(); + expect(setVolumeSpy).toHaveBeenCalledWith(0.6); + expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8); + }, + c() { + // Try muting by dragging the slider back down + vm.setLocalVolume(0.2); + vm.setLocalVolume(0); + vm.commitLocalVolume(); + expect(setVolumeSpy).toHaveBeenCalledWith(0.2); + expect(setVolumeSpy).toHaveBeenLastCalledWith(0); + }, + d() { + // Try unmuting by toggling + vm.toggleLocallyMuted(); + // The volume should return to the last non-zero committed volume + expect(setVolumeSpy).toHaveBeenLastCalledWith(0.8); + }, + }); + expectObservable(vm.localVolume$).toBe("ab(cd)(ef)g", { + a: 1, + b: 0, + c: 0.6, + d: 0.8, + e: 0.2, + f: 0, + g: 0.8, + }); + }); }); -test("toggle fit/contain for a participant's video", async () => { - await withRemoteMedia(rtcMembership, {}, {}, (vm) => - withTestScheduler(({ expectObservable, schedule }) => { - schedule("-ab|", { - a: () => vm.toggleFitContain(), - b: () => vm.toggleFitContain(), - }); - expectObservable(vm.cropVideo$).toBe("abc", { - a: true, - b: false, - c: true, - }); - }), - ); +test("toggle fit/contain for a participant's video", () => { + const vm = createRemoteMedia(rtcMembership, {}, {}); + withTestScheduler(({ expectObservable, schedule }) => { + schedule("-ab|", { + a: () => vm.toggleFitContain(), + b: () => vm.toggleFitContain(), + }); + expectObservable(vm.cropVideo$).toBe("abc", { + a: true, + b: false, + c: true, + }); + }); }); -test("local media remembers whether it should always be shown", async () => { - await withLocalMedia( +test("local media remembers whether it should always be shown", () => { + const vm1 = createLocalMedia( rtcMembership, {}, mockLocalParticipant({}), mockMediaDevices({}), - (vm) => - withTestScheduler(({ expectObservable, schedule }) => { - schedule("-a|", { a: () => vm.setAlwaysShow(false) }); - expectObservable(vm.alwaysShow$).toBe("ab", { a: true, b: false }); - }), ); + withTestScheduler(({ expectObservable, schedule }) => { + schedule("-a|", { a: () => vm1.setAlwaysShow(false) }); + expectObservable(vm1.alwaysShow$).toBe("ab", { a: true, b: false }); + }); + // Next local media should start out *not* always shown - await withLocalMedia( + const vm2 = createLocalMedia( rtcMembership, {}, mockLocalParticipant({}), mockMediaDevices({}), - (vm) => - withTestScheduler(({ expectObservable, schedule }) => { - schedule("-a|", { a: () => vm.setAlwaysShow(true) }); - expectObservable(vm.alwaysShow$).toBe("ab", { a: false, b: true }); - }), ); + withTestScheduler(({ expectObservable, schedule }) => { + schedule("-a|", { a: () => vm2.setAlwaysShow(true) }); + expectObservable(vm2.alwaysShow$).toBe("ab", { a: false, b: true }); + }); }); test("switch cameras", async () => { @@ -164,7 +161,7 @@ test("switch cameras", async () => { const selectVideoInput = vi.fn(); - await withLocalMedia( + const vm = createLocalMedia( rtcMembership, {}, mockLocalParticipant({ @@ -179,27 +176,26 @@ test("switch cameras", async () => { select: selectVideoInput, }, }), - async (vm) => { - // Switch to back camera - getValue(vm.switchCamera$)!(); - expect(restartTrack).toHaveBeenCalledExactlyOnceWith({ - facingMode: "environment", - }); - await waitFor(() => { - expect(selectVideoInput).toHaveBeenCalledTimes(1); - expect(selectVideoInput).toHaveBeenCalledWith("back camera"); - }); - expect(deviceId).toBe("back camera"); - - // Switch to front camera - getValue(vm.switchCamera$)!(); - expect(restartTrack).toHaveBeenCalledTimes(2); - expect(restartTrack).toHaveBeenLastCalledWith({ facingMode: "user" }); - await waitFor(() => { - expect(selectVideoInput).toHaveBeenCalledTimes(2); - expect(selectVideoInput).toHaveBeenLastCalledWith("front camera"); - }); - expect(deviceId).toBe("front camera"); - }, ); + + // Switch to back camera + getValue(vm.switchCamera$)!(); + expect(restartTrack).toHaveBeenCalledExactlyOnceWith({ + facingMode: "environment", + }); + await waitFor(() => { + expect(selectVideoInput).toHaveBeenCalledTimes(1); + expect(selectVideoInput).toHaveBeenCalledWith("back camera"); + }); + expect(deviceId).toBe("back camera"); + + // Switch to front camera + getValue(vm.switchCamera$)!(); + expect(restartTrack).toHaveBeenCalledTimes(2); + expect(restartTrack).toHaveBeenLastCalledWith({ facingMode: "user" }); + await waitFor(() => { + expect(selectVideoInput).toHaveBeenCalledTimes(2); + expect(selectVideoInput).toHaveBeenLastCalledWith("front camera"); + }); + expect(deviceId).toBe("front camera"); }); diff --git a/src/state/MediaViewModel.ts b/src/state/MediaViewModel.ts index de09e24d5..0b79183e6 100644 --- a/src/state/MediaViewModel.ts +++ b/src/state/MediaViewModel.ts @@ -46,7 +46,6 @@ import { throttleTime, } from "rxjs"; -import { ViewModel } from "./ViewModel"; import { alwaysShowSelf } from "../settings/settings"; import { showConnectionStats } from "../settings/settings"; import { accumulate } from "../utils/observable"; @@ -56,6 +55,7 @@ import { type ReactionOption } from "../reactions"; import { platform } from "../Platform"; import { type MediaDevices } from "./MediaDevices"; import { type Behavior } from "./Behavior"; +import { type ObservableScope } from "./ObservableScope"; export function observeTrackReference$( participant: Participant, @@ -216,7 +216,7 @@ export enum EncryptionStatus { PasswordInvalid, } -abstract class BaseMediaViewModel extends ViewModel { +abstract class BaseMediaViewModel { /** * The LiveKit video track for this media. */ @@ -246,6 +246,7 @@ abstract class BaseMediaViewModel extends ViewModel { } public constructor( + protected readonly scope: ObservableScope, /** * An opaque identifier for this media. */ @@ -269,8 +270,6 @@ abstract class BaseMediaViewModel extends ViewModel { public readonly focusURL: string, public readonly displayName$: Behavior, ) { - super(); - const audio$ = this.observeTrackReference$(audioSource); this.video$ = this.observeTrackReference$(videoSource); @@ -403,6 +402,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public readonly cropVideo$: Behavior = this._cropVideo$; public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Observable, @@ -414,6 +414,7 @@ abstract class BaseUserMediaViewModel extends BaseMediaViewModel { public readonly reaction$: Behavior, ) { super( + scope, id, member, participant$, @@ -537,6 +538,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { ); public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Behavior, @@ -549,6 +551,7 @@ export class LocalUserMediaViewModel extends BaseUserMediaViewModel { reaction$: Behavior, ) { super( + scope, id, member, participant$, @@ -645,6 +648,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { ); public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Observable, @@ -657,6 +661,7 @@ export class RemoteUserMediaViewModel extends BaseUserMediaViewModel { reaction$: Behavior, ) { super( + scope, id, member, participant$, @@ -742,6 +747,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { ); public constructor( + scope: ObservableScope, id: string, member: RoomMember, participant$: Observable, @@ -753,6 +759,7 @@ export class ScreenShareViewModel extends BaseMediaViewModel { public readonly local: boolean, ) { super( + scope, id, member, participant$, diff --git a/src/state/PublishConnection.ts b/src/state/PublishConnection.ts index 1bb792111..3f01073fe 100644 --- a/src/state/PublishConnection.ts +++ b/src/state/PublishConnection.ts @@ -58,7 +58,7 @@ export class PublishConnection extends Connection { trackerProcessorState$: Behavior, ) { const { scope } = args; - logger.info("[LivekitRoom] Create LiveKit room"); + logger.info("[PublishConnection] Create LiveKit room"); const { controlledAudioDevices } = getUrlParams(); const factory = diff --git a/src/state/ScreenShare.ts b/src/state/ScreenShare.ts index e6fa81ecf..9803a5f40 100644 --- a/src/state/ScreenShare.ts +++ b/src/state/ScreenShare.ts @@ -11,7 +11,7 @@ import { type Room as LivekitRoom, } from "livekit-client"; -import { ObservableScope } from "./ObservableScope.ts"; +import { type ObservableScope } from "./ObservableScope.ts"; import { ScreenShareViewModel } from "./MediaViewModel.ts"; import type { RoomMember } from "matrix-js-sdk"; import type { EncryptionSystem } from "../e2ee/sharedKeyManagement.ts"; @@ -23,10 +23,10 @@ import type { Behavior } from "./Behavior.ts"; * ObservableScope for behaviors that the view model depends on. */ export class ScreenShare { - private readonly scope = new ObservableScope(); public readonly vm: ScreenShareViewModel; public constructor( + private readonly scope: ObservableScope, id: string, member: RoomMember, participant: LocalParticipant | RemoteParticipant, @@ -37,6 +37,7 @@ export class ScreenShare { displayName$: Observable, ) { this.vm = new ScreenShareViewModel( + this.scope, id, member, of(participant), @@ -48,9 +49,4 @@ export class ScreenShare { participant.isLocal, ); } - - public destroy(): void { - this.scope.end(); - this.vm.destroy(); - } } diff --git a/src/state/TileStore.ts b/src/state/TileStore.ts index 85bf8bc71..04633fb93 100644 --- a/src/state/TileStore.ts +++ b/src/state/TileStore.ts @@ -44,10 +44,6 @@ class SpotlightTileData { this.maximised$ = new BehaviorSubject(maximised); this.vm = new SpotlightTileViewModel(this.media$, this.maximised$); } - - public destroy(): void { - this.vm.destroy(); - } } class GridTileData { @@ -65,14 +61,10 @@ class GridTileData { this.media$ = new BehaviorSubject(media); this.vm = new GridTileViewModel(this.media$); } - - public destroy(): void { - this.vm.destroy(); - } } /** - * A collection of tiles to be mapped to a layout. + * An immutable collection of tiles to be mapped to a layout. */ export class TileStore { private constructor( @@ -288,13 +280,6 @@ export class TileStoreBuilder { ); } - // Destroy unused tiles - if (this.spotlight === null && this.prevSpotlight !== null) - this.prevSpotlight.destroy(); - const gridEntries = new Set(grid); - for (const entry of this.prevGrid) - if (!gridEntries.has(entry)) entry.destroy(); - return this.construct(this.spotlight, grid); } } diff --git a/src/state/TileViewModel.ts b/src/state/TileViewModel.ts index 5e5eb29cc..a645a0d19 100644 --- a/src/state/TileViewModel.ts +++ b/src/state/TileViewModel.ts @@ -5,7 +5,6 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial Please see LICENSE in the repository root for full details. */ -import { ViewModel } from "./ViewModel"; import { type MediaViewModel, type UserMediaViewModel } from "./MediaViewModel"; import { type Behavior } from "./Behavior"; @@ -14,21 +13,17 @@ function createId(): string { return (nextId++).toString(); } -export class GridTileViewModel extends ViewModel { +export class GridTileViewModel { public readonly id = createId(); - public constructor(public readonly media$: Behavior) { - super(); - } + public constructor(public readonly media$: Behavior) {} } -export class SpotlightTileViewModel extends ViewModel { +export class SpotlightTileViewModel { public constructor( public readonly media$: Behavior, public readonly maximised$: Behavior, - ) { - super(); - } + ) {} } export type TileViewModel = GridTileViewModel | SpotlightTileViewModel; diff --git a/src/state/UserMedia.ts b/src/state/UserMedia.ts index 42016f7c4..65bd4e928 100644 --- a/src/state/UserMedia.ts +++ b/src/state/UserMedia.ts @@ -22,7 +22,7 @@ import { } from "livekit-client"; import { observeParticipantEvents } from "@livekit/components-core"; -import { ObservableScope } from "./ObservableScope.ts"; +import { type ObservableScope } from "./ObservableScope.ts"; import { LocalUserMediaViewModel, RemoteUserMediaViewModel, @@ -75,11 +75,11 @@ enum SortingBin { * for inclusion in the call layout. */ export class UserMedia { - private readonly scope = new ObservableScope(); private readonly participant$ = new BehaviorSubject(this.initialParticipant); public readonly vm: UserMediaViewModel = this.participant$.value?.isLocal ? new LocalUserMediaViewModel( + this.scope, this.id, this.member, this.participant$ as Behavior, @@ -92,6 +92,7 @@ export class UserMedia { this.scope.behavior(this.reaction$), ) : new RemoteUserMediaViewModel( + this.scope, this.id, this.member, this.participant$ as Observable, @@ -144,6 +145,7 @@ export class UserMedia { ); public constructor( + private readonly scope: ObservableScope, public readonly id: string, private readonly member: RoomMember, private readonly initialParticipant: @@ -168,11 +170,6 @@ export class UserMedia { this.participant$.next(newParticipant); } } - - public destroy(): void { - this.scope.end(); - this.vm.destroy(); - } } export function sharingScreen$(p: Participant): Observable { diff --git a/src/state/ViewModel.ts b/src/state/ViewModel.ts deleted file mode 100644 index e83ae82be..000000000 --- a/src/state/ViewModel.ts +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2023, 2024 New Vector Ltd. - -SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial -Please see LICENSE in the repository root for full details. -*/ - -import { ObservableScope } from "./ObservableScope"; - -/** - * An MVVM view model. - */ -export abstract class ViewModel { - protected readonly scope = new ObservableScope(); - - /** - * Instructs the ViewModel to clean up its resources. If you forget to call - * this, there may be memory leaks! - */ - public destroy(): void { - this.scope.end(); - } -} diff --git a/src/tile/GridTile.test.tsx b/src/tile/GridTile.test.tsx index 783dcc37e..dd0bc9d67 100644 --- a/src/tile/GridTile.test.tsx +++ b/src/tile/GridTile.test.tsx @@ -12,7 +12,7 @@ import { axe } from "vitest-axe"; import { type MatrixRTCSession } from "matrix-js-sdk/lib/matrixrtc"; import { GridTile } from "./GridTile"; -import { mockRtcMembership, withRemoteMedia } from "../utils/test"; +import { mockRtcMembership, createRemoteMedia } from "../utils/test"; import { GridTileViewModel } from "../state/TileViewModel"; import { ReactionsSenderProvider } from "../reactions/useReactionsSender"; import type { CallViewModel } from "../state/CallViewModel"; @@ -25,7 +25,7 @@ global.IntersectionObserver = class MockIntersectionObserver { } as unknown as typeof IntersectionObserver; test("GridTile is accessible", async () => { - await withRemoteMedia( + const vm = createRemoteMedia( mockRtcMembership("@alice:example.org", "AAAA"), { rawDisplayName: "Alice", @@ -36,41 +36,40 @@ test("GridTile is accessible", async () => { getTrackPublication: () => ({}) as Partial as RemoteTrackPublication, }, - async (vm) => { - const fakeRtcSession = { + ); + + const fakeRtcSession = { + on: () => {}, + off: () => {}, + room: { + on: () => {}, + off: () => {}, + client: { + getUserId: () => null, + getDeviceId: () => null, on: () => {}, off: () => {}, - room: { - on: () => {}, - off: () => {}, - client: { - getUserId: () => null, - getDeviceId: () => null, - on: () => {}, - off: () => {}, - }, - }, - memberships: [], - } as unknown as MatrixRTCSession; - const cVm = { - reactions$: constant({}), - handsRaised$: constant({}), - } as Partial as CallViewModel; - const { container } = render( - - {}} - targetWidth={300} - targetHeight={200} - showSpeakingIndicators - focusable={true} - /> - , - ); - expect(await axe(container)).toHaveNoViolations(); - // Name should be visible - screen.getByText("Alice"); + }, }, + memberships: [], + } as unknown as MatrixRTCSession; + const cVm = { + reactions$: constant({}), + handsRaised$: constant({}), + } as Partial as CallViewModel; + const { container } = render( + + {}} + targetWidth={300} + targetHeight={200} + showSpeakingIndicators + focusable={true} + /> + , ); + expect(await axe(container)).toHaveNoViolations(); + // Name should be visible + screen.getByText("Alice"); }); diff --git a/src/tile/SpotlightTile.test.tsx b/src/tile/SpotlightTile.test.tsx index 4cb0f6c23..fb7008b88 100644 --- a/src/tile/SpotlightTile.test.tsx +++ b/src/tile/SpotlightTile.test.tsx @@ -15,8 +15,8 @@ import { mockLocalParticipant, mockMediaDevices, mockRtcMembership, - withLocalMedia, - withRemoteMedia, + createLocalMedia, + createRemoteMedia, } from "../utils/test"; import { SpotlightTileViewModel } from "../state/TileViewModel"; import { constant } from "../state/Behavior"; @@ -27,62 +27,53 @@ global.IntersectionObserver = class MockIntersectionObserver { } as unknown as typeof IntersectionObserver; test("SpotlightTile is accessible", async () => { - await withRemoteMedia( + const vm1 = createRemoteMedia( mockRtcMembership("@alice:example.org", "AAAA"), { rawDisplayName: "Alice", getMxcAvatarUrl: () => "mxc://adfsg", }, {}, - async (vm1) => { - await withLocalMedia( - mockRtcMembership("@bob:example.org", "BBBB"), - { - rawDisplayName: "Bob", - getMxcAvatarUrl: () => "mxc://dlskf", - }, - mockLocalParticipant({}), - mockMediaDevices({}), - async (vm2) => { - const user = userEvent.setup(); - const toggleExpanded = vi.fn(); - const { container } = render( - , - ); + ); - expect(await axe(container)).toHaveNoViolations(); - // Alice should be in the spotlight, with her name and avatar on the - // first page - screen.getByText("Alice"); - const aliceAvatar = screen.getByRole("img"); - expect(screen.queryByRole("button", { name: "common.back" })).toBe( - null, - ); - // Bob should be out of the spotlight, and therefore invisible - expect(isInaccessible(screen.getByText("Bob"))).toBe(true); - // Now navigate to Bob - await user.click(screen.getByRole("button", { name: "Next" })); - screen.getByText("Bob"); - expect(screen.getByRole("img")).not.toBe(aliceAvatar); - expect(isInaccessible(screen.getByText("Alice"))).toBe(true); - // Can toggle whether the tile is expanded - await user.click(screen.getByRole("button", { name: "Expand" })); - expect(toggleExpanded).toHaveBeenCalled(); - }, - ); + const vm2 = createLocalMedia( + mockRtcMembership("@bob:example.org", "BBBB"), + { + rawDisplayName: "Bob", + getMxcAvatarUrl: () => "mxc://dlskf", }, + mockLocalParticipant({}), + mockMediaDevices({}), ); + + const user = userEvent.setup(); + const toggleExpanded = vi.fn(); + const { container } = render( + , + ); + + expect(await axe(container)).toHaveNoViolations(); + // Alice should be in the spotlight, with her name and avatar on the + // first page + screen.getByText("Alice"); + const aliceAvatar = screen.getByRole("img"); + expect(screen.queryByRole("button", { name: "common.back" })).toBe(null); + // Bob should be out of the spotlight, and therefore invisible + expect(isInaccessible(screen.getByText("Bob"))).toBe(true); + // Now navigate to Bob + await user.click(screen.getByRole("button", { name: "Next" })); + screen.getByText("Bob"); + expect(screen.getByRole("img")).not.toBe(aliceAvatar); + expect(isInaccessible(screen.getByText("Alice"))).toBe(true); + // Can toggle whether the tile is expanded + await user.click(screen.getByRole("button", { name: "Expand" })); + expect(toggleExpanded).toHaveBeenCalled(); }); diff --git a/src/utils/observable.test.ts b/src/utils/observable.test.ts index 5f488fb16..e039c846f 100644 --- a/src/utils/observable.test.ts +++ b/src/utils/observable.test.ts @@ -6,9 +6,10 @@ Please see LICENSE in the repository root for full details. */ import { test } from "vitest"; +import { Subject } from "rxjs"; import { withTestScheduler } from "./test"; -import { pauseWhen } from "./observable"; +import { generateKeyed$, pauseWhen } from "./observable"; test("pauseWhen", () => { withTestScheduler(({ behavior, expectObservable }) => { @@ -22,3 +23,43 @@ test("pauseWhen", () => { ).toBe(outputMarbles); }); }); + +test("generateKeyed$ has the right output and ends scopes at the right times", () => { + const scope1$ = new Subject(); + const scope2$ = new Subject(); + const scope3$ = new Subject(); + const scope4$ = new Subject(); + const scopeSubjects = [scope1$, scope2$, scope3$, scope4$]; + + withTestScheduler(({ hot, expectObservable }) => { + // Each scope should start when the input number reaches or surpasses their + // number and end when the input number drops back below their number. + // At the very end, unsubscribing should end all remaining scopes. + const inputMarbles = " 123242"; + const outputMarbles = " abcbdb"; + const subscriptionMarbles = "^-----!"; + const scope1Marbles = " y-----n"; + const scope2Marbles = " -y----n"; + const scope3Marbles = " --ynyn"; + const scope4Marbles = " ----yn"; + + expectObservable( + generateKeyed$(hot(inputMarbles), (input, createOrGet) => { + for (let i = 1; i <= +input; i++) { + createOrGet(i.toString(), (scope) => { + scopeSubjects[i - 1].next("y"); + scope.onEnd(() => scopeSubjects[i - 1].next("n")); + return i.toString(); + }); + } + return "abcd"[+input - 1]; + }), + subscriptionMarbles, + ).toBe(outputMarbles); + + expectObservable(scope1$).toBe(scope1Marbles); + expectObservable(scope2$).toBe(scope2Marbles); + expectObservable(scope3$).toBe(scope3Marbles); + expectObservable(scope4$).toBe(scope4Marbles); + }); +}); diff --git a/src/utils/observable.ts b/src/utils/observable.ts index 74acfaf2c..eb8179910 100644 --- a/src/utils/observable.ts +++ b/src/utils/observable.ts @@ -23,6 +23,7 @@ import { } from "rxjs"; import { type Behavior } from "../state/Behavior"; +import { ObservableScope } from "../state/ObservableScope"; const nothing = Symbol("nothing"); @@ -117,3 +118,71 @@ export function pauseWhen(pause$: Behavior) { map(([value]) => value), ); } + +/** + * Maps a changing input value to an output value consisting of items that have + * automatically generated ObservableScopes tied to a key. Items will be + * automatically created when their key is requested for the first time, reused + * when the same key is requested at a later time, and destroyed (have their + * scope ended) when the key is no longer requested. + * + * @param input$ The input value to be mapped. + * @param project A function mapping input values to output values. This + * function receives an additional callback `createOrGet` which can be used + * within the function body to request that an item be generated for a certain + * key. The caller provides a factory which will be used to create the item if + * it is being requested for the first time. Otherwise, the item previously + * existing under that key will be returned. + */ +export function generateKeyed$( + input$: Observable, + project: ( + input: In, + createOrGet: ( + key: string, + factory: (scope: ObservableScope) => Item, + ) => Item, + ) => Out, +): Observable { + return input$.pipe( + // Keep track of the existing items over time, so we can reuse them + scan< + In, + { + items: Map; + output: Out; + }, + { items: Map } + >( + (state, data) => { + const nextItems = new Map< + string, + { item: Item; scope: ObservableScope } + >(); + + const output = project(data, (key, factory) => { + let item = state.items.get(key); + if (item === undefined) { + // First time requesting the key; create the item + const scope = new ObservableScope(); + item = { item: factory(scope), scope }; + } + nextItems.set(key, item); + return item.item; + }); + + // Destroy all items that are no longer being requested + for (const [key, { scope }] of state.items) + if (!nextItems.has(key)) scope.end(); + + return { items: nextItems, output }; + }, + { items: new Map() }, + ), + finalizeValue((state) => { + // Destroy all remaining items when no longer subscribed + for (const { scope } of state.items.values()) scope.end(); + }), + map(({ output }) => output), + ); +} diff --git a/src/utils/test-viewmodel.ts b/src/utils/test-viewmodel.ts index d6a4bb10a..fa6bb6911 100644 --- a/src/utils/test-viewmodel.ts +++ b/src/utils/test-viewmodel.ts @@ -27,6 +27,7 @@ import { mockMediaDevices, mockMuteStates, MockRTCSession, + testScope, } from "./test"; import { aliceRtcMember, localRtcMember } from "./test-fixtures"; import { type RaisedHandInfo, type ReactionInfo } from "../reactions"; @@ -134,6 +135,7 @@ export function getBasicCallViewModelEnvironment( // const remoteParticipants$ = of([aliceParticipant]); const vm = new CallViewModel( + testScope(), rtcSession.asMockedSession(), matrixRoom, mockMediaDevices({}), diff --git a/src/utils/test.ts b/src/utils/test.ts index d50c1af4e..baef14b72 100644 --- a/src/utils/test.ts +++ b/src/utils/test.ts @@ -6,7 +6,7 @@ Please see LICENSE in the repository root for full details. */ import { map, type Observable, of, type SchedulerLike } from "rxjs"; import { type RunHelpers, TestScheduler } from "rxjs/testing"; -import { expect, type MockedObject, vi, vitest } from "vitest"; +import { expect, type MockedObject, onTestFinished, vi, vitest } from "vitest"; import { type RoomMember, type Room as MatrixRoom, @@ -89,6 +89,15 @@ interface TestRunnerGlobal { rxjsTestScheduler?: SchedulerLike; } +/** + * Create a new ObservableScope which ends when the current test ends. + */ +export function testScope(): ObservableScope { + const scope = new ObservableScope(); + onTestFinished(() => scope.end()); + return scope; +} + /** * Run Observables with a scheduler that virtualizes time, for testing purposes. */ @@ -259,14 +268,14 @@ export function mockLocalParticipant( } as Partial as LocalParticipant; } -export async function withLocalMedia( +export function createLocalMedia( localRtcMember: CallMembership, roomMember: Partial, localParticipant: LocalParticipant, mediaDevices: MediaDevices, - continuation: (vm: LocalUserMediaViewModel) => void | Promise, -): Promise { - const vm = new LocalUserMediaViewModel( +): LocalUserMediaViewModel { + return new LocalUserMediaViewModel( + testScope(), "local", mockMatrixRoomMember(localRtcMember, roomMember), constant(localParticipant), @@ -280,11 +289,6 @@ export async function withLocalMedia( constant(null), constant(null), ); - try { - await continuation(vm); - } finally { - vm.destroy(); - } } export function mockRemoteParticipant( @@ -300,14 +304,14 @@ export function mockRemoteParticipant( } as RemoteParticipant; } -export async function withRemoteMedia( +export function createRemoteMedia( localRtcMember: CallMembership, roomMember: Partial, participant: Partial, - continuation: (vm: RemoteUserMediaViewModel) => void | Promise, -): Promise { +): RemoteUserMediaViewModel { const remoteParticipant = mockRemoteParticipant(participant); - const vm = new RemoteUserMediaViewModel( + return new RemoteUserMediaViewModel( + testScope(), "remote", mockMatrixRoomMember(localRtcMember, roomMember), of(remoteParticipant), @@ -321,11 +325,6 @@ export async function withRemoteMedia( constant(null), constant(null), ); - try { - await continuation(vm); - } finally { - vm.destroy(); - } } export function mockConfig(config: Partial = {}): void {