From aef54687d6ed353ada30f937f94189f8fe5f7b98 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Tue, 30 Jul 2024 16:20:06 -0400 Subject: [PATCH] Abort Firestore listeners on terminate. (#8399) * Abort onSnapshotListeners on terminate. * Pretty * Fix race condition --- .../firestore/src/core/component_provider.ts | 1 + packages/firestore/src/core/event_manager.ts | 41 ++++++++++++++++--- .../test/integration/api/batch_writes.test.ts | 5 +-- .../test/integration/api/database.test.ts | 19 ++++++++- 4 files changed, 56 insertions(+), 10 deletions(-) diff --git a/packages/firestore/src/core/component_provider.ts b/packages/firestore/src/core/component_provider.ts index 618653b9237..a3153e797b2 100644 --- a/packages/firestore/src/core/component_provider.ts +++ b/packages/firestore/src/core/component_provider.ts @@ -485,5 +485,6 @@ export class OnlineComponentProvider { async terminate(): Promise { await remoteStoreShutdown(this.remoteStore); this.datastore?.terminate(); + this.eventManager?.terminate(); } } diff --git a/packages/firestore/src/core/event_manager.ts b/packages/firestore/src/core/event_manager.ts index b53c45669cb..72d801f3934 100644 --- a/packages/firestore/src/core/event_manager.ts +++ b/packages/firestore/src/core/event_manager.ts @@ -17,7 +17,7 @@ import { debugAssert, debugCast } from '../util/assert'; import { wrapInUserErrorIfRecoverable } from '../util/async_queue'; -import { FirestoreError } from '../util/error'; +import { Code, FirestoreError } from '../util/error'; import { EventHandler } from '../util/misc'; import { ObjectMap } from '../util/obj_map'; @@ -64,6 +64,7 @@ export interface EventManager { onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise; onFirstRemoteStoreListen?: (query: Query) => Promise; onLastRemoteStoreUnlisten?: (query: Query) => Promise; + terminate(): void; } export function newEventManager(): EventManager { @@ -71,12 +72,9 @@ export function newEventManager(): EventManager { } export class EventManagerImpl implements EventManager { - queries = new ObjectMap( - q => canonifyQuery(q), - queryEquals - ); + queries: ObjectMap = newQueriesObjectMap(); - onlineState = OnlineState.Unknown; + onlineState: OnlineState = OnlineState.Unknown; snapshotsInSyncListeners: Set> = new Set(); @@ -98,6 +96,20 @@ export class EventManagerImpl implements EventManager { * still listening to the cache. */ onLastRemoteStoreUnlisten?: (query: Query) => Promise; + + terminate(): void { + errorAllTargets( + this, + new FirestoreError(Code.ABORTED, 'Firestore shutting down') + ); + } +} + +function newQueriesObjectMap(): ObjectMap { + return new ObjectMap( + q => canonifyQuery(q), + queryEquals + ); } function validateEventManager(eventManagerImpl: EventManagerImpl): void { @@ -334,6 +346,23 @@ export function removeSnapshotsInSyncListener( eventManagerImpl.snapshotsInSyncListeners.delete(observer); } +function errorAllTargets( + eventManager: EventManager, + error: FirestoreError +): void { + const eventManagerImpl = debugCast(eventManager, EventManagerImpl); + const queries = eventManagerImpl.queries; + + // Prevent further access by clearing ObjectMap. + eventManagerImpl.queries = newQueriesObjectMap(); + + queries.forEach((_, queryInfo) => { + for (const listener of queryInfo.listeners) { + listener.onError(error); + } + }); +} + // Call all global snapshot listeners that have been set. function raiseSnapshotsInSyncEvent(eventManagerImpl: EventManagerImpl): void { eventManagerImpl.snapshotsInSyncListeners.forEach(observer => { diff --git a/packages/firestore/test/integration/api/batch_writes.test.ts b/packages/firestore/test/integration/api/batch_writes.test.ts index 0baf112d47a..0110b4fa686 100644 --- a/packages/firestore/test/integration/api/batch_writes.test.ts +++ b/packages/firestore/test/integration/api/batch_writes.test.ts @@ -155,12 +155,11 @@ apiDescribe('Database batch writes', persistence => { ); return accumulator .awaitEvent() - .then(initialSnap => { + .then(async initialSnap => { expect(initialSnap.docs.length).to.equal(0); // Atomically write two documents. - // eslint-disable-next-line @typescript-eslint/no-floating-promises - writeBatch(db).set(docA, { a: 1 }).set(docB, { b: 2 }).commit(); + await writeBatch(db).set(docA, { a: 1 }).set(docB, { b: 2 }).commit(); return accumulator.awaitEvent(); }) diff --git a/packages/firestore/test/integration/api/database.test.ts b/packages/firestore/test/integration/api/database.test.ts index 406a2a985dd..6418d4a9651 100644 --- a/packages/firestore/test/integration/api/database.test.ts +++ b/packages/firestore/test/integration/api/database.test.ts @@ -63,7 +63,8 @@ import { FieldPath, newTestFirestore, SnapshotOptions, - newTestApp + newTestApp, + FirestoreError } from '../util/firebase_export'; import { apiDescribe, @@ -1442,6 +1443,22 @@ apiDescribe('Database', persistence => { }); }); + it('query listener throws error on termination', async () => { + return withTestDoc(persistence, async (docRef, firestore) => { + const deferred: Deferred = new Deferred(); + const unsubscribe = onSnapshot(docRef, snapshot => {}, deferred.resolve); + + await terminate(firestore); + + await expect(deferred.promise) + .to.eventually.haveOwnProperty('message') + .equal('Firestore shutting down'); + + // Call should proceed without error. + unsubscribe(); + }); + }); + it('can wait for pending writes', async () => { await withTestDoc(persistence, async (docRef, firestore) => { // Prevent pending writes receiving acknowledgement.