Skip to content

Commit 85a5bd7

Browse files
committed
fix electric colleciton progressive mode
1 parent db764b7 commit 85a5bd7

File tree

10 files changed

+775
-150
lines changed

10 files changed

+775
-150
lines changed

packages/db-collection-e2e/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ export { createDeduplicationTestSuite } from "./suites/deduplication.suite"
2525
export { createCollationTestSuite } from "./suites/collation.suite"
2626
export { createMutationsTestSuite } from "./suites/mutations.suite"
2727
export { createLiveUpdatesTestSuite } from "./suites/live-updates.suite"
28+
export { createProgressiveTestSuite } from "./suites/progressive.suite"
Lines changed: 387 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
/**
2+
* Progressive Mode Test Suite (Electric only)
3+
*
4+
* Tests progressive sync mode behavior including:
5+
* - Snapshot loading during initial sync
6+
* - Atomic swap on first up-to-date
7+
* - Incremental updates after swap
8+
* - Txid tracking behavior
9+
*/
10+
11+
import { describe, expect, it } from "vitest"
12+
import { createLiveQueryCollection, eq, gt } from "@tanstack/db"
13+
import { waitFor, waitForQueryData } from "../utils/helpers"
14+
import type { E2ETestConfig } from "../types"
15+
16+
export function createProgressiveTestSuite(
17+
getConfig: () => Promise<E2ETestConfig>
18+
) {
19+
describe(`Progressive Mode Suite (Electric only)`, () => {
20+
describe(`Basic Progressive Mode`, () => {
21+
it(`should validate snapshot phase behavior and atomic swap with status transition`, async () => {
22+
const config = await getConfig()
23+
if (!config.collections.progressive) {
24+
return // Skip if progressive collections not available
25+
}
26+
const progressiveUsers = config.collections.progressive.users
27+
28+
// Create a query - this will trigger a snapshot fetch if still in snapshot phase
29+
const query = createLiveQueryCollection((q) =>
30+
q
31+
.from({ user: progressiveUsers })
32+
.where(({ user }) => eq(user.age, 25))
33+
)
34+
35+
await query.preload()
36+
await waitForQueryData(query, { minSize: 1, timeout: 10000 })
37+
38+
const querySize = query.size
39+
const queryItems = Array.from(query.values())
40+
41+
// Validate query data
42+
expect(querySize).toBeGreaterThan(0)
43+
queryItems.forEach((user) => {
44+
expect(user.age).toBe(25)
45+
expect(user.id).toBeDefined()
46+
})
47+
48+
// If we're still loading, we should be in snapshot phase
49+
// Base collection should have data from snapshot (query subset)
50+
const statusDuringQuery = progressiveUsers.status
51+
if (statusDuringQuery === `loading`) {
52+
// We're in snapshot phase! Validate snapshot behavior
53+
// Collection should have the snapshot data
54+
expect(progressiveUsers.size).toBeGreaterThan(0)
55+
56+
// But collection size should be <= query size (only snapshot loaded)
57+
// Actually it might have multiple snapshots if other tests ran, so just verify we have data
58+
expect(progressiveUsers.size).toBeGreaterThan(0)
59+
}
60+
61+
// Wait for full sync to complete
62+
await waitFor(() => progressiveUsers.status === `ready`, {
63+
timeout: 30000,
64+
message: `Progressive collection did not complete sync`,
65+
})
66+
67+
// After atomic swap to full synced state
68+
// Collection should have ALL users (not just age=25)
69+
const finalCollectionSize = progressiveUsers.size
70+
expect(finalCollectionSize).toBeGreaterThan(querySize) // More than just our query subset
71+
72+
// Query should still work with consistent data
73+
const finalQueryItems = Array.from(query.values())
74+
finalQueryItems.forEach((user) => {
75+
expect(user.age).toBe(25) // Still matches predicate
76+
expect(user.id).toBeDefined()
77+
})
78+
79+
// Verify some of the original snapshot items are still present
80+
queryItems.forEach((originalUser) => {
81+
const foundInCollection = progressiveUsers.get(originalUser.id)
82+
expect(foundInCollection).toBeDefined()
83+
expect(foundInCollection?.age).toBe(25)
84+
})
85+
86+
await query.cleanup()
87+
})
88+
89+
it(`should load snapshots during initial sync and perform atomic swap`, async () => {
90+
const config = await getConfig()
91+
if (!config.collections.progressive) {
92+
return // Skip if progressive collections not available
93+
}
94+
const progressiveUsers = config.collections.progressive.users
95+
96+
// Progressive collections should only be marked ready AFTER first up-to-date
97+
// If already ready, the full sync completed very fast - we can still test the end state
98+
const wasStillLoading = progressiveUsers.status === `loading`
99+
100+
// Query a subset
101+
const query = createLiveQueryCollection((q) =>
102+
q
103+
.from({ user: progressiveUsers })
104+
.where(({ user }) => eq(user.age, 25))
105+
)
106+
107+
await query.preload()
108+
109+
// Wait for query to have data (either from snapshot during loading, or from final state if already ready)
110+
await waitForQueryData(query, { minSize: 1, timeout: 10000 })
111+
112+
const beforeSwapSize = query.size
113+
const beforeSwapItems = Array.from(query.values())
114+
115+
// Verify all items match the predicate
116+
beforeSwapItems.forEach((user) => {
117+
expect(user.age).toBe(25)
118+
expect(user.id).toBeDefined()
119+
expect(user.name).toBeDefined()
120+
})
121+
122+
if (wasStillLoading) {
123+
// If we caught it during snapshot phase, wait for atomic swap
124+
await waitFor(() => progressiveUsers.status === `ready`, {
125+
timeout: 30000,
126+
message: `Progressive collection did not complete sync`,
127+
})
128+
129+
// After atomic swap, verify data is consistent
130+
// The query should have the same data (from full sync)
131+
const afterSwapItems = Array.from(query.values())
132+
expect(afterSwapItems.length).toBeGreaterThanOrEqual(beforeSwapSize)
133+
134+
// All original items should still be present
135+
beforeSwapItems.forEach((originalUser) => {
136+
const stillPresent = afterSwapItems.some(
137+
(u) => u.id === originalUser.id
138+
)
139+
expect(stillPresent).toBe(true)
140+
})
141+
} else {
142+
// Already ready - verify final state is correct
143+
expect(progressiveUsers.status).toBe(`ready`)
144+
}
145+
146+
// Final validation: all items still match predicate
147+
Array.from(query.values()).forEach((user) => {
148+
expect(user.age).toBe(25)
149+
})
150+
151+
await query.cleanup()
152+
})
153+
154+
it(`should handle multiple snapshots with different predicates`, async () => {
155+
const config = await getConfig()
156+
if (!config.collections.progressive) {
157+
return // Skip if progressive collections not available
158+
}
159+
const progressiveUsers = config.collections.progressive.users
160+
161+
// Create multiple queries with different predicates
162+
const query1 = createLiveQueryCollection((q) =>
163+
q
164+
.from({ user: progressiveUsers })
165+
.where(({ user }) => eq(user.age, 25))
166+
)
167+
168+
const query2 = createLiveQueryCollection((q) =>
169+
q
170+
.from({ user: progressiveUsers })
171+
.where(({ user }) => gt(user.age, 30))
172+
)
173+
174+
await Promise.all([query1.preload(), query2.preload()])
175+
176+
// Wait for both to load snapshots
177+
await Promise.all([
178+
waitForQueryData(query1, { minSize: 1, timeout: 10000 }),
179+
waitForQueryData(query2, { minSize: 1, timeout: 10000 }),
180+
])
181+
182+
expect(query1.size).toBeGreaterThan(0)
183+
expect(query2.size).toBeGreaterThan(0)
184+
185+
// Verify data correctness
186+
const query1Snapshot = Array.from(query1.values())
187+
const query2Snapshot = Array.from(query2.values())
188+
189+
query1Snapshot.forEach((user) => {
190+
expect(user.age).toBe(25)
191+
})
192+
query2Snapshot.forEach((user) => {
193+
expect(user.age).toBeGreaterThan(30)
194+
})
195+
196+
// Wait for full sync
197+
await waitFor(() => progressiveUsers.status === `ready`, {
198+
timeout: 30000,
199+
message: `Progressive collection did not complete sync`,
200+
})
201+
202+
// Both queries should still have data after swap with same predicates
203+
expect(query1.size).toBeGreaterThan(0)
204+
expect(query2.size).toBeGreaterThan(0)
205+
206+
// Verify predicates still match after swap
207+
Array.from(query1.values()).forEach((user) => {
208+
expect(user.age).toBe(25)
209+
})
210+
Array.from(query2.values()).forEach((user) => {
211+
expect(user.age).toBeGreaterThan(30)
212+
})
213+
214+
await Promise.all([query1.cleanup(), query2.cleanup()])
215+
})
216+
})
217+
218+
describe(`Incremental Updates After Swap`, () => {
219+
it(`should receive incremental updates after atomic swap`, async () => {
220+
const config = await getConfig()
221+
if (!config.collections.progressive || !config.mutations?.insertUser) {
222+
return // Skip if progressive collections or mutations not available
223+
}
224+
const progressiveUsers = config.collections.progressive.users
225+
226+
// Wait for full sync first
227+
await waitFor(() => progressiveUsers.status === `ready`, {
228+
timeout: 30000,
229+
message: `Progressive collection did not complete sync`,
230+
})
231+
232+
const initialSize = progressiveUsers.size
233+
234+
// Insert new data
235+
const newUser = {
236+
id: crypto.randomUUID(),
237+
name: `Progressive Test User`,
238+
239+
age: 35,
240+
isActive: true,
241+
createdAt: new Date(),
242+
metadata: null,
243+
deletedAt: null,
244+
}
245+
246+
await config.mutations.insertUser(newUser)
247+
248+
// Wait for incremental update
249+
if (config.hasReplicationLag) {
250+
await waitFor(() => progressiveUsers.size > initialSize, {
251+
timeout: 10000,
252+
message: `New user not synced via incremental update`,
253+
})
254+
}
255+
256+
expect(progressiveUsers.size).toBeGreaterThan(initialSize)
257+
258+
// Verify the new user is in the collection with correct data
259+
const foundUser = progressiveUsers.get(newUser.id)
260+
expect(foundUser).toBeDefined()
261+
expect(foundUser?.id).toBe(newUser.id)
262+
expect(foundUser?.name).toBe(newUser.name)
263+
expect(foundUser?.email).toBe(newUser.email)
264+
expect(foundUser?.age).toBe(newUser.age)
265+
})
266+
})
267+
268+
describe(`Predicate Handling`, () => {
269+
it(`should correctly handle predicates during and after snapshot phase`, async () => {
270+
const config = await getConfig()
271+
if (!config.collections.progressive) {
272+
return // Skip if progressive collections not available
273+
}
274+
const progressiveUsers = config.collections.progressive.users
275+
276+
// Create query with predicate during snapshot phase
277+
const query = createLiveQueryCollection((q) =>
278+
q
279+
.from({ user: progressiveUsers })
280+
.where(({ user }) => gt(user.age, 25))
281+
.orderBy(({ user }) => [user.age, `asc`])
282+
.limit(5)
283+
)
284+
285+
await query.preload()
286+
await waitForQueryData(query, { minSize: 1, timeout: 10000 })
287+
288+
const snapshotPhaseSize = query.size
289+
290+
// Wait for atomic swap
291+
await waitFor(() => progressiveUsers.status === `ready`, {
292+
timeout: 30000,
293+
message: `Progressive collection did not complete sync`,
294+
})
295+
296+
// Verify predicate still works after swap
297+
const afterSwapSize = query.size
298+
const afterSwapItems = Array.from(query.values())
299+
300+
// Size should be reasonable (at least what we had in snapshot phase)
301+
expect(afterSwapSize).toBeGreaterThanOrEqual(snapshotPhaseSize)
302+
303+
// All items should match the predicate
304+
afterSwapItems.forEach((user) => {
305+
expect(user.age).toBeGreaterThan(25)
306+
})
307+
308+
// Should respect limit
309+
expect(afterSwapSize).toBeLessThanOrEqual(5)
310+
311+
await query.cleanup()
312+
})
313+
314+
it(`should deduplicate snapshot requests during snapshot phase`, async () => {
315+
const config = await getConfig()
316+
if (!config.collections.progressive) {
317+
return // Skip if progressive collections not available
318+
}
319+
const progressiveUsers = config.collections.progressive.users
320+
321+
// Create multiple identical queries (should be deduplicated)
322+
const queries = Array.from({ length: 3 }, () =>
323+
createLiveQueryCollection((q) =>
324+
q
325+
.from({ user: progressiveUsers })
326+
.where(({ user }) => eq(user.age, 30))
327+
)
328+
)
329+
330+
// Execute concurrently
331+
await Promise.all(queries.map((q) => q.preload()))
332+
333+
// Wait for data
334+
await Promise.all(
335+
queries.map((q) =>
336+
waitForQueryData(q, { minSize: 1, timeout: 10000 })
337+
)
338+
)
339+
340+
// All should have the same size and same data
341+
const sizes = queries.map((q) => q.size)
342+
expect(new Set(sizes).size).toBe(1) // All sizes are identical
343+
344+
// Verify all queries have identical data (deduplication working)
345+
const firstQueryData = Array.from(queries[0]!.values())
346+
const firstQueryIds = new Set(firstQueryData.map((u) => u.id))
347+
348+
queries.forEach((query) => {
349+
const queryData = Array.from(query.values())
350+
queryData.forEach((user) => {
351+
expect(user.age).toBe(30) // All match predicate
352+
expect(firstQueryIds.has(user.id)).toBe(true) // Same items
353+
})
354+
})
355+
356+
await Promise.all(queries.map((q) => q.cleanup()))
357+
})
358+
})
359+
360+
describe(`Progressive Mode Resilience`, () => {
361+
it(`should handle cleanup and restart during snapshot phase`, async () => {
362+
const config = await getConfig()
363+
if (!config.collections.progressive) {
364+
return // Skip if progressive collections not available
365+
}
366+
const progressiveUsers = config.collections.progressive.users
367+
368+
// This test verifies the collection can be cleaned up even during snapshot phase
369+
// and that the atomic swap doesn't cause issues
370+
371+
const query = createLiveQueryCollection((q) =>
372+
q
373+
.from({ user: progressiveUsers })
374+
.where(({ user }) => eq(user.age, 25))
375+
)
376+
377+
await query.preload()
378+
379+
// Don't wait for data, just cleanup immediately
380+
await query.cleanup()
381+
382+
// Should not throw
383+
expect(true).toBe(true)
384+
})
385+
})
386+
})
387+
}

0 commit comments

Comments
 (0)