Skip to content

Commit

Permalink
Abort Firestore listeners on terminate. (#8399)
Browse files Browse the repository at this point in the history
* Abort onSnapshotListeners on terminate.

* Pretty

* Fix race condition
  • Loading branch information
tom-andersen authored Jul 30, 2024
1 parent 6bb2e89 commit aef5468
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 10 deletions.
1 change: 1 addition & 0 deletions packages/firestore/src/core/component_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,5 +485,6 @@ export class OnlineComponentProvider {
async terminate(): Promise<void> {
await remoteStoreShutdown(this.remoteStore);
this.datastore?.terminate();
this.eventManager?.terminate();
}
}
41 changes: 35 additions & 6 deletions packages/firestore/src/core/event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import { debugAssert, debugCast } from '../util/assert';
import { wrapInUserErrorIfRecoverable } from '../util/async_queue';
import { FirestoreError } from '../util/error';
import { Code, FirestoreError } from '../util/error';
import { EventHandler } from '../util/misc';
import { ObjectMap } from '../util/obj_map';

Expand Down Expand Up @@ -64,19 +64,17 @@ export interface EventManager {
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
terminate(): void;
}

export function newEventManager(): EventManager {
return new EventManagerImpl();
}

export class EventManagerImpl implements EventManager {
queries = new ObjectMap<Query, QueryListenersInfo>(
q => canonifyQuery(q),
queryEquals
);
queries: ObjectMap<Query, QueryListenersInfo> = newQueriesObjectMap();

onlineState = OnlineState.Unknown;
onlineState: OnlineState = OnlineState.Unknown;

snapshotsInSyncListeners: Set<Observer<void>> = new Set();

Expand All @@ -98,6 +96,20 @@ export class EventManagerImpl implements EventManager {
* still listening to the cache.
*/
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;

terminate(): void {
errorAllTargets(
this,
new FirestoreError(Code.ABORTED, 'Firestore shutting down')
);
}
}

function newQueriesObjectMap(): ObjectMap<Query, QueryListenersInfo> {
return new ObjectMap<Query, QueryListenersInfo>(
q => canonifyQuery(q),
queryEquals
);
}

function validateEventManager(eventManagerImpl: EventManagerImpl): void {
Expand Down Expand Up @@ -334,6 +346,23 @@ export function removeSnapshotsInSyncListener(
eventManagerImpl.snapshotsInSyncListeners.delete(observer);
}

function errorAllTargets(
eventManager: EventManager,
error: FirestoreError
): void {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
const queries = eventManagerImpl.queries;

// Prevent further access by clearing ObjectMap.
eventManagerImpl.queries = newQueriesObjectMap();

queries.forEach((_, queryInfo) => {
for (const listener of queryInfo.listeners) {
listener.onError(error);
}
});
}

// Call all global snapshot listeners that have been set.
function raiseSnapshotsInSyncEvent(eventManagerImpl: EventManagerImpl): void {
eventManagerImpl.snapshotsInSyncListeners.forEach(observer => {
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/test/integration/api/batch_writes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,11 @@ apiDescribe('Database batch writes', persistence => {
);
return accumulator
.awaitEvent()
.then(initialSnap => {
.then(async initialSnap => {
expect(initialSnap.docs.length).to.equal(0);

// Atomically write two documents.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
writeBatch(db).set(docA, { a: 1 }).set(docB, { b: 2 }).commit();
await writeBatch(db).set(docA, { a: 1 }).set(docB, { b: 2 }).commit();

return accumulator.awaitEvent();
})
Expand Down
19 changes: 18 additions & 1 deletion packages/firestore/test/integration/api/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ import {
FieldPath,
newTestFirestore,
SnapshotOptions,
newTestApp
newTestApp,
FirestoreError
} from '../util/firebase_export';
import {
apiDescribe,
Expand Down Expand Up @@ -1442,6 +1443,22 @@ apiDescribe('Database', persistence => {
});
});

it('query listener throws error on termination', async () => {
return withTestDoc(persistence, async (docRef, firestore) => {
const deferred: Deferred<FirestoreError> = new Deferred();
const unsubscribe = onSnapshot(docRef, snapshot => {}, deferred.resolve);

await terminate(firestore);

await expect(deferred.promise)
.to.eventually.haveOwnProperty('message')
.equal('Firestore shutting down');

// Call should proceed without error.
unsubscribe();
});
});

it('can wait for pending writes', async () => {
await withTestDoc(persistence, async (docRef, firestore) => {
// Prevent pending writes receiving acknowledgement.
Expand Down

0 comments on commit aef5468

Please sign in to comment.