From 224419457c3fd2e5813166dbd7d6d9a03322143c Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 18 Apr 2024 13:47:49 -0400 Subject: [PATCH] Firestore: Fix spurious "Backend didn't respond within 10 seconds" errors when network just slow (#8145) --- .changeset/early-tomatoes-occur.md | 6 +++ .../platform/browser/webchannel_connection.ts | 1 + .../src/platform/node/grpc_connection.ts | 6 +++ packages/firestore/src/remote/connection.ts | 8 +++- .../firestore/src/remote/persistent_stream.ts | 8 ++++ packages/firestore/src/remote/remote_store.ts | 9 +++++ .../firestore/src/remote/stream_bridge.ts | 17 +++++++++ .../integration/browser/webchannel.test.ts | 4 ++ .../test/integration/remote/stream.test.ts | 38 +++++++++++++++++-- 9 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 .changeset/early-tomatoes-occur.md diff --git a/.changeset/early-tomatoes-occur.md b/.changeset/early-tomatoes-occur.md new file mode 100644 index 00000000000..b57ec9cd8b5 --- /dev/null +++ b/.changeset/early-tomatoes-occur.md @@ -0,0 +1,6 @@ +--- +'@firebase/firestore': patch +'firebase': patch +--- + +Prevent spurious "Backend didn't respond within 10 seconds" errors when network is indeed responding, just slowly. diff --git a/packages/firestore/src/platform/browser/webchannel_connection.ts b/packages/firestore/src/platform/browser/webchannel_connection.ts index 05cd79ecf9e..38d78996b6e 100644 --- a/packages/firestore/src/platform/browser/webchannel_connection.ts +++ b/packages/firestore/src/platform/browser/webchannel_connection.ts @@ -306,6 +306,7 @@ export class WebChannelConnection extends RestConnection { LOG_TAG, `RPC '${rpcName}' stream ${streamId} transport opened.` ); + streamBridge.callOnConnected(); } }); diff --git a/packages/firestore/src/platform/node/grpc_connection.ts b/packages/firestore/src/platform/node/grpc_connection.ts index ab9e44792ca..dec3137af76 100644 --- a/packages/firestore/src/platform/node/grpc_connection.ts +++ b/packages/firestore/src/platform/node/grpc_connection.ts @@ -286,9 +286,15 @@ export class GrpcConnection implements Connection { } }); + let onConnectedSent = false; grpcStream.on('data', (msg: Resp) => { if (!closed) { logDebug(LOG_TAG, `RPC '${rpcName}' stream ${streamId} received:`, msg); + // Emulate the "onConnected" event that WebChannelConnection sends. + if (!onConnectedSent) { + stream.callOnConnected(); + onConnectedSent = true; + } stream.callOnMessage(msg); } }); diff --git a/packages/firestore/src/remote/connection.ts b/packages/firestore/src/remote/connection.ts index b727bc63c17..2bf982eb4d1 100644 --- a/packages/firestore/src/remote/connection.ts +++ b/packages/firestore/src/remote/connection.ts @@ -109,10 +109,14 @@ export interface Connection { * A bidirectional stream that can be used to send an receive messages. * * A stream can be closed locally with close() or can be closed remotely or - * through network errors. onClose is guaranteed to be called. onOpen will only - * be called if the stream successfully established a connection. + * through network errors. onClose is guaranteed to be called. onOpen will be + * called once the stream is ready to send messages (which may or may not be + * before an actual connection to the backend has been established). The + * onConnected event is called when an actual, physical connection with the + * backend has been established, and may occur before or after the onOpen event. */ export interface Stream { + onConnected(callback: () => void): void; onOpen(callback: () => void): void; onClose(callback: (err?: FirestoreError) => void): void; onMessage(callback: (msg: O) => void): void; diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index b7f657876b4..21c35aace28 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -125,6 +125,11 @@ const enum PersistentStreamState { * events by the concrete implementation classes. */ export interface PersistentStreamListener { + /** + * Called after receiving an acknowledgement from the server, confirming that + * we are able to connect to it. + */ + onConnected: () => Promise; /** * Called after the stream was established and can accept outgoing * messages @@ -483,6 +488,9 @@ export abstract class PersistentStream< const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); this.stream = this.startRpc(authToken, appCheckToken); + this.stream.onConnected(() => { + dispatchIfNotClosed(() => this.listener!.onConnected()); + }); this.stream.onOpen(() => { dispatchIfNotClosed(() => { debugAssert( diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 8cb361bfe09..2e9cb1546e3 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -403,6 +403,13 @@ function cleanUpWatchStreamState(remoteStoreImpl: RemoteStoreImpl): void { remoteStoreImpl.watchChangeAggregator = undefined; } +async function onWatchStreamConnected( + remoteStoreImpl: RemoteStoreImpl +): Promise { + // Mark the client as online since we got a "connected" notification. + remoteStoreImpl.onlineStateTracker.set(OnlineState.Online); +} + async function onWatchStreamOpen( remoteStoreImpl: RemoteStoreImpl ): Promise { @@ -923,6 +930,7 @@ function ensureWatchStream( remoteStoreImpl.datastore, remoteStoreImpl.asyncQueue, { + onConnected: onWatchStreamConnected.bind(null, remoteStoreImpl), onOpen: onWatchStreamOpen.bind(null, remoteStoreImpl), onClose: onWatchStreamClose.bind(null, remoteStoreImpl), onWatchChange: onWatchStreamChange.bind(null, remoteStoreImpl) @@ -969,6 +977,7 @@ function ensureWriteStream( remoteStoreImpl.datastore, remoteStoreImpl.asyncQueue, { + onConnected: () => Promise.resolve(), onOpen: onWriteStreamOpen.bind(null, remoteStoreImpl), onClose: onWriteStreamClose.bind(null, remoteStoreImpl), onHandshakeComplete: onWriteHandshakeComplete.bind( diff --git a/packages/firestore/src/remote/stream_bridge.ts b/packages/firestore/src/remote/stream_bridge.ts index bfecb707b8a..78ae01116ac 100644 --- a/packages/firestore/src/remote/stream_bridge.ts +++ b/packages/firestore/src/remote/stream_bridge.ts @@ -26,6 +26,7 @@ import { Stream } from './connection'; * interface. The stream callbacks are invoked with the callOn... methods. */ export class StreamBridge implements Stream { + private wrappedOnConnected: (() => void) | undefined; private wrappedOnOpen: (() => void) | undefined; private wrappedOnClose: ((err?: FirestoreError) => void) | undefined; private wrappedOnMessage: ((msg: O) => void) | undefined; @@ -38,6 +39,14 @@ export class StreamBridge implements Stream { this.closeFn = args.closeFn; } + onConnected(callback: () => void): void { + debugAssert( + !this.wrappedOnConnected, + 'Called onConnected on stream twice!' + ); + this.wrappedOnConnected = callback; + } + onOpen(callback: () => void): void { debugAssert(!this.wrappedOnOpen, 'Called onOpen on stream twice!'); this.wrappedOnOpen = callback; @@ -61,6 +70,14 @@ export class StreamBridge implements Stream { this.sendFn(msg); } + callOnConnected(): void { + debugAssert( + this.wrappedOnConnected !== undefined, + 'Cannot call onConnected because no callback was set' + ); + this.wrappedOnConnected(); + } + callOnOpen(): void { debugAssert( this.wrappedOnOpen !== undefined, diff --git a/packages/firestore/test/integration/browser/webchannel.test.ts b/packages/firestore/test/integration/browser/webchannel.test.ts index 9d9847fc665..622bc67ade2 100644 --- a/packages/firestore/test/integration/browser/webchannel.test.ts +++ b/packages/firestore/test/integration/browser/webchannel.test.ts @@ -63,6 +63,10 @@ describeFn('WebChannel', () => { } }; + // Register an "onConnected" callback since it's required, even though we + // don't care about this event. + stream.onConnected(() => {}); + // Once the stream is open, send an "add_target" request stream.onOpen(() => { stream.send(payload); diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 6726f92a772..53819ae21d5 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -22,7 +22,10 @@ import { Token } from '../../../src/api/credentials'; import { SnapshotVersion } from '../../../src/core/snapshot_version'; +import { Target } from '../../../src/core/target'; +import { TargetData, TargetPurpose } from '../../../src/local/target_data'; import { MutationResult } from '../../../src/model/mutation'; +import { ResourcePath } from '../../../src/model/path'; import { newPersistentWatchStream, newPersistentWriteStream @@ -57,7 +60,8 @@ type StreamEventType = | 'mutationResult' | 'watchChange' | 'open' - | 'close'; + | 'close' + | 'connected'; const SINGLE_MUTATION = [setMutation('docs/1', { foo: 'bar' })]; @@ -117,6 +121,10 @@ class StreamStatusListener implements WatchStreamListener, WriteStreamListener { return this.resolvePending('watchChange'); } + onConnected(): Promise { + return this.resolvePending('connected'); + } + onOpen(): Promise { return this.resolvePending('open'); } @@ -148,6 +156,14 @@ describe('Watch Stream', () => { }); }); }); + + it('gets connected event before first message', () => { + return withTestWatchStream(async (watchStream, streamListener) => { + await streamListener.awaitCallback('open'); + watchStream.watch(sampleTargetData()); + await streamListener.awaitCallback('connected'); + }); + }); }); class MockAuthCredentialsProvider extends EmptyAuthCredentialsProvider { @@ -190,6 +206,7 @@ describe('Write Stream', () => { 'Handshake must be complete before writing mutations' ); writeStream.writeHandshake(); + await streamListener.awaitCallback('connected'); await streamListener.awaitCallback('handshakeComplete'); // Now writes should succeed @@ -205,9 +222,10 @@ describe('Write Stream', () => { return withTestWriteStream((writeStream, streamListener, queue) => { return streamListener .awaitCallback('open') - .then(() => { + .then(async () => { writeStream.writeHandshake(); - return streamListener.awaitCallback('handshakeComplete'); + await streamListener.awaitCallback('connected'); + await streamListener.awaitCallback('handshakeComplete'); }) .then(() => { writeStream.markIdle(); @@ -228,6 +246,7 @@ describe('Write Stream', () => { return withTestWriteStream(async (writeStream, streamListener, queue) => { await streamListener.awaitCallback('open'); writeStream.writeHandshake(); + await streamListener.awaitCallback('connected'); await streamListener.awaitCallback('handshakeComplete'); // Mark the stream idle, but immediately cancel the idle timer by issuing another write. @@ -336,3 +355,16 @@ export async function withTestWatchStream( streamListener.verifyNoPendingCallbacks(); }); } + +function sampleTargetData(): TargetData { + const target: Target = { + path: ResourcePath.emptyPath(), + collectionGroup: null, + orderBy: [], + filters: [], + limit: null, + startAt: null, + endAt: null + }; + return new TargetData(target, 1, TargetPurpose.Listen, 1); +}