Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .changeset/odd-bobcats-see.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
'@solana/subscribable': major
'@solana/kit': major
---

Collapse `loading` and `retrying` into a single `loading` status on `ReactiveStreamStore`, mirroring the action store's `running` (which is itself the merged "first call vs subsequent call" state). `data` and `error` are preserved through `loading` for stale-while-revalidate — UI can render the prior outcome alongside an in-flight reconnect.

`ReactiveState<T>` drops the `retrying` variant. `loading` widens from `{ data: undefined, error: undefined }` to `{ data: T | undefined, error: unknown }`. Both `createReactiveStoreFromDataPublisherFactory` and `createReactiveStoreWithInitialValueAndSlotTracking` now transition every `connect()` through `loading` (preserving `currentState.data` and `currentState.error`); a subsequent `loaded` clears `error`, a subsequent `error` replaces it.

```ts
// Previously:
{ status: 'error', data: lastValue, error: caughtError }
// connect() →
{ status: 'retrying', data: lastValue, error: undefined } // error cleared, separate status

// Now:
{ status: 'error', data: lastValue, error: caughtError }
// connect() →
{ status: 'loading', data: lastValue, error: caughtError } // error preserved, unified status
```

Migration: replace `status === 'retrying'` checks with `status === 'loading' && data !== undefined` (or just `status === 'loading'` if you don't need to distinguish first-load vs reconnect — the SWR pattern lets you render whatever is in `data` regardless).
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => {
store.retry();
expect(rpcRequest.send).toHaveBeenCalledTimes(1);
});
it('transitions to `retrying` with preserved data and clears the error', async () => {
it('transitions back to `loading` with preserved data AND error (SWR)', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest, subscriptionInstances } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
Expand All @@ -679,13 +679,14 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => {
store.connect();
rpcInstances[0].resolve(rpcResponse(100, { count: 42 }));
await jest.runAllTimersAsync();
subscriptionInstances[0].error(new Error('stream died'));
const fail = new Error('stream died');
subscriptionInstances[0].error(fail);
await jest.runAllTimersAsync();
store.retry();
expect(store.getUnifiedState()).toStrictEqual({
data: { context: { slot: 100n }, value: 42 },
error: undefined,
status: 'retrying',
error: fail,
status: 'loading',
});
});
it('re-invokes the RPC request and subscription on retry', async () => {
Expand Down Expand Up @@ -750,7 +751,7 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => {
status: 'error',
});
});
it('notifies subscribers on the retrying transition', async () => {
it('notifies subscribers on the error → loading transition after retry', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ const IDLE_STATE: ReactiveState<never> = Object.freeze({
* - The returned store starts in `status: 'idle'`. Call
* {@link ReactiveStreamStore.connect | `connect()`} to fire the RPC request and open the
* subscription.
* - From `idle`, the store transitions through `loading` until the first response or notification
* arrives, then to `loaded` with a {@link SolanaRpcResponse} containing the value and the slot
* context at which it was observed.
* - The store transitions through `loading` until the first response or notification arrives,
* then to `loaded` with a {@link SolanaRpcResponse} containing the value and the slot context
* at which it was observed.
* - On error from either source, the store transitions to `status: 'error'` preserving the last
* known value. Only the first error per connection window is captured.
* - A subsequent `connect()` after `loaded` or `error` aborts the current connection, transitions
* through `status: 'retrying'` (preserving stale data), and re-fires the RPC request and
* subscription with a fresh inner abort signal.
* - A subsequent `connect()` aborts the current connection, transitions back to
* `status: 'loading'` (preserving the last known `data` and `error` for stale-while-revalidate),
* and re-fires the RPC request and subscription with a fresh inner abort signal.
* - {@link ReactiveStreamStore.reset | `reset()`} aborts the current connection and returns the
* store to `idle`, clearing `data` and `error`.
* - Attach a caller-provided cancellation source via
Expand Down Expand Up @@ -145,12 +145,9 @@ export function createReactiveStoreWithInitialValueAndSlotTracking<TRpcValue, TS
setState({ data: currentState.data, error: callerSignal.reason, status: 'error' });
return;
}
// Transition based on whether we have prior data/error to preserve.
if (currentState.status === 'idle') {
setState({ data: undefined, error: undefined, status: 'loading' });
} else {
setState({ data: currentState.data, error: undefined, status: 'retrying' });
}
// Transition to `loading`, preserving the last known `data` and `error` for SWR. If
// already `loading` with the same data/error, `setState` no-ops — no spurious notify.
setState({ data: currentState.data, error: currentState.error, status: 'loading' });

const innerController = new AbortController();
currentInnerController = innerController;
Expand Down Expand Up @@ -186,9 +183,9 @@ export function createReactiveStoreWithInitialValueAndSlotTracking<TRpcValue, TS
.send({ abortSignal: signal })
.then(({ context: { slot }, value }) => {
if (signal.aborted) return;
// `lastUpdateSlot` persists across retries so the store never regresses. If the
// retried RPC returns a slot older than one we've already seen, we wait for the
// subscription to deliver something newer before leaving `retrying`.
// `lastUpdateSlot` persists across reconnects so the store never regresses. If
// the re-fetched RPC returns a slot older than one we've already seen, we wait
// for the subscription to deliver something newer before leaving `loading`.
if (slot < lastUpdateSlot) return;
lastUpdateSlot = slot;
handleValue({ context: { slot }, value: rpcValueMapper(value) });
Expand Down
41 changes: 22 additions & 19 deletions packages/subscribable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,17 @@ This type represents a reactive store that holds the latest value published to a

```ts
type ReactiveState<T> =
| { data: undefined; error: undefined; status: 'loading' }
| { data: T | undefined; error: unknown; status: 'loading' }
| { data: T; error: undefined; status: 'loaded' }
| { data: T | undefined; error: unknown; status: 'error' }
| { data: T | undefined; error: undefined; status: 'retrying' }
| { data: undefined; error: undefined; status: 'idle' };
```

> Also exported as `ReactiveStore<T>` for backwards compatibility. That alias is deprecated and will be removed in a future major release.

The store starts in `status: 'idle'`. Call `connect()` to open the underlying stream; the store will transition through `loading` → `loaded` (or `error`). A subsequent `connect()` after `loaded` or `error` transitions through `retrying` while preserving the last known value. Call `reset()` to tear down the connection and return to `idle` without permanently killing the store.
The store starts in `status: 'idle'`. Call `connect()` to open the underlying stream; the store will transition through `loading` → `loaded` (or `error`). Every subsequent `connect()` transitions back through `loading`, preserving the last known `data` and `error` (stale-while-revalidate). A subsequent `loaded` clears the error; a subsequent `error` replaces it. Call `reset()` to tear down the connection and return to `idle` without permanently killing the store.

```ts
```tsx
const store: ReactiveStreamStore<AccountInfo> = /* ... */;

// React — snapshot identity is stable between updates, so it can be passed directly.
Expand All @@ -55,9 +54,14 @@ useEffect(() => {
store.connect();
return () => store.reset();
}, [store]);
if (state.status === 'error') return <ErrorMessage error={state.error} onRetry={store.connect} />;
if (state.status === 'loading' || state.status === 'idle') return <Spinner />;
return <View data={state.data} />;
// Stale-while-revalidate: keep showing the last value while a reconnect is in flight.
return (
<>
{state.data !== undefined && <View data={state.data} />}
{state.status === 'loading' && state.data === undefined && <Spinner />}
{state.status === 'error' && <RetryBanner error={state.error} onRetry={store.connect} />}
</>
);

// Vue
const snapshot = shallowRef(store.getUnifiedState());
Expand Down Expand Up @@ -195,33 +199,32 @@ Things to note:
- If there are messages in the queue and the abort signal fires, all queued messages will be vended to the iterator after which it will return.
- Any new iterators created after the first error is encountered will reject with that error when polled.

### `createReactiveStoreFromDataPublisherFactory({ abortSignal, createDataPublisher, dataChannelName, errorChannelName })`
### `createReactiveStoreFromDataPublisherFactory({ createDataPublisher, dataChannelName, errorChannelName })`

Returns a `ReactiveStreamStore` that wires itself to a fresh `DataPublisher` on every `connect()`. Accepts an async factory so the store can tear down a broken stream and open a new one without losing subscribers or the last known value.
Returns a `ReactiveStreamStore` that wires itself to a fresh `DataPublisher` on every `connect()`. Accepts an async factory so the store can tear down a broken stream and open a new one without losing subscribers or the last known value. The factory receives the per-connection `AbortSignal` so the underlying transport can stop when the connection window closes.

```ts
const store = createReactiveStoreFromDataPublisherFactory({
abortSignal: AbortSignal.timeout(60_000),
async createDataPublisher() {
return await openMyConnection();
},
dataChannelName: 'notification',
createDataPublisher: signal => getDataPublisherFromEventEmitter(new WebSocket(url, { signal })),
dataChannelName: 'message',
errorChannelName: 'error',
});
store.subscribe(() => {
const unsubscribe = store.subscribe(() => {
const snapshot = store.getUnifiedState();
if (snapshot.status === 'error') store.connect();
if (snapshot.status === 'error') console.error('Connection failed:', snapshot.error);
else if (snapshot.status === 'loaded') console.log('Latest:', snapshot.data);
});
store.connect();
// Fresh 30-second clock per connection attempt:
store.withSignal(AbortSignal.timeout(30_000)).connect();
```

Things to note:

- The returned store starts in `status: 'idle'`. Call `connect()` to open the first stream.
- `createDataPublisher` is invoked on every `connect()`. From `idle`, the store transitions through `loading`; from any other status, through `retrying` while preserving the last known value.
- `createDataPublisher` is invoked on every `connect()`. The store transitions through `loading`, preserving the last known `data` and `error` (stale-while-revalidate).
- If `createDataPublisher` rejects, the store transitions to `status: 'error'` with the rejection as the error. Call `connect()` to try again.
- `reset()` aborts the current connection and returns the store to `idle`, clearing `data` and `error`. A follow-up `connect()` opens a fresh stream.
- Triggering the caller's `abortSignal` disconnects the store permanently; subsequent `connect()` calls are no-ops.
- Attach a caller-provided cancellation source via `store.withSignal(signal).connect()` — the signal is composed with the per-connection controller via `AbortSignal.any`. Aborting the caller's signal transitions the store to `error` with that abort reason.

### `demultiplexDataPublisher(publisher, sourceChannelName, messageTransformer)`

Expand Down
24 changes: 13 additions & 11 deletions packages/subscribable/src/__tests__/reactive-stream-store-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => {
status: 'error',
});
});
it('from `error`, transitions through `retrying` preserving stale data', async () => {
it('from `error`, transitions back through `loading` preserving stale data AND error (SWR)', async () => {
expect.assertions(1);
const { mockRequest, publishers } = createFactory();
const store = createReactiveStoreFromDataPublisherFactory({
Expand All @@ -133,15 +133,16 @@ describe('createReactiveStoreFromDataPublisherFactory', () => {
store.connect();
await jest.runAllTimersAsync();
publishers[0].publish('data', { value: 42 });
publishers[0].publish('error', new Error('fail'));
const fail = new Error('fail');
publishers[0].publish('error', fail);
store.connect();
expect(store.getUnifiedState()).toStrictEqual({
data: { value: 42 },
error: undefined,
status: 'retrying',
error: fail,
status: 'loading',
});
});
it('from `loaded`, transitions through `retrying` preserving the last value', async () => {
it('from `loaded`, transitions back through `loading` preserving the last value', async () => {
expect.assertions(1);
const { mockRequest, publishers } = createFactory();
const store = createReactiveStoreFromDataPublisherFactory({
Expand All @@ -156,7 +157,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => {
expect(store.getUnifiedState()).toStrictEqual({
data: { value: 42 },
error: undefined,
status: 'retrying',
status: 'loading',
});
});
it('invokes the factory again on each connect()', async () => {
Expand Down Expand Up @@ -193,7 +194,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => {
status: 'loaded',
});
});
it('notifies subscribers on the retrying transition', async () => {
it('notifies subscribers on the loaded → loading transition after reconnect', async () => {
expect.assertions(1);
const { mockRequest, publishers } = createFactory();
const store = createReactiveStoreFromDataPublisherFactory({
Expand Down Expand Up @@ -402,7 +403,7 @@ describe('createReactiveStoreFromDataPublisherFactory', () => {
expect(mockRequest).toHaveBeenCalledTimes(callsBefore);
expect(store.getUnifiedState().status).toBe('loading');
});
it('transitions to `retrying` from `error`', async () => {
it('transitions back to `loading` from `error`, preserving stale data and error (SWR)', async () => {
expect.assertions(1);
const { mockRequest, publishers } = createFactory();
const store = createReactiveStoreFromDataPublisherFactory({
Expand All @@ -413,12 +414,13 @@ describe('createReactiveStoreFromDataPublisherFactory', () => {
store.connect();
await jest.runAllTimersAsync();
publishers[0].publish('data', { value: 42 });
publishers[0].publish('error', new Error('fail'));
const fail = new Error('fail');
publishers[0].publish('error', fail);
store.retry();
expect(store.getUnifiedState()).toStrictEqual({
data: { value: 42 },
error: undefined,
status: 'retrying',
error: fail,
status: 'loading',
});
});
});
Expand Down
Loading
Loading