Skip to content

Commit c7cff7c

Browse files
KyleAMathewsclaude
andcommitted
Fix error propagation from sync clients to collections
Errors from Electric SQL and TanStack Query weren't being propagated to collections, causing two critical issues: 1. `preload()` would hang indefinitely when sync errors occurred, blocking apps waiting for data 2. Collections would be marked as 'ready' even when they had no synced data, leading to empty results **Changes:** - **electric-db-collection**: Implement 10-second grace period before marking collection as errored, allowing Electric's built-in retry logic to recover from transitory network issues. Remove premature `markReady()` call that was marking collections ready with no data. - **query-db-collection**: Set error status immediately after TanStack Query exhausts retries (no grace period needed since Query handles retries internally). - **preload()**: Listen for `status:error` events and reject the promise, preventing indefinite hangs when sync fails. - **Collection lifecycle**: Add `markError()` method and make `events` public. Auto-handle error recovery in `markReady()` by transitioning through loading state (error → loading → ready). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 6692aad commit c7cff7c

File tree

10 files changed

+374
-33
lines changed

10 files changed

+374
-33
lines changed

.beads/electric-error-propegate.db

80 KB
Binary file not shown.

REVIEW_FEEDBACK.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Review Feedback
2+
3+
- Blocking – `.preload()` now hangs when the initial sync errors (`packages/electric-db-collection/src/electric.ts:655`). We removed the `markReady()` call in the stream `onError` handler and now just rethrow. Unfortunately, `CollectionSyncManager.preload()` (`packages/db/src/collection/sync.ts:203`) only resolves once `markReady()` is triggered and never rejects later if the lifecycle flips to `error`. So if the first sync attempt fails and the user hasn’t provided a custom `shapeOptions.onError`, any `await collection.preload()` will wait forever. Could we either reinstate the ready transition or wire the lifecycle into an error path that causes `preload()` to reject (e.g. keep a resolver/rejector registered on `status:error`)?
4+
- Suggestion – if you decide to push the lifecycle into `error` after a timeout, make sure `preload()` can observe that state change. Right now it registers only `onFirstReady`, so it still wouldn’t unblock unless we also reject when `status:error` fires.

packages/db/src/collection/index.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ export class CollectionImpl<
215215
public utils: Record<string, Fn> = {}
216216

217217
// Managers
218-
private _events: CollectionEventsManager
218+
public events: CollectionEventsManager
219219
private _changes: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
220220
private _lifecycle: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
221221
private _sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
@@ -261,7 +261,7 @@ export class CollectionImpl<
261261
}
262262

263263
this._changes = new CollectionChangesManager()
264-
this._events = new CollectionEventsManager()
264+
this.events = new CollectionEventsManager()
265265
this._indexes = new CollectionIndexesManager()
266266
this._lifecycle = new CollectionLifecycleManager(config, this.id)
267267
this._mutations = new CollectionMutationsManager(config, this.id)
@@ -272,9 +272,9 @@ export class CollectionImpl<
272272
collection: this, // Required for passing to CollectionSubscription
273273
lifecycle: this._lifecycle,
274274
sync: this._sync,
275-
events: this._events,
275+
events: this.events,
276276
})
277-
this._events.setDeps({
277+
this.events.setDeps({
278278
collection: this, // Required for adding to emitted events
279279
})
280280
this._indexes.setDeps({
@@ -283,7 +283,7 @@ export class CollectionImpl<
283283
})
284284
this._lifecycle.setDeps({
285285
changes: this._changes,
286-
events: this._events,
286+
events: this.events,
287287
indexes: this._indexes,
288288
state: this._state,
289289
sync: this._sync,
@@ -810,7 +810,7 @@ export class CollectionImpl<
810810
event: T,
811811
callback: CollectionEventHandler<T>
812812
) {
813-
return this._events.on(event, callback)
813+
return this.events.on(event, callback)
814814
}
815815

816816
/**
@@ -820,7 +820,7 @@ export class CollectionImpl<
820820
event: T,
821821
callback: CollectionEventHandler<T>
822822
) {
823-
return this._events.once(event, callback)
823+
return this.events.once(event, callback)
824824
}
825825

826826
/**
@@ -830,7 +830,7 @@ export class CollectionImpl<
830830
event: T,
831831
callback: CollectionEventHandler<T>
832832
) {
833-
this._events.off(event, callback)
833+
this.events.off(event, callback)
834834
}
835835

836836
/**
@@ -840,7 +840,7 @@ export class CollectionImpl<
840840
event: T,
841841
timeout?: number
842842
) {
843-
return this._events.waitFor(event, timeout)
843+
return this.events.waitFor(event, timeout)
844844
}
845845

846846
/**

packages/db/src/collection/lifecycle.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ export class CollectionLifecycleManager<
7878
loading: [`initialCommit`, `ready`, `error`, `cleaned-up`],
7979
initialCommit: [`ready`, `error`, `cleaned-up`],
8080
ready: [`cleaned-up`, `error`],
81-
error: [`cleaned-up`, `idle`],
81+
error: [`cleaned-up`, `idle`, `loading`],
8282
"cleaned-up": [`loading`, `error`],
8383
}
8484

@@ -144,6 +144,11 @@ export class CollectionLifecycleManager<
144144
* @private - Should only be called by sync implementations
145145
*/
146146
public markReady(): void {
147+
// If recovering from error, transition to loading first
148+
if (this.status === `error`) {
149+
this.setStatus(`loading`)
150+
}
151+
147152
this.validateStatusTransition(this.status, `ready`)
148153
// Can transition to ready from loading or initialCommit states
149154
if (this.status === `loading` || this.status === `initialCommit`) {
@@ -170,6 +175,15 @@ export class CollectionLifecycleManager<
170175
}
171176
}
172177

178+
/**
179+
* Mark the collection as being in an error state
180+
* This is called by sync implementations when persistent errors occur
181+
* @private - Should only be called by sync implementations
182+
*/
183+
public markError(): void {
184+
this.setStatus(`error`)
185+
}
186+
173187
/**
174188
* Start the garbage collection timer
175189
* Called when the collection becomes inactive (no subscribers)

packages/db/src/collection/sync.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ export class CollectionSyncManager<
159159
markReady: () => {
160160
this.lifecycle.markReady()
161161
},
162+
markError: () => {
163+
this.lifecycle.markError()
164+
},
162165
truncate: () => {
163166
const pendingTransaction =
164167
this.state.pendingSyncedTransactions[
@@ -221,6 +224,19 @@ export class CollectionSyncManager<
221224
resolve()
222225
})
223226

227+
// Also listen for error status transitions and reject the promise
228+
const unsubscribeError = this.collection.events.once(
229+
`status:error`,
230+
() => {
231+
reject(new CollectionIsInErrorStateError())
232+
}
233+
)
234+
235+
// Clean up error listener when promise resolves
236+
this.lifecycle.onFirstReady(() => {
237+
unsubscribeError()
238+
})
239+
224240
// Start sync if collection hasn't started yet or was cleaned up
225241
if (
226242
this.lifecycle.status === `idle` ||
@@ -229,6 +245,7 @@ export class CollectionSyncManager<
229245
try {
230246
this.startSync()
231247
} catch (error) {
248+
unsubscribeError()
232249
reject(error)
233250
return
234251
}

packages/db/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ export interface SyncConfig<
172172
write: (message: Omit<ChangeMessage<T>, `key`>) => void
173173
commit: () => void
174174
markReady: () => void
175+
markError: () => void
175176
truncate: () => void
176177
}) => void | CleanupFn | SyncConfigRes
177178

packages/electric-db-collection/src/electric.ts

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,10 @@ function createElectricSync<T extends Row<unknown>>(
587587
collectionId?: string
588588
}
589589
): SyncConfig<T> {
590+
// Track first error for grace period before setting collection to error status
591+
let firstErrorTimestamp: number | null = null
592+
let errorGracePeriodTimeout: ReturnType<typeof setTimeout> | null = null
593+
const ERROR_GRACE_PERIOD_MS = 10000 // 10 seconds
590594
const {
591595
seenTxids,
592596
seenSnapshots,
@@ -620,7 +624,7 @@ function createElectricSync<T extends Row<unknown>>(
620624

621625
return {
622626
sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
623-
const { begin, write, commit, markReady, truncate, collection } = params
627+
const { begin, write, commit, markReady, markError, truncate } = params
624628

625629
// Abort controller for the stream - wraps the signal if provided
626630
const abortController = new AbortController()
@@ -655,25 +659,29 @@ function createElectricSync<T extends Row<unknown>>(
655659
...shapeOptions,
656660
signal: abortController.signal,
657661
onError: (errorParams) => {
658-
// Just immediately mark ready if there's an error to avoid blocking
659-
// apps waiting for `.preload()` to finish.
660662
// Note that Electric sends a 409 error on a `must-refetch` message, but the
661-
// ShapeStream handled this and it will not reach this handler, therefor
662-
// this markReady will not be triggers by a `must-refetch`.
663-
markReady()
663+
// ShapeStream handles this and it will not reach this handler.
664+
// If the error is transitory, ShapeStream will retry and eventually call
665+
// markReady() naturally when it receives 'up-to-date'.
666+
667+
// Track first error for grace period
668+
if (firstErrorTimestamp === null) {
669+
firstErrorTimestamp = Date.now()
670+
671+
// After 10 seconds of continuous errors, set collection status to error
672+
errorGracePeriodTimeout = setTimeout(() => {
673+
markError()
674+
}, ERROR_GRACE_PERIOD_MS)
675+
}
664676

665677
if (shapeOptions.onError) {
666678
return shapeOptions.onError(errorParams)
667679
} else {
668-
console.error(
669-
`An error occurred while syncing collection: ${collection.id}, \n` +
670-
`it has been marked as ready to avoid blocking apps waiting for '.preload()' to finish. \n` +
671-
`You can provide an 'onError' handler on the shapeOptions to handle this error, and this message will not be logged.`,
672-
errorParams
673-
)
680+
// If no custom error handler is provided, throw the error
681+
// This ensures errors propagate to the app and aligns with
682+
// Electric SQL's documented behavior
683+
throw errorParams
674684
}
675-
676-
return
677685
},
678686
})
679687
let transactionStarted = false
@@ -767,6 +775,15 @@ function createElectricSync<T extends Row<unknown>>(
767775
}
768776

769777
if (hasUpToDate) {
778+
// Clear error tracking on successful sync (recovery from transitory errors)
779+
if (firstErrorTimestamp !== null) {
780+
firstErrorTimestamp = null
781+
if (errorGracePeriodTimeout !== null) {
782+
clearTimeout(errorGracePeriodTimeout)
783+
errorGracePeriodTimeout = null
784+
}
785+
}
786+
770787
// Clear the current batch buffer since we're now up-to-date
771788
currentBatchMessages.setState(() => [])
772789

0 commit comments

Comments
 (0)