Skip to content

Commit a24fe7f

Browse files
committed
address review
1 parent 1527028 commit a24fe7f

File tree

1 file changed

+109
-61
lines changed

1 file changed

+109
-61
lines changed

packages/electric-db-collection/src/electric.ts

Lines changed: 109 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,93 @@ function hasTxids<T extends Row<unknown>>(
279279
return `txids` in message.headers && Array.isArray(message.headers.txids)
280280
}
281281

282+
/**
283+
* Creates a deduplicated loadSubset handler for progressive/on-demand modes
284+
* Returns null for eager mode, or a DeduplicatedLoadSubset instance for other modes.
285+
* Handles fetching snapshots in progressive mode during buffering phase,
286+
* and requesting snapshots in on-demand mode
287+
*/
288+
function createLoadSubsetDedupe<T extends Row<unknown>>({
289+
stream,
290+
syncMode,
291+
isBufferingInitialSync,
292+
begin,
293+
write,
294+
commit,
295+
collectionId,
296+
}: {
297+
stream: ShapeStream<T>
298+
syncMode: ElectricSyncMode
299+
isBufferingInitialSync: () => boolean
300+
begin: () => void
301+
write: (mutation: {
302+
type: `insert` | `update` | `delete`
303+
value: T
304+
metadata: Record<string, unknown>
305+
}) => void
306+
commit: () => void
307+
collectionId?: string
308+
}): DeduplicatedLoadSubset | null {
309+
// Eager mode doesn't need subset loading
310+
if (syncMode === `eager`) {
311+
return null
312+
}
313+
314+
const loadSubset = async (opts: LoadSubsetOptions) => {
315+
// In progressive mode, use fetchSnapshot during snapshot phase
316+
if (isBufferingInitialSync()) {
317+
// Progressive mode snapshot phase: fetch and apply immediately
318+
const snapshotParams = compileSQL<T>(opts)
319+
try {
320+
const { data: rows } = await stream.fetchSnapshot(snapshotParams)
321+
322+
// Check again if we're still buffering - we might have received up-to-date
323+
// and completed the atomic swap while waiting for the snapshot
324+
if (!isBufferingInitialSync()) {
325+
debug(
326+
`${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`
327+
)
328+
return
329+
}
330+
331+
// Apply snapshot data in a sync transaction (only if we have data)
332+
if (rows.length > 0) {
333+
begin()
334+
for (const row of rows) {
335+
write({
336+
type: `insert`,
337+
value: row.value,
338+
metadata: {
339+
...row.headers,
340+
},
341+
})
342+
}
343+
commit()
344+
345+
debug(
346+
`${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows`
347+
)
348+
}
349+
} catch (error) {
350+
debug(
351+
`${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`,
352+
error
353+
)
354+
throw error
355+
}
356+
} else if (syncMode === `progressive`) {
357+
// Progressive mode after full sync complete: no need to load more
358+
return
359+
} else {
360+
// On-demand mode: use requestSnapshot
361+
const snapshotParams = compileSQL<T>(opts)
362+
await stream.requestSnapshot(snapshotParams)
363+
}
364+
}
365+
366+
return new DeduplicatedLoadSubset({ loadSubset })
367+
}
368+
282369
/**
283370
* Type for the awaitTxId utility function
284371
*/
@@ -696,10 +783,10 @@ function createElectricSync<T extends Row<unknown>>(
696783

697784
// Wrap markReady to wait for test hook in progressive mode
698785
let progressiveReadyGate: Promise<void> | null = null
699-
const wrappedMarkReady = () => {
786+
const wrappedMarkReady = (isBuffering: boolean) => {
700787
// Only create gate if we're in buffering phase (first up-to-date)
701788
if (
702-
isBufferingInitialSync &&
789+
isBuffering &&
703790
syncMode === `progressive` &&
704791
testHooks?.beforeMarkingReady
705792
) {
@@ -780,61 +867,23 @@ function createElectricSync<T extends Row<unknown>>(
780867
let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
781868

782869
// Progressive mode state
783-
let isBufferingInitialSync = syncMode === `progressive` // True until first up-to-date in progressive mode
870+
// Helper to determine if we're buffering the initial sync
871+
const isBufferingInitialSync = () =>
872+
syncMode === `progressive` && !hasReceivedUpToDate
784873
const bufferedMessages: Array<Message<T>> = [] // Buffer change messages during initial sync
785874

786875
// Create deduplicated loadSubset wrapper for non-eager modes
787876
// This prevents redundant snapshot requests when multiple concurrent
788877
// live queries request overlapping or subset predicates
789-
const loadSubsetDedupe =
790-
syncMode === `eager`
791-
? null
792-
: new DeduplicatedLoadSubset({
793-
loadSubset: async (opts: LoadSubsetOptions) => {
794-
// In progressive mode, use fetchSnapshot during snapshot phase
795-
if (syncMode === `progressive`) {
796-
if (hasReceivedUpToDate) {
797-
// Full sync complete, no need to load more
798-
return
799-
}
800-
// Snapshot phase: fetch and apply immediately
801-
const snapshotParams = compileSQL<T>(opts)
802-
try {
803-
const snapshot = await stream.fetchSnapshot(snapshotParams)
804-
const rows = snapshot.data
805-
806-
// Apply snapshot data in a sync transaction (only if we have data)
807-
if (rows.length > 0) {
808-
begin()
809-
for (const row of rows) {
810-
write({
811-
type: `insert`,
812-
value: row.value,
813-
metadata: {
814-
...row.headers,
815-
},
816-
})
817-
}
818-
commit()
819-
820-
debug(
821-
`${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows`
822-
)
823-
}
824-
} catch (error) {
825-
debug(
826-
`${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`,
827-
error
828-
)
829-
throw error
830-
}
831-
} else {
832-
// On-demand mode: use requestSnapshot
833-
const snapshotParams = compileSQL<T>(opts)
834-
await stream.requestSnapshot(snapshotParams)
835-
}
836-
},
837-
})
878+
const loadSubsetDedupe = createLoadSubsetDedupe({
879+
stream,
880+
syncMode,
881+
isBufferingInitialSync,
882+
begin,
883+
write,
884+
commit,
885+
collectionId,
886+
})
838887

839888
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
840889
let hasUpToDate = false
@@ -855,7 +904,7 @@ function createElectricSync<T extends Row<unknown>>(
855904

856905
// Check for txids in the message and add them to our store
857906
// Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap)
858-
if (hasTxids(message) && !isBufferingInitialSync) {
907+
if (hasTxids(message) && !isBufferingInitialSync()) {
859908
message.headers.txids?.forEach((txid) => newTxids.add(txid))
860909
}
861910

@@ -890,7 +939,7 @@ function createElectricSync<T extends Row<unknown>>(
890939
}
891940

892941
// In buffered initial sync of progressive mode, buffer messages instead of writing
893-
if (isBufferingInitialSync) {
942+
if (isBufferingInitialSync()) {
894943
bufferedMessages.push(message)
895944
} else {
896945
// Normal processing: write changes immediately
@@ -910,7 +959,7 @@ function createElectricSync<T extends Row<unknown>>(
910959
}
911960
} else if (isSnapshotEndMessage(message)) {
912961
// Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap)
913-
if (!isBufferingInitialSync) {
962+
if (!isBufferingInitialSync()) {
914963
newSnapshots.push(parseSnapshotMessage(message))
915964
}
916965
hasSnapshotEnd = true
@@ -936,15 +985,14 @@ function createElectricSync<T extends Row<unknown>>(
936985
// Reset flags so we continue accumulating changes until next up-to-date
937986
hasUpToDate = false
938987
hasSnapshotEnd = false
939-
hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync
940-
isBufferingInitialSync = syncMode === `progressive` // Reset buffering state
988+
hasReceivedUpToDate = false // Reset for progressive mode (isBufferingInitialSync will reflect this)
941989
bufferedMessages.length = 0 // Clear buffered messages
942990
}
943991
}
944992

945993
if (hasUpToDate || hasSnapshotEnd) {
946994
// PROGRESSIVE MODE: Atomic swap on first up-to-date
947-
if (isBufferingInitialSync && hasUpToDate) {
995+
if (isBufferingInitialSync() && hasUpToDate) {
948996
debug(
949997
`${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`
950998
)
@@ -981,8 +1029,8 @@ function createElectricSync<T extends Row<unknown>>(
9811029
// Commit the atomic swap
9821030
commit()
9831031

984-
// Exit buffering phase - now in normal sync mode
985-
isBufferingInitialSync = false
1032+
// Exit buffering phase by marking that we've received up-to-date
1033+
// isBufferingInitialSync() will now return false
9861034
bufferedMessages.length = 0
9871035

9881036
debug(
@@ -1001,7 +1049,7 @@ function createElectricSync<T extends Row<unknown>>(
10011049

10021050
if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
10031051
// Mark the collection as ready now that sync is up to date
1004-
wrappedMarkReady()
1052+
wrappedMarkReady(isBufferingInitialSync())
10051053
}
10061054

10071055
// Track that we've received the first up-to-date for progressive mode

0 commit comments

Comments
 (0)