Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cruel-buckets-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Added `isLoadingMore` property and `loadingMore:change` events to collections and live queries, enabling UIs to display loading indicators when more data is being fetched via `syncMore`. Each live query maintains its own isolated loading state based on its subscriptions, preventing loading status "bleed" between independent queries that share the same source collections.
58 changes: 58 additions & 0 deletions .changeset/smooth-goats-ring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
---
"@tanstack/react-db": patch
---

Add `useLiveInfiniteQuery` hook for infinite scrolling with live updates.

The new `useLiveInfiniteQuery` hook provides an infinite query pattern similar to TanStack Query's `useInfiniteQuery`, but with live updates from your local collection. It uses `liveQueryCollection.utils.setWindow()` internally to efficiently paginate through ordered data without recreating the query on each page fetch.

**Key features:**

- Automatic live updates as data changes in the collection
- Efficient pagination using dynamic window adjustment
- Peek-ahead mechanism to detect when more pages are available
- Compatible with TanStack Query's infinite query API patterns

**Example usage:**

```tsx
import { useLiveInfiniteQuery } from "@tanstack/react-db"

function PostList() {
const { data, pages, fetchNextPage, hasNextPage, isLoading } =
useLiveInfiniteQuery(
(q) =>
q
.from({ posts: postsCollection })
.orderBy(({ posts }) => posts.createdAt, "desc"),
{
pageSize: 20,
getNextPageParam: (lastPage, allPages) =>
lastPage.length === 20 ? allPages.length : undefined,
}
)

if (isLoading) return <div>Loading...</div>

return (
<div>
{pages.map((page, i) => (
<div key={i}>
{page.map((post) => (
<PostCard key={post.id} post={post} />
))}
</div>
))}
{hasNextPage && (
<button onClick={() => fetchNextPage()}>Load More</button>
)}
</div>
)
}
```

**Requirements:**

- Query must include `.orderBy()` for the window mechanism to work
- Returns flattened `data` array and `pages` array for flexible rendering
- Automatically detects new pages when data is synced to the collection
99 changes: 25 additions & 74 deletions packages/db/src/collection/events.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from "../event-emitter.js"
import type { Collection } from "./index.js"
import type { CollectionStatus } from "../types.js"

Expand Down Expand Up @@ -31,9 +32,21 @@ export interface CollectionSubscribersChangeEvent {
subscriberCount: number
}

/**
* Event emitted when the collection's loading more state changes
*/
export interface CollectionLoadingSubsetChangeEvent {
type: `loadingSubset:change`
collection: Collection<any, any, any, any, any>
isLoadingSubset: boolean
previousIsLoadingSubset: boolean
loadingSubsetTransition: `start` | `end`
}

export type AllCollectionEvents = {
"status:change": CollectionStatusChangeEvent
"subscribers:change": CollectionSubscribersChangeEvent
"loadingSubset:change": CollectionLoadingSubsetChangeEvent
} & {
[K in CollectionStatus as `status:${K}`]: CollectionStatusEvent<K>
}
Expand All @@ -42,94 +55,32 @@ export type CollectionEvent =
| AllCollectionEvents[keyof AllCollectionEvents]
| CollectionStatusChangeEvent
| CollectionSubscribersChangeEvent
| CollectionLoadingSubsetChangeEvent

export type CollectionEventHandler<T extends keyof AllCollectionEvents> = (
event: AllCollectionEvents[T]
) => void

export class CollectionEventsManager {
export class CollectionEventsManager extends EventEmitter<AllCollectionEvents> {
private collection!: Collection<any, any, any, any, any>
private listeners = new Map<
keyof AllCollectionEvents,
Set<CollectionEventHandler<any>>
>()

constructor() {}
constructor() {
super()
}

setDeps(deps: { collection: Collection<any, any, any, any, any> }) {
this.collection = deps.collection
}

on<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
if (!this.listeners.has(event)) {
this.listeners.set(event, new Set())
}
this.listeners.get(event)!.add(callback)

return () => {
this.listeners.get(event)?.delete(callback)
}
}

once<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
const unsubscribe = this.on(event, (eventPayload) => {
callback(eventPayload)
unsubscribe()
})
return unsubscribe
}

off<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
this.listeners.get(event)?.delete(callback)
}

waitFor<T extends keyof AllCollectionEvents>(
event: T,
timeout?: number
): Promise<AllCollectionEvents[T]> {
return new Promise((resolve, reject) => {
let timeoutId: NodeJS.Timeout | undefined
const unsubscribe = this.on(event, (eventPayload) => {
if (timeoutId) {
clearTimeout(timeoutId)
timeoutId = undefined
}
resolve(eventPayload)
unsubscribe()
})
if (timeout) {
timeoutId = setTimeout(() => {
timeoutId = undefined
unsubscribe()
reject(new Error(`Timeout waiting for event ${event}`))
}, timeout)
}
})
}

/**
* Emit an event to all listeners
* Public API for emitting collection events
*/
emit<T extends keyof AllCollectionEvents>(
event: T,
eventPayload: AllCollectionEvents[T]
) {
this.listeners.get(event)?.forEach((listener) => {
try {
listener(eventPayload)
} catch (error) {
// Re-throw in a microtask to surface the error
queueMicrotask(() => {
throw error
})
}
})
): void {
this.emitInner(event, eventPayload)
}

emitStatusChange<T extends CollectionStatus>(
Expand Down Expand Up @@ -166,6 +117,6 @@ export class CollectionEventsManager {
}

cleanup() {
this.listeners.clear()
this.clearListeners()
}
}
24 changes: 10 additions & 14 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import type {
InferSchemaOutput,
InsertConfig,
NonSingleResult,
OnLoadMoreOptions,
OperationConfig,
SingleResult,
SubscribeChangesOptions,
Expand Down Expand Up @@ -218,7 +217,7 @@ export class CollectionImpl<
private _events: CollectionEventsManager
private _changes: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
public _lifecycle: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
private _sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
public _sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
private _indexes: CollectionIndexesManager<TOutput, TKey, TSchema, TInput>
private _mutations: CollectionMutationsManager<
TOutput,
Expand Down Expand Up @@ -303,6 +302,7 @@ export class CollectionImpl<
collection: this, // Required for passing to config.sync callback
state: this._state,
lifecycle: this._lifecycle,
events: this._events,
})

// Only start sync immediately if explicitly enabled
Expand Down Expand Up @@ -356,23 +356,19 @@ export class CollectionImpl<
}

/**
* Start sync immediately - internal method for compiled queries
* This bypasses lazy loading for special cases like live query results
* Check if the collection is currently loading more data
* @returns true if the collection has pending load more operations, false otherwise
*/
public startSyncImmediate(): void {
this._sync.startSync()
public get isLoadingSubset(): boolean {
return this._sync.isLoadingSubset
}

/**
* Requests the sync layer to load more data.
* @param options Options to control what data is being loaded
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
* If data loading is synchronous, the data is loaded when the method returns.
* Start sync immediately - internal method for compiled queries
* This bypasses lazy loading for special cases like live query results
*/
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather keep this method such that we can keep the _sync property private

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree, syncMore is an internal method that shouldering be presented on the front he public api. The managers having the underscore marks them as "internal implementation but exposed for debugging and internal communication". I would prefer to not present syncMore on the prompts when people are interacting with a collection.

if (this._sync.syncOnLoadMoreFn) {
return this._sync.syncOnLoadMoreFn(options)
}
public startSyncImmediate(): void {
this._sync.startSync()
}

/**
Expand Down
Loading
Loading