Skip to content

Commit 93ebf98

Browse files
committed
add txid tracking tests
1 parent aebab16 commit 93ebf98

File tree

3 files changed

+157
-3
lines changed

3 files changed

+157
-3
lines changed

packages/db-collection-e2e/src/suites/progressive.suite.ts

Lines changed: 144 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Progressive Mode Test Suite (Electric only)
2+
* Progressive Mode Test Suite
33
*
44
* Tests progressive sync mode behavior including:
55
* - Snapshot loading during initial sync
@@ -11,12 +11,14 @@
1111
import { describe, expect, it } from "vitest"
1212
import { createLiveQueryCollection, eq, gt } from "@tanstack/db"
1313
import { waitFor, waitForQueryData } from "../utils/helpers"
14-
import type { E2ETestConfig } from "../types"
14+
import type { E2ETestConfig, User } from "../types"
15+
import type { Collection } from "@tanstack/db"
16+
import type { ElectricCollectionUtils } from "@tanstack/electric-db-collection"
1517

1618
export function createProgressiveTestSuite(
1719
getConfig: () => Promise<E2ETestConfig>
1820
) {
19-
describe(`Progressive Mode Suite (Electric only)`, () => {
21+
describe(`Progressive Mode Suite`, () => {
2022
describe(`Basic Progressive Mode`, () => {
2123
it(`should explicitly validate snapshot phase and atomic swap transition`, async () => {
2224
const config = await getConfig()
@@ -406,6 +408,145 @@ export function createProgressiveTestSuite(
406408
})
407409
})
408410

411+
describe(`Txid Tracking Behavior (Electric only)`, () => {
412+
it(`should not track txids during snapshot phase but track them after atomic swap`, async () => {
413+
const config = await getConfig()
414+
if (
415+
!config.collections.progressive ||
416+
!config.mutations?.insertUser ||
417+
!config.getTxid
418+
) {
419+
return // Skip if progressive collections, mutations, or getTxid not available
420+
}
421+
const progressiveUsers = config.collections.progressive
422+
.users as Collection<User, string, ElectricCollectionUtils>
423+
424+
// awaitTxId is guaranteed to exist on ElectricCollectionUtils
425+
// This test is Electric-only via the describe block name
426+
427+
// Start sync but don't release yet (stay in snapshot phase)
428+
progressiveUsers.startSyncImmediate()
429+
await new Promise((resolve) => setTimeout(resolve, 100))
430+
431+
// Should be in loading state (snapshot phase)
432+
if (progressiveUsers.status !== `loading`) {
433+
console.log(
434+
`Collection already ready, cannot test snapshot phase txid behavior`
435+
)
436+
return
437+
}
438+
439+
// === PHASE 1: INSERT DURING SNAPSHOT PHASE ===
440+
const snapshotPhaseUser = {
441+
id: crypto.randomUUID(),
442+
name: `Snapshot Phase User`,
443+
444+
age: 28,
445+
isActive: true,
446+
createdAt: new Date(),
447+
metadata: null,
448+
deletedAt: null,
449+
}
450+
451+
// Insert user and track when awaitTxId completes
452+
let txidResolved = false
453+
454+
// Start the insert
455+
await config.mutations.insertUser(snapshotPhaseUser)
456+
457+
// Get the txid from postgres
458+
const txid = await config.getTxid()
459+
460+
if (!txid) {
461+
console.log(`Could not get txid, skipping txid tracking validation`)
462+
config.progressiveTestControl?.releaseInitialSync()
463+
return
464+
}
465+
466+
// Start awaiting the txid (should NOT resolve during snapshot phase)
467+
progressiveUsers.utils.awaitTxId(txid, 60000).then(() => {
468+
txidResolved = true
469+
})
470+
471+
// Wait a moment for sync to process
472+
await new Promise((resolve) => setTimeout(resolve, 500))
473+
474+
// Txid should NOT have resolved yet (snapshot phase, txids not tracked)
475+
expect(txidResolved).toBe(false)
476+
477+
// Query for the user (triggers fetchSnapshot with this user)
478+
const query = createLiveQueryCollection((q) =>
479+
q
480+
.from({ user: progressiveUsers })
481+
.where(({ user }) => eq(user.id, snapshotPhaseUser.id))
482+
)
483+
484+
await query.preload()
485+
await waitForQueryData(query, { minSize: 1, timeout: 10000 })
486+
487+
// User should be in snapshot data
488+
expect(query.size).toBe(1)
489+
expect(query.get(snapshotPhaseUser.id)).toBeDefined()
490+
491+
// But collection is still in snapshot phase
492+
expect(progressiveUsers.status).toBe(`loading`)
493+
494+
// Txid should STILL not have resolved (snapshot doesn't track txids)
495+
expect(txidResolved).toBe(false)
496+
497+
// === PHASE 2: TRIGGER ATOMIC SWAP ===
498+
if (config.progressiveTestControl) {
499+
config.progressiveTestControl.releaseInitialSync()
500+
}
501+
502+
// Wait for atomic swap to complete
503+
await waitFor(() => progressiveUsers.status === `ready`, {
504+
timeout: 30000,
505+
message: `Progressive collection did not complete sync`,
506+
})
507+
508+
// NOW txid should resolve (buffered messages include txids)
509+
await waitFor(() => txidResolved, {
510+
timeout: 5000,
511+
message: `Txid did not resolve after atomic swap`,
512+
})
513+
514+
expect(txidResolved).toBe(true)
515+
516+
// === PHASE 3: VERIFY TXID TRACKING POST-SWAP ===
517+
// User should still be present after atomic swap
518+
expect(progressiveUsers.get(snapshotPhaseUser.id)).toBeDefined()
519+
520+
// Now insert another user and verify txid tracking works
521+
const postSwapUser = {
522+
id: crypto.randomUUID(),
523+
name: `Post Swap User`,
524+
525+
age: 29,
526+
isActive: true,
527+
createdAt: new Date(),
528+
metadata: null,
529+
deletedAt: null,
530+
}
531+
532+
await config.mutations.insertUser(postSwapUser)
533+
534+
// Wait for incremental update (txid tracking should work now)
535+
if (config.hasReplicationLag) {
536+
await waitFor(() => progressiveUsers.has(postSwapUser.id), {
537+
timeout: 10000,
538+
message: `Post-swap user not synced via incremental update`,
539+
})
540+
}
541+
542+
// Both users should be present
543+
expect(progressiveUsers.get(snapshotPhaseUser.id)).toBeDefined()
544+
expect(progressiveUsers.get(postSwapUser.id)).toBeDefined()
545+
546+
await query.cleanup()
547+
})
548+
})
549+
409550
describe(`Progressive Mode Resilience`, () => {
410551
it(`should handle cleanup and restart during snapshot phase`, async () => {
411552
const config = await getConfig()

packages/db-collection-e2e/src/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ export interface E2ETestConfig {
7676
insertPost: (post: Post) => Promise<void>
7777
}
7878

79+
// Helper to get txid for Electric txid tracking tests (Electric only)
80+
getTxid?: () => Promise<number | null>
81+
7982
// Indicates if the backend has replication lag (e.g., Electric sync)
8083
// When true, tests will wait for mutations to propagate before proceeding
8184
hasReplicationLag?: boolean

packages/electric-db-collection/e2e/electric.e2e.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,16 @@ describe(`Electric Collection E2E Tests`, () => {
431431
commentsUpToDateControl.current?.()
432432
},
433433
},
434+
getTxid: async () => {
435+
// Get the current transaction ID from the last operation
436+
// This uses pg_current_xact_id_if_assigned() which returns the txid
437+
// Note: This gets the CURRENT transaction's ID, so must be called
438+
// immediately after an insert in the same transaction context
439+
const result = await dbClient.query(
440+
`SELECT pg_current_xact_id_if_assigned()::text::bigint as txid`
441+
)
442+
return result.rows[0]?.txid || null
443+
},
434444
mutations: {
435445
// Use direct SQL for Electric tests (simulates external changes)
436446
// This tests that Electric sync picks up database changes

0 commit comments

Comments
 (0)