diff --git a/.changeset/odd-bobcats-see.md b/.changeset/odd-bobcats-see.md new file mode 100644 index 000000000..c6e4eadf7 --- /dev/null +++ b/.changeset/odd-bobcats-see.md @@ -0,0 +1,22 @@ +--- +'@solana/subscribable': major +'@solana/kit': major +--- + +Collapse `loading` and `retrying` into a single `loading` status on `ReactiveStreamStore`, mirroring the action store's `running` (which is itself the merged "first call vs subsequent call" state). `data` and `error` are preserved through `loading` for stale-while-revalidate — UI can render the prior outcome alongside an in-flight reconnect. + +`ReactiveState` drops the `retrying` variant. `loading` widens from `{ data: undefined, error: undefined }` to `{ data: T | undefined, error: unknown }`. Both `createReactiveStoreFromDataPublisherFactory` and `createReactiveStoreWithInitialValueAndSlotTracking` now transition every `connect()` through `loading` (preserving `currentState.data` and `currentState.error`); a subsequent `loaded` clears `error`, a subsequent `error` replaces it. + +```ts +// Previously: +{ status: 'error', data: lastValue, error: caughtError } +// connect() → +{ status: 'retrying', data: lastValue, error: undefined } // error cleared, separate status + +// Now: +{ status: 'error', data: lastValue, error: caughtError } +// connect() → +{ status: 'loading', data: lastValue, error: caughtError } // error preserved, unified status +``` + +Migration: replace `status === 'retrying'` checks with `status === 'loading' && data !== undefined` (or just `status === 'loading'` if you don't need to distinguish first-load vs reconnect — the SWR pattern lets you render whatever is in `data` regardless). diff --git a/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts b/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts index 7ba6fb367..b8aa20531 100644 --- a/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts +++ b/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts @@ -667,7 +667,7 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => { store.retry(); expect(rpcRequest.send).toHaveBeenCalledTimes(1); }); - it('transitions to `retrying` with preserved data and clears the error', async () => { + it('transitions back to `loading` with preserved data AND error (SWR)', async () => { expect.assertions(1); const { rpcInstances, rpcRequest, rpcSubscriptionRequest, subscriptionInstances } = createRetryableMocks(); const store = createReactiveStoreWithInitialValueAndSlotTracking({ @@ -679,13 +679,14 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => { store.connect(); rpcInstances[0].resolve(rpcResponse(100, { count: 42 })); await jest.runAllTimersAsync(); - subscriptionInstances[0].error(new Error('stream died')); + const fail = new Error('stream died'); + subscriptionInstances[0].error(fail); await jest.runAllTimersAsync(); store.retry(); expect(store.getUnifiedState()).toStrictEqual({ data: { context: { slot: 100n }, value: 42 }, - error: undefined, - status: 'retrying', + error: fail, + status: 'loading', }); }); it('re-invokes the RPC request and subscription on retry', async () => { @@ -750,7 +751,7 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => { status: 'error', }); }); - it('notifies subscribers on the retrying transition', async () => { + it('notifies subscribers on the error → loading transition after retry', async () => { expect.assertions(1); const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); const store = createReactiveStoreWithInitialValueAndSlotTracking({ diff --git a/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts b/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts index cdaaaed34..33054a36a 100644 --- a/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts +++ b/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts @@ -57,14 +57,14 @@ const IDLE_STATE: ReactiveState = Object.freeze({ * - The returned store starts in `status: 'idle'`. Call * {@link ReactiveStreamStore.connect | `connect()`} to fire the RPC request and open the * subscription. - * - From `idle`, the store transitions through `loading` until the first response or notification - * arrives, then to `loaded` with a {@link SolanaRpcResponse} containing the value and the slot - * context at which it was observed. + * - The store transitions through `loading` until the first response or notification arrives, + * then to `loaded` with a {@link SolanaRpcResponse} containing the value and the slot context + * at which it was observed. * - On error from either source, the store transitions to `status: 'error'` preserving the last * known value. Only the first error per connection window is captured. - * - A subsequent `connect()` after `loaded` or `error` aborts the current connection, transitions - * through `status: 'retrying'` (preserving stale data), and re-fires the RPC request and - * subscription with a fresh inner abort signal. + * - A subsequent `connect()` aborts the current connection, transitions back to + * `status: 'loading'` (preserving the last known `data` and `error` for stale-while-revalidate), + * and re-fires the RPC request and subscription with a fresh inner abort signal. * - {@link ReactiveStreamStore.reset | `reset()`} aborts the current connection and returns the * store to `idle`, clearing `data` and `error`. * - Attach a caller-provided cancellation source via @@ -145,12 +145,9 @@ export function createReactiveStoreWithInitialValueAndSlotTracking { if (signal.aborted) return; - // `lastUpdateSlot` persists across retries so the store never regresses. If the - // retried RPC returns a slot older than one we've already seen, we wait for the - // subscription to deliver something newer before leaving `retrying`. + // `lastUpdateSlot` persists across reconnects so the store never regresses. If + // the re-fetched RPC returns a slot older than one we've already seen, we wait + // for the subscription to deliver something newer before leaving `loading`. if (slot < lastUpdateSlot) return; lastUpdateSlot = slot; handleValue({ context: { slot }, value: rpcValueMapper(value) }); diff --git a/packages/subscribable/README.md b/packages/subscribable/README.md index d05e7321f..6c70e76a6 100644 --- a/packages/subscribable/README.md +++ b/packages/subscribable/README.md @@ -35,18 +35,17 @@ This type represents a reactive store that holds the latest value published to a ```ts type ReactiveState = - | { data: undefined; error: undefined; status: 'loading' } + | { data: T | undefined; error: unknown; status: 'loading' } | { data: T; error: undefined; status: 'loaded' } | { data: T | undefined; error: unknown; status: 'error' } - | { data: T | undefined; error: undefined; status: 'retrying' } | { data: undefined; error: undefined; status: 'idle' }; ``` > Also exported as `ReactiveStore` for backwards compatibility. That alias is deprecated and will be removed in a future major release. -The store starts in `status: 'idle'`. Call `connect()` to open the underlying stream; the store will transition through `loading` → `loaded` (or `error`). A subsequent `connect()` after `loaded` or `error` transitions through `retrying` while preserving the last known value. Call `reset()` to tear down the connection and return to `idle` without permanently killing the store. +The store starts in `status: 'idle'`. Call `connect()` to open the underlying stream; the store will transition through `loading` → `loaded` (or `error`). Every subsequent `connect()` transitions back through `loading`, preserving the last known `data` and `error` (stale-while-revalidate). A subsequent `loaded` clears the error; a subsequent `error` replaces it. Call `reset()` to tear down the connection and return to `idle` without permanently killing the store. -```ts +```tsx const store: ReactiveStreamStore = /* ... */; // React — snapshot identity is stable between updates, so it can be passed directly. @@ -55,9 +54,14 @@ useEffect(() => { store.connect(); return () => store.reset(); }, [store]); -if (state.status === 'error') return ; -if (state.status === 'loading' || state.status === 'idle') return ; -return ; +// Stale-while-revalidate: keep showing the last value while a reconnect is in flight. +return ( + <> + {state.data !== undefined && } + {state.status === 'loading' && state.data === undefined && } + {state.status === 'error' && } + +); // Vue const snapshot = shallowRef(store.getUnifiedState()); @@ -195,33 +199,32 @@ Things to note: - If there are messages in the queue and the abort signal fires, all queued messages will be vended to the iterator after which it will return. - Any new iterators created after the first error is encountered will reject with that error when polled. -### `createReactiveStoreFromDataPublisherFactory({ abortSignal, createDataPublisher, dataChannelName, errorChannelName })` +### `createReactiveStoreFromDataPublisherFactory({ createDataPublisher, dataChannelName, errorChannelName })` -Returns a `ReactiveStreamStore` that wires itself to a fresh `DataPublisher` on every `connect()`. Accepts an async factory so the store can tear down a broken stream and open a new one without losing subscribers or the last known value. +Returns a `ReactiveStreamStore` that wires itself to a fresh `DataPublisher` on every `connect()`. Accepts an async factory so the store can tear down a broken stream and open a new one without losing subscribers or the last known value. The factory receives the per-connection `AbortSignal` so the underlying transport can stop when the connection window closes. ```ts const store = createReactiveStoreFromDataPublisherFactory({ - abortSignal: AbortSignal.timeout(60_000), - async createDataPublisher() { - return await openMyConnection(); - }, - dataChannelName: 'notification', + createDataPublisher: signal => getDataPublisherFromEventEmitter(new WebSocket(url, { signal })), + dataChannelName: 'message', errorChannelName: 'error', }); -store.subscribe(() => { +const unsubscribe = store.subscribe(() => { const snapshot = store.getUnifiedState(); - if (snapshot.status === 'error') store.connect(); + if (snapshot.status === 'error') console.error('Connection failed:', snapshot.error); + else if (snapshot.status === 'loaded') console.log('Latest:', snapshot.data); }); -store.connect(); +// Fresh 30-second clock per connection attempt: +store.withSignal(AbortSignal.timeout(30_000)).connect(); ``` Things to note: - The returned store starts in `status: 'idle'`. Call `connect()` to open the first stream. -- `createDataPublisher` is invoked on every `connect()`. From `idle`, the store transitions through `loading`; from any other status, through `retrying` while preserving the last known value. +- `createDataPublisher` is invoked on every `connect()`. The store transitions through `loading`, preserving the last known `data` and `error` (stale-while-revalidate). - If `createDataPublisher` rejects, the store transitions to `status: 'error'` with the rejection as the error. Call `connect()` to try again. - `reset()` aborts the current connection and returns the store to `idle`, clearing `data` and `error`. A follow-up `connect()` opens a fresh stream. -- Triggering the caller's `abortSignal` disconnects the store permanently; subsequent `connect()` calls are no-ops. +- Attach a caller-provided cancellation source via `store.withSignal(signal).connect()` — the signal is composed with the per-connection controller via `AbortSignal.any`. Aborting the caller's signal transitions the store to `error` with that abort reason. ### `demultiplexDataPublisher(publisher, sourceChannelName, messageTransformer)` diff --git a/packages/subscribable/src/__tests__/reactive-stream-store-test.ts b/packages/subscribable/src/__tests__/reactive-stream-store-test.ts index 705533b3a..03c0d84e5 100644 --- a/packages/subscribable/src/__tests__/reactive-stream-store-test.ts +++ b/packages/subscribable/src/__tests__/reactive-stream-store-test.ts @@ -122,7 +122,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => { status: 'error', }); }); - it('from `error`, transitions through `retrying` preserving stale data', async () => { + it('from `error`, transitions back through `loading` preserving stale data AND error (SWR)', async () => { expect.assertions(1); const { mockRequest, publishers } = createFactory(); const store = createReactiveStoreFromDataPublisherFactory({ @@ -133,15 +133,16 @@ describe('createReactiveStoreFromDataPublisherFactory', () => { store.connect(); await jest.runAllTimersAsync(); publishers[0].publish('data', { value: 42 }); - publishers[0].publish('error', new Error('fail')); + const fail = new Error('fail'); + publishers[0].publish('error', fail); store.connect(); expect(store.getUnifiedState()).toStrictEqual({ data: { value: 42 }, - error: undefined, - status: 'retrying', + error: fail, + status: 'loading', }); }); - it('from `loaded`, transitions through `retrying` preserving the last value', async () => { + it('from `loaded`, transitions back through `loading` preserving the last value', async () => { expect.assertions(1); const { mockRequest, publishers } = createFactory(); const store = createReactiveStoreFromDataPublisherFactory({ @@ -156,7 +157,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => { expect(store.getUnifiedState()).toStrictEqual({ data: { value: 42 }, error: undefined, - status: 'retrying', + status: 'loading', }); }); it('invokes the factory again on each connect()', async () => { @@ -193,7 +194,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => { status: 'loaded', }); }); - it('notifies subscribers on the retrying transition', async () => { + it('notifies subscribers on the loaded → loading transition after reconnect', async () => { expect.assertions(1); const { mockRequest, publishers } = createFactory(); const store = createReactiveStoreFromDataPublisherFactory({ @@ -402,7 +403,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => { expect(mockRequest).toHaveBeenCalledTimes(callsBefore); expect(store.getUnifiedState().status).toBe('loading'); }); - it('transitions to `retrying` from `error`', async () => { + it('transitions back to `loading` from `error`, preserving stale data and error (SWR)', async () => { expect.assertions(1); const { mockRequest, publishers } = createFactory(); const store = createReactiveStoreFromDataPublisherFactory({ @@ -413,12 +414,13 @@ describe('createReactiveStoreFromDataPublisherFactory', () => { store.connect(); await jest.runAllTimersAsync(); publishers[0].publish('data', { value: 42 }); - publishers[0].publish('error', new Error('fail')); + const fail = new Error('fail'); + publishers[0].publish('error', fail); store.retry(); expect(store.getUnifiedState()).toStrictEqual({ data: { value: 42 }, - error: undefined, - status: 'retrying', + error: fail, + status: 'loading', }); }); }); diff --git a/packages/subscribable/src/reactive-stream-store.ts b/packages/subscribable/src/reactive-stream-store.ts index dae7d4067..9e3fdd35e 100644 --- a/packages/subscribable/src/reactive-stream-store.ts +++ b/packages/subscribable/src/reactive-stream-store.ts @@ -37,19 +37,18 @@ type FactoryConfig = Readonly<{ * - `idle`: the store has not yet been connected, or has been reset via * {@link ReactiveStreamStore.reset | `reset()`}. Call * {@link ReactiveStreamStore.connect | `connect()`} to open the underlying stream. - * - `loading`: a first connection is in progress; no data has arrived yet. + * - `loading`: a connection is in progress. `data` and `error` are preserved from the previous + * connection (if any) — stale-while-revalidate UX. A subsequent `loaded` clears `error`; a + * subsequent `error` replaces it. * - `loaded`: a value has been received and no error is active. * - `error`: the stream failed. `data` holds the last known value (or `undefined` if none ever * arrived) and `error` holds the failure. - * - `retrying`: a follow-up `connect()` is in progress after a previous outcome. `error` is - * cleared; `data` is preserved from the previous connection if any. */ export type ReactiveState = - | { readonly data: T | undefined; readonly error: undefined; readonly status: 'retrying' } | { readonly data: T | undefined; readonly error: unknown; readonly status: 'error' } + | { readonly data: T | undefined; readonly error: unknown; readonly status: 'loading' } | { readonly data: T; readonly error: undefined; readonly status: 'loaded' } - | { readonly data: undefined; readonly error: undefined; readonly status: 'idle' } - | { readonly data: undefined; readonly error: undefined; readonly status: 'loading' }; + | { readonly data: undefined; readonly error: undefined; readonly status: 'idle' }; const IDLE_STATE: ReactiveState = Object.freeze({ data: undefined, @@ -63,9 +62,9 @@ const IDLE_STATE: ReactiveState = Object.freeze({ * `from()`, and other reactive primitives that expect a `{ subscribe, getUnifiedState }` contract. * * The store starts in `status: 'idle'`. Call {@link ReactiveStreamStore.connect | `connect()`} - * to open the underlying stream; the store will then transition through `loading` → `loaded` (or - * `error`). A subsequent `connect()` after `loaded` or `error` transitions through `retrying` - * while preserving the last known value. + * to open the underlying stream; the store transitions through `loading` → `loaded` (or `error`). + * Subsequent `connect()` calls also pass through `loading` while preserving the last known + * `data` and `error` (stale-while-revalidate). * * @example * ```ts @@ -86,9 +85,9 @@ const IDLE_STATE: ReactiveState = Object.freeze({ export type ReactiveStreamStore = { /** * Open the underlying stream. Aborts any currently active connection, invokes the configured - * factory, and transitions the store through `loading` (when called from `idle` or while a - * connection is already in flight) or `retrying` (when called after a previous outcome — - * i.e. `loaded` or `error`) before settling into `loaded` (on data) or `error` (on failure). + * factory, and transitions the store to `loading` (preserving the last known `data` and + * `error` for stale-while-revalidate) before settling into `loaded` (on data) or `error` + * (on failure). */ connect(): void; /** @@ -104,7 +103,7 @@ export type ReactiveStreamStore = { * notification has arrived yet. On error, continues to return the last known value. * * @deprecated Use {@link ReactiveStreamStore.getUnifiedState | `getUnifiedState()`} instead. This - * getter returns only the value field and does not surface lifecycle status (e.g. `retrying`). + * getter returns only the value field and does not surface lifecycle status (e.g. `loading`). */ getState(): T | undefined; /** @@ -204,9 +203,8 @@ export type ReactiveStreamSource = { * Things to note: * * - The returned store starts in `status: 'idle'`. Call `connect()` to open the first stream. - * - `createDataPublisher` is invoked on every `connect()`. From `idle`, the store transitions - * through `loading`; from any other status, through `retrying` while preserving the last - * known value. + * - `createDataPublisher` is invoked on every `connect()`. The store transitions through + * `loading`, preserving the last known `data` and `error` (stale-while-revalidate). * - If `createDataPublisher` rejects, the store transitions to `status: 'error'` with the * rejection as the error. Call `connect()` to try again. * - `reset()` aborts the current connection and returns the store to `idle`, clearing `data` @@ -267,15 +265,9 @@ export function createReactiveStoreFromDataPublisherFactory({ setState({ data: currentState.data, error: callerSignal.reason, status: 'error' }); return; } - // Transition based on whether we have a prior outcome to preserve. If already `loading`, - // a connection is in flight — we've just aborted it and will rewire to a fresh factory - // invocation below, but there's no user-visible value yet, so stay in `loading` rather - // than detour through `retrying`. - if (currentState.status === 'idle') { - setState({ data: undefined, error: undefined, status: 'loading' }); - } else if (currentState.status !== 'loading') { - setState({ data: currentState.data, error: undefined, status: 'retrying' }); - } + // Transition to `loading`, preserving the last known `data` and `error` for SWR. If + // already `loading` with the same data/error, `setState` no-ops — no spurious notify. + setState({ data: currentState.data, error: currentState.error, status: 'loading' }); // Inner signal is passed to the data publisher (composed with caller signal if any). const innerController = new AbortController(); currentInnerController = innerController;