diff --git a/.changeset/clever-spies-shout.md b/.changeset/clever-spies-shout.md new file mode 100644 index 000000000..7ac70f982 --- /dev/null +++ b/.changeset/clever-spies-shout.md @@ -0,0 +1,11 @@ +--- +'@solana/subscribable': minor +'@solana/errors': minor +'@solana/kit': minor +--- + +Add `retry()` and `getUnifiedState()` to `ReactiveStore`. The new `getUnifiedState()` returns a discriminated `{ data, error, status }` snapshot with stable identity, so stores can be passed directly to `useSyncExternalStore` without an intermediate wrapper. `getState()` and `getError()` remain on the type but are now `@deprecated` in favour of the unified snapshot. + +A new `createReactiveStoreFromDataPublisherFactory` function is also introduced. It accepts a `createDataPublisher: () => Promise` factory rather than a ready-made publisher, which lets the store reconnect via `retry()` after an error. The existing `createReactiveStoreFromDataPublisher` is now `@deprecated`; calling `retry()` on a store it produced throws a new `SolanaError` with code `SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED`. + +`createReactiveStoreWithInitialValueAndSlotTracking` (from `@solana/kit`) now supports `retry()`, which re-sends the RPC request and re-subscribes to the subscription with a fresh abort signal while preserving the last known slot and value. diff --git a/packages/errors/src/codes.ts b/packages/errors/src/codes.ts index 8e4700642..cbb5c7610 100644 --- a/packages/errors/src/codes.ts +++ b/packages/errors/src/codes.ts @@ -374,6 +374,10 @@ export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFF export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED = 8190003; export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT = 8190004; +// Subscribable-related errors. +// Reserve error codes in the range [8195000-8195999]. +export const SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED = 8195000; + // Program-client-related errors. // Reserve error codes in the range [8500000-8500999]. export const SOLANA_ERROR__PROGRAM_CLIENTS__INSUFFICIENT_ACCOUNT_METAS = 8500000; @@ -641,6 +645,7 @@ export type SolanaErrorCode = | typeof SOLANA_ERROR__SIGNER__TRANSACTION_SENDING_SIGNER_MISSING | typeof SOLANA_ERROR__SIGNER__WALLET_ACCOUNT_CANNOT_SIGN_TRANSACTION | typeof SOLANA_ERROR__SIGNER__WALLET_MULTISIGN_UNIMPLEMENTED + | typeof SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED | typeof SOLANA_ERROR__SUBTLE_CRYPTO__CANNOT_EXPORT_NON_EXTRACTABLE_KEY | typeof SOLANA_ERROR__SUBTLE_CRYPTO__DIGEST_UNIMPLEMENTED | typeof SOLANA_ERROR__SUBTLE_CRYPTO__DISALLOWED_IN_INSECURE_CONTEXT diff --git a/packages/errors/src/messages.ts b/packages/errors/src/messages.ts index 89cb5bcf3..03d84d199 100644 --- a/packages/errors/src/messages.ts +++ b/packages/errors/src/messages.ts @@ -229,6 +229,7 @@ import { SOLANA_ERROR__SIGNER__TRANSACTION_SENDING_SIGNER_MISSING, SOLANA_ERROR__SIGNER__WALLET_ACCOUNT_CANNOT_SIGN_TRANSACTION, SOLANA_ERROR__SIGNER__WALLET_MULTISIGN_UNIMPLEMENTED, + SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED, SOLANA_ERROR__SUBTLE_CRYPTO__CANNOT_EXPORT_NON_EXTRACTABLE_KEY, SOLANA_ERROR__SUBTLE_CRYPTO__DIGEST_UNIMPLEMENTED, SOLANA_ERROR__SUBTLE_CRYPTO__DISALLOWED_IN_INSECURE_CONTEXT, @@ -711,6 +712,9 @@ export const SolanaErrorMessages: Readonly<{ 'The account supports the following features: $supportedFeatures.', [SOLANA_ERROR__SIGNER__WALLET_MULTISIGN_UNIMPLEMENTED]: 'Wallet account signers do not support signing multiple messages/transactions in a single operation', + [SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED]: + 'This `ReactiveStore` does not support retry. Use `createReactiveStoreFromDataPublisherFactory` ' + + 'to construct a retryable store.', [SOLANA_ERROR__SUBTLE_CRYPTO__CANNOT_EXPORT_NON_EXTRACTABLE_KEY]: 'Cannot export a non-extractable key.', [SOLANA_ERROR__SUBTLE_CRYPTO__DIGEST_UNIMPLEMENTED]: 'No digest implementation could be found.', [SOLANA_ERROR__SUBTLE_CRYPTO__DISALLOWED_IN_INSECURE_CONTEXT]: diff --git a/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts b/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts index f5a5bbac3..8d8d2679e 100644 --- a/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts +++ b/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts @@ -530,4 +530,292 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => { expect(subscriber).not.toHaveBeenCalled(); }); }); + + describe('getUnifiedState()', () => { + it('starts in `loading` status', () => { + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: undefined, + status: 'loading', + }); + }); + it('transitions to `loaded` after the RPC response arrives', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + resolve(rpcResponse(100, { count: 42 })); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: { context: { slot: 100n }, value: 42 }, + error: undefined, + status: 'loaded', + }); + }); + it('transitions to `error` on RPC failure, preserving nothing (no prior data)', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, reject } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const failure = new Error('rpc failed'); + reject(failure); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: failure, + status: 'error', + }); + }); + it('transitions to `error` on subscription failure, preserving the RPC value', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, error } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + resolve(rpcResponse(100, { count: 42 })); + await jest.runAllTimersAsync(); + const failure = new Error('subscription failed'); + error(failure); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: { context: { slot: 100n }, value: 42 }, + error: failure, + status: 'error', + }); + }); + }); + + describe('retry()', () => { + // Helper: returns mocks where each invocation of rpcRequest.send() / + // rpcSubscriptionRequest.subscribe() yields a fresh controllable instance — needed to + // exercise retry, which re-invokes both. + function createRetryableMocks() { + const rpcInstances: { + reject(error: unknown): void; + resolve(response: SolanaRpcResponse): void; + }[] = []; + const subscriptionInstances: { + error(err: unknown): void; + pushNotification(notification: SolanaRpcResponse): void; + }[] = []; + const rpcRequest: PendingRpcRequest> = { + send: jest.fn().mockImplementation(() => { + const { promise, resolve, reject } = Promise.withResolvers>(); + rpcInstances.push({ reject, resolve }); + return promise; + }), + }; + const rpcSubscriptionRequest: PendingRpcSubscriptionsRequest> = { + reactive: jest.fn().mockRejectedValue(new Error('not implemented')), + subscribe: jest.fn().mockImplementation(() => { + const instance = createMockSubscriptionRequest(); + subscriptionInstances.push({ + error: instance.error, + pushNotification: instance.pushNotification, + }); + return (instance.mockRequest.subscribe as jest.Mock)(); + }), + }; + return { rpcInstances, rpcRequest, rpcSubscriptionRequest, subscriptionInstances }; + } + + it('is a no-op when the store is not in error state', async () => { + expect.assertions(1); + const { rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + await jest.runAllTimersAsync(); + store.retry(); + expect(rpcRequest.send).toHaveBeenCalledTimes(1); + }); + it('transitions to `retrying` with preserved data and clears the error', async () => { + expect.assertions(1); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest, subscriptionInstances } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + rpcInstances[0].resolve(rpcResponse(100, { count: 42 })); + await jest.runAllTimersAsync(); + subscriptionInstances[0].error(new Error('stream died')); + await jest.runAllTimersAsync(); + store.retry(); + expect(store.getUnifiedState()).toStrictEqual({ + data: { context: { slot: 100n }, value: 42 }, + error: undefined, + status: 'retrying', + }); + }); + it('re-invokes the RPC request and subscription on retry', async () => { + expect.assertions(2); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + rpcInstances[0].reject(new Error('boom')); + await jest.runAllTimersAsync(); + store.retry(); + await jest.runAllTimersAsync(); + expect(rpcRequest.send).toHaveBeenCalledTimes(2); + expect(rpcSubscriptionRequest.subscribe).toHaveBeenCalledTimes(2); + }); + it('recovers to `loaded` when the retried RPC succeeds', async () => { + expect.assertions(1); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + rpcInstances[0].reject(new Error('first failure')); + await jest.runAllTimersAsync(); + store.retry(); + await jest.runAllTimersAsync(); + rpcInstances[1].resolve(rpcResponse(200, { count: 99 })); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: { context: { slot: 200n }, value: 99 }, + error: undefined, + status: 'loaded', + }); + }); + it('transitions to `error` again when the retried RPC also fails', async () => { + expect.assertions(1); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + rpcInstances[0].reject(new Error('first')); + await jest.runAllTimersAsync(); + store.retry(); + await jest.runAllTimersAsync(); + const secondFailure = new Error('second'); + rpcInstances[1].reject(secondFailure); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: secondFailure, + status: 'error', + }); + }); + it('notifies subscribers on the retrying transition', async () => { + expect.assertions(1); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + rpcInstances[0].reject(new Error('fail')); + await jest.runAllTimersAsync(); + const subscriber = jest.fn(); + store.subscribe(subscriber); + store.retry(); + expect(subscriber).toHaveBeenCalledTimes(1); + }); + it('does not re-invoke the RPC request after the caller has aborted', async () => { + expect.assertions(1); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + rpcInstances[0].reject(new Error('fail')); + await jest.runAllTimersAsync(); + abortController.abort(); + store.retry(); + await jest.runAllTimersAsync(); + expect(rpcRequest.send).toHaveBeenCalledTimes(1); + }); + it('leaves the store in `error` state after the caller has aborted', async () => { + expect.assertions(1); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const failure = new Error('fail'); + rpcInstances[0].reject(failure); + await jest.runAllTimersAsync(); + abortController.abort(); + store.retry(); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: failure, + status: 'error', + }); + }); + it('does not notify subscribers after the caller has aborted', async () => { + expect.assertions(1); + const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + rpcInstances[0].reject(new Error('fail')); + await jest.runAllTimersAsync(); + abortController.abort(); + const subscriber = jest.fn(); + store.subscribe(subscriber); + store.retry(); + await jest.runAllTimersAsync(); + expect(subscriber).not.toHaveBeenCalled(); + }); + }); }); diff --git a/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts b/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts index 214ae9ba3..c49a40c5b 100644 --- a/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts +++ b/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts @@ -1,7 +1,7 @@ import type { PendingRpcRequest } from '@solana/rpc'; import type { PendingRpcSubscriptionsRequest } from '@solana/rpc-subscriptions'; import type { SolanaRpcResponse } from '@solana/rpc-types'; -import type { ReactiveStore } from '@solana/subscribable'; +import type { ReactiveState, ReactiveStore } from '@solana/subscribable'; type CreateReactiveStoreWithInitialValueAndSlotTrackingConfig = Readonly<{ /** @@ -31,6 +31,12 @@ type CreateReactiveStoreWithInitialValueAndSlotTrackingConfig TItem; }>; +const LOADING_STATE: ReactiveState = Object.freeze({ + data: undefined, + error: undefined, + status: 'loading', +}); + /** * Creates a {@link ReactiveStore} that combines an initial RPC fetch with an ongoing subscription * to keep its state up to date. @@ -42,14 +48,16 @@ type CreateReactiveStoreWithInitialValueAndSlotTrackingConfig { - * const error = balanceStore.getError(); - * if (error) console.error('Error:', error); - * else { - * const state = balanceStore.getState(); - * if (state) console.log(`Balance at slot ${state.context.slot}:`, state.value); + * const state = balanceStore.getUnifiedState(); + * if (state.status === 'error') { + * console.error('Error:', state.error); + * balanceStore.retry(); + * } else if (state.status === 'loaded') { + * console.log(`Balance at slot ${state.data.context.slot}:`, state.data.value); * } * }); * ``` @@ -95,66 +104,85 @@ export function createReactiveStoreWithInitialValueAndSlotTracking): ReactiveStore< SolanaRpcResponse > { - let currentState: SolanaRpcResponse | undefined; - let currentError: unknown; + let currentState: ReactiveState> = LOADING_STATE; let lastUpdateSlot = -1n; const subscribers = new Set<() => void>(); - const abortController = new AbortController(); - abortSignal.addEventListener('abort', () => abortController.abort(abortSignal.reason)); - const signal = abortController.signal; + const outerController = new AbortController(); + abortSignal.addEventListener('abort', () => outerController.abort(abortSignal.reason)); - function notifySubscribers() { + function notify() { subscribers.forEach(cb => cb()); } - function handleError(err: unknown) { - // Ignore if the signal has already been aborted - if (signal.aborted) return; - // Only capture the first error - if (currentError !== undefined) return; - currentError = err; - abortController.abort(err); - notifySubscribers(); - } + function connect() { + if (outerController.signal.aborted) return; + const innerController = new AbortController(); + const forwardAbort = () => innerController.abort(outerController.signal.reason); + outerController.signal.addEventListener('abort', forwardAbort, { signal: innerController.signal }); + const innerSignal = innerController.signal; - rpcRequest - .send({ abortSignal: signal }) - .then(({ context: { slot }, value }) => { - if (signal.aborted) return; - if (slot < lastUpdateSlot) return; - lastUpdateSlot = slot; - currentState = { context: { slot }, value: rpcValueMapper(value) }; - notifySubscribers(); - }) - .catch(handleError); + function handleError(err: unknown) { + if (innerSignal.aborted) return; + if (currentState.status === 'error') return; + currentState = { data: currentState.data, error: err, status: 'error' }; + innerController.abort(err); + notify(); + } - rpcSubscriptionRequest - .subscribe({ abortSignal: signal }) - .then(async notifications => { - for await (const { - context: { slot }, - value, - } of notifications) { - if (signal.aborted) return; - if (slot < lastUpdateSlot) continue; + function handleValue(value: SolanaRpcResponse) { + currentState = { data: value, error: undefined, status: 'loaded' }; + notify(); + } + + rpcRequest + .send({ abortSignal: innerSignal }) + .then(({ context: { slot }, value }) => { + if (innerSignal.aborted) return; + // `lastUpdateSlot` persists across retries so the store never regresses. If the + // retried RPC returns a slot older than one we've already seen, we wait for the + // subscription to deliver something newer before leaving `retrying`. + if (slot < lastUpdateSlot) return; lastUpdateSlot = slot; - currentState = { + handleValue({ context: { slot }, value: rpcValueMapper(value) }); + }) + .catch(handleError); + + rpcSubscriptionRequest + .subscribe({ abortSignal: innerSignal }) + .then(async notifications => { + for await (const { context: { slot }, - value: rpcSubscriptionValueMapper(value), - }; - notifySubscribers(); - } - }) - .catch(handleError); + value, + } of notifications) { + if (innerSignal.aborted) return; + if (slot < lastUpdateSlot) continue; + lastUpdateSlot = slot; + handleValue({ context: { slot }, value: rpcSubscriptionValueMapper(value) }); + } + }) + .catch(handleError); + } + + connect(); return { getError(): unknown { - return currentError; + return currentState.error; }, getState(): SolanaRpcResponse | undefined { + return currentState.data; + }, + getUnifiedState(): ReactiveState> { return currentState; }, + retry(): void { + if (outerController.signal.aborted) return; + if (currentState.status !== 'error') return; + currentState = { data: currentState.data, error: undefined, status: 'retrying' }; + notify(); + connect(); + }, subscribe(callback: () => void): () => void { subscribers.add(callback); return () => { diff --git a/packages/subscribable/README.md b/packages/subscribable/README.md index ff2d9c0cd..202dc371d 100644 --- a/packages/subscribable/README.md +++ b/packages/subscribable/README.md @@ -29,26 +29,38 @@ dataPublisher.on('error', e => { ### `ReactiveStore` -This type represents a reactive store that holds the latest value published to a data channel. It exposes a `{ getState, getError, subscribe }` contract compatible with `useSyncExternalStore`, Svelte stores, and other reactive primitives. +This type represents a reactive store that holds the latest value published to a data channel. It exposes a `{ getUnifiedState, retry, subscribe }` contract compatible with `useSyncExternalStore`, Svelte stores, and other reactive primitives. + +`getUnifiedState()` returns a discriminated snapshot of the store's lifecycle: + +```ts +type ReactiveState = + | { data: undefined; error: undefined; status: 'loading' } + | { data: T; error: undefined; status: 'loaded' } + | { data: T | undefined; error: unknown; status: 'error' } + | { data: T | undefined; error: undefined; status: 'retrying' }; +``` ```ts const store: ReactiveStore = /* ... */; -// React -const state = useSyncExternalStore(store.subscribe, () => { - if (store.getError()) throw store.getError(); - return store.getState(); -}); +// React — snapshot identity is stable between updates, so it can be passed directly. +const state = useSyncExternalStore(store.subscribe, store.getUnifiedState); +if (state.status === 'error') return ; +if (state.status === 'loading') return ; +return ; // Vue -const data = shallowRef(store.getState()); -const error = shallowRef(store.getError()); +const snapshot = shallowRef(store.getUnifiedState()); store.subscribe(() => { - data.value = store.getState(); - error.value = store.getError(); + snapshot.value = store.getUnifiedState(); }); ``` +`retry()` re-opens the stream after an error. When the underlying store supports restart (see [`createReactiveStoreFromDataPublisherFactory`](#createreactivestorefromdatapublisherfactory-abortsignal-createdatapublisher-datachannelname-errorchannelname-)), the store transitions to `status: 'retrying'` and reconnects. Stores that cannot be restarted throw a `SolanaError` with code `SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED` instead. + +The individual `getState()` and `getError()` getters on `ReactiveStore` are `@deprecated` — prefer `getUnifiedState()`, which exposes the same information with a stable snapshot identity and `status` discriminator. + ### `TypedEventEmitter` This type allows you to type `addEventListener` and `removeEventListener` so that the call signature of the listener matches the event type given. @@ -105,7 +117,9 @@ Things to note: ### `createReactiveStoreFromDataPublisher({ abortSignal, dataChannelName, dataPublisher, errorChannelName })` -Returns a `ReactiveStore` given a data publisher. The store holds the most recent message published to `dataChannelName` and notifies subscribers on each update. When a message is published to `errorChannelName`, the error is captured in `getError()` and subscribers are notified. Triggering the abort signal disconnects the store from the data publisher. +> **Deprecated.** Prefer [`createReactiveStoreFromDataPublisherFactory`](#createreactivestorefromdatapublisherfactory-abortsignal-createdatapublisher-datachannelname-errorchannelname-) — it supports `retry()`. Because this function accepts a ready-made `DataPublisher` rather than a factory, it cannot restart the underlying source, and calling `retry()` on the returned store throws a `SolanaError` with code `SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED`. + +Returns a `ReactiveStore` given a data publisher. The store holds the most recent message published to `dataChannelName` and notifies subscribers on each update. When a message is published to `errorChannelName`, the store transitions to `status: 'error'` preserving the last known value. Triggering the abort signal disconnects the store from the data publisher. ```ts const store = createReactiveStoreFromDataPublisher({ @@ -115,16 +129,42 @@ const store = createReactiveStoreFromDataPublisher({ errorChannelName: 'error', }); const unsubscribe = store.subscribe(() => { - console.log('State updated:', store.getState()); + console.log('State updated:', store.getUnifiedState()); }); ``` Things to note: -- `getState()` returns `undefined` until the first notification arrives. -- On error, `getState()` continues to return the last known value and `getError()` returns the error. Only the first error is captured. +- `getUnifiedState()` starts in `status: 'loading'` until the first notification arrives. +- On error, `status` becomes `'error'` with the last known value preserved on `data`. Only the first error is captured. - The function returned by `subscribe` is idempotent — calling it multiple times is safe. +### `createReactiveStoreFromDataPublisherFactory({ abortSignal, createDataPublisher, dataChannelName, errorChannelName })` + +Returns a `ReactiveStore` that wires itself to a fresh `DataPublisher` on construction and on every `retry()`. Unlike `createReactiveStoreFromDataPublisher`, this variant accepts an async factory so the store can tear down a broken stream and open a new one without losing subscribers or the last known value. + +```ts +const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: AbortSignal.timeout(60_000), + async createDataPublisher() { + return await openMyConnection(); + }, + dataChannelName: 'notification', + errorChannelName: 'error', +}); +store.subscribe(() => { + const snapshot = store.getUnifiedState(); + if (snapshot.status === 'error') store.retry(); +}); +``` + +Things to note: + +- `createDataPublisher` is called once on construction and again on every `retry()`. +- `retry()` is a no-op unless the store is in `status: 'error'`; otherwise the store transitions to `status: 'retrying'` (preserving stale data) and reconnects. +- If `createDataPublisher` rejects, the store transitions to `status: 'error'` with the rejection as the error. Call `retry()` to try again. +- Triggering the caller's `abortSignal` disconnects the store permanently; subsequent `retry()` calls are no-ops. + ### `demultiplexDataPublisher(publisher, sourceChannelName, messageTransformer)` Given a channel that carries messages for multiple subscribers on a single channel name, this function returns a new `DataPublisher` that splits them into multiple channel names. diff --git a/packages/subscribable/src/__tests__/reactive-store-test.ts b/packages/subscribable/src/__tests__/reactive-store-test.ts index cb406e93a..bec316364 100644 --- a/packages/subscribable/src/__tests__/reactive-store-test.ts +++ b/packages/subscribable/src/__tests__/reactive-store-test.ts @@ -1,5 +1,9 @@ +import { SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED, SolanaError } from '@solana/errors'; + import { DataPublisher } from '../data-publisher'; -import { createReactiveStoreFromDataPublisher } from '../reactive-store'; +import { createReactiveStoreFromDataPublisher, createReactiveStoreFromDataPublisherFactory } from '../reactive-store'; + +jest.useFakeTimers(); describe('createReactiveStoreFromDataPublisher', () => { let mockDataPublisher: DataPublisher; @@ -116,6 +120,79 @@ describe('createReactiveStoreFromDataPublisher', () => { }); }); + describe('getUnifiedState()', () => { + it('starts in `loading` status with no data or error', () => { + const store = createReactiveStoreFromDataPublisher({ + abortSignal: new AbortController().signal, + dataChannelName: 'data', + dataPublisher: mockDataPublisher, + errorChannelName: 'error', + }); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: undefined, + status: 'loading', + }); + }); + it('transitions to `loaded` with the value when a notification arrives', () => { + const store = createReactiveStoreFromDataPublisher({ + abortSignal: new AbortController().signal, + dataChannelName: 'data', + dataPublisher: mockDataPublisher, + errorChannelName: 'error', + }); + publish('data', { value: 42 }); + expect(store.getUnifiedState()).toStrictEqual({ + data: { value: 42 }, + error: undefined, + status: 'loaded', + }); + }); + it('transitions to `error` preserving the last known value', () => { + const store = createReactiveStoreFromDataPublisher({ + abortSignal: new AbortController().signal, + dataChannelName: 'data', + dataPublisher: mockDataPublisher, + errorChannelName: 'error', + }); + const error = new Error('o no'); + publish('data', { value: 42 }); + publish('error', error); + expect(store.getUnifiedState()).toStrictEqual({ + data: { value: 42 }, + error, + status: 'error', + }); + }); + it('transitions to `error` with undefined data when no value arrived first', () => { + const store = createReactiveStoreFromDataPublisher({ + abortSignal: new AbortController().signal, + dataChannelName: 'data', + dataPublisher: mockDataPublisher, + errorChannelName: 'error', + }); + const error = new Error('o no'); + publish('error', error); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error, + status: 'error', + }); + }); + }); + + describe('retry()', () => { + it('throws a SolanaError because a raw DataPublisher cannot be restarted', () => { + const store = createReactiveStoreFromDataPublisher({ + abortSignal: new AbortController().signal, + dataChannelName: 'data', + dataPublisher: mockDataPublisher, + errorChannelName: 'error', + }); + expect(() => store.retry()).toThrow(new SolanaError(SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED)); + }); + }); + describe('subscribe()', () => { it('calls the subscriber when a notification arrives', () => { const store = createReactiveStoreFromDataPublisher({ @@ -267,3 +344,332 @@ describe('createReactiveStoreFromDataPublisher', () => { }); }); }); + +describe('createReactiveStoreFromDataPublisherFactory', () => { + function createMockDataPublisher(): { + mockOn: jest.Mock; + publish(channel: string, payload: unknown): void; + publisher: DataPublisher; + } { + const mockOn = jest.fn().mockReturnValue(function unsubscribe() {}); + return { + mockOn, + publish(channel: string, payload: unknown) { + mockOn.mock.calls + .filter( + ([actualChannel, , options]: [string, unknown, { signal?: AbortSignal } | undefined]) => + actualChannel === channel && !options?.signal?.aborted, + ) + .forEach(([_, listener]) => listener(payload)); + }, + publisher: { on: mockOn }, + }; + } + + // Helper: returns a factory that hands out a fresh mock DataPublisher per invocation, plus + // a parallel array of those publishers for test assertions. + function createFactory() { + const publishers: ReturnType[] = []; + const mockRequest = jest.fn().mockImplementation(() => { + const p = createMockDataPublisher(); + publishers.push(p); + return Promise.resolve(p.publisher); + }); + return { mockRequest, publishers }; + } + + describe('initial connection', () => { + it('starts in `loading` before the factory resolves', () => { + const { mockRequest } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: undefined, + status: 'loading', + }); + }); + it('transitions to `loaded` once the factory resolves and data arrives', async () => { + expect.assertions(1); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('data', { value: 42 }); + expect(store.getUnifiedState()).toStrictEqual({ + data: { value: 42 }, + error: undefined, + status: 'loaded', + }); + }); + it('transitions to `error` when the factory rejects', async () => { + expect.assertions(1); + const failure = new Error('connection refused'); + const mockRequest = jest.fn().mockRejectedValue(failure); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: failure, + status: 'error', + }); + }); + it('transitions to `error` on an error channel message, preserving the last known value', async () => { + expect.assertions(1); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('data', { value: 42 }); + const failure = new Error('stream died'); + publishers[0].publish('error', failure); + expect(store.getUnifiedState()).toStrictEqual({ + data: { value: 42 }, + error: failure, + status: 'error', + }); + }); + }); + + describe('retry()', () => { + it('is a no-op when the store is not in `error` state', async () => { + expect.assertions(2); + const { mockRequest } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + const callsBefore = mockRequest.mock.calls.length; + store.retry(); + expect(mockRequest).toHaveBeenCalledTimes(callsBefore); + expect(store.getUnifiedState().status).toBe('loading'); + }); + it('transitions to `retrying` and preserves stale data', async () => { + expect.assertions(1); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('data', { value: 42 }); + publishers[0].publish('error', new Error('fail')); + store.retry(); + expect(store.getUnifiedState()).toStrictEqual({ + data: { value: 42 }, + error: undefined, + status: 'retrying', + }); + }); + it('invokes the factory a second time', async () => { + expect.assertions(1); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('error', new Error('fail')); + store.retry(); + expect(mockRequest).toHaveBeenCalledTimes(2); + }); + it('transitions back to `loaded` when the retried stream publishes a value', async () => { + expect.assertions(1); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('error', new Error('fail')); + store.retry(); + await jest.runAllTimersAsync(); + publishers[1].publish('data', { value: 'recovered' }); + expect(store.getUnifiedState()).toStrictEqual({ + data: { value: 'recovered' }, + error: undefined, + status: 'loaded', + }); + }); + it('notifies subscribers on the retrying transition', async () => { + expect.assertions(1); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('error', new Error('fail')); + const subscriber = jest.fn(); + store.subscribe(subscriber); + store.retry(); + expect(subscriber).toHaveBeenCalledTimes(1); + }); + it('can recover from a factory-rejection error by retrying', async () => { + expect.assertions(2); + const publisher = createMockDataPublisher(); + const mockRequest = jest + .fn() + .mockRejectedValueOnce(new Error('transient')) + .mockResolvedValue(publisher.publisher); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState().status).toBe('error'); + store.retry(); + await jest.runAllTimersAsync(); + publisher.publish('data', { value: 99 }); + expect(store.getUnifiedState()).toStrictEqual({ + data: { value: 99 }, + error: undefined, + status: 'loaded', + }); + }); + it('transitions back to `error` when the retried factory rejects again', async () => { + expect.assertions(1); + const firstFailure = new Error('first'); + const secondFailure = new Error('second'); + const mockRequest = jest.fn().mockRejectedValueOnce(firstFailure).mockRejectedValue(secondFailure); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: new AbortController().signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + store.retry(); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: secondFailure, + status: 'error', + }); + }); + }); + + describe('abort signal', () => { + it('prevents further state updates once the caller aborts', async () => { + expect.assertions(1); + const abortController = new AbortController(); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: abortController.signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + abortController.abort(); + publishers[0].publish('data', { value: 'late' }); + expect(store.getUnifiedState().status).toBe('loading'); + }); + it('aborts the signal forwarded to the inner DataPublisher listeners', async () => { + expect.assertions(2); + const abortController = new AbortController(); + const { mockRequest, publishers } = createFactory(); + createReactiveStoreFromDataPublisherFactory({ + abortSignal: abortController.signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + const dataSignal = publishers[0].mockOn.mock.calls.find(([channel]: [string]) => channel === 'data')![2] + .signal; + expect(dataSignal.aborted).toBe(false); + abortController.abort(); + expect(dataSignal.aborted).toBe(true); + }); + it('retry() after abort does not re-invoke the factory', async () => { + expect.assertions(1); + const abortController = new AbortController(); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: abortController.signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('error', new Error('fail')); + abortController.abort(); + const callsBefore = mockRequest.mock.calls.length; + store.retry(); + await jest.runAllTimersAsync(); + expect(mockRequest).toHaveBeenCalledTimes(callsBefore); + }); + it('retry() after abort leaves the store in `error` state', async () => { + expect.assertions(1); + const abortController = new AbortController(); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: abortController.signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + const failure = new Error('fail'); + publishers[0].publish('error', failure); + abortController.abort(); + store.retry(); + await jest.runAllTimersAsync(); + expect(store.getUnifiedState()).toStrictEqual({ + data: undefined, + error: failure, + status: 'error', + }); + }); + it('retry() after abort does not notify subscribers', async () => { + expect.assertions(1); + const abortController = new AbortController(); + const { mockRequest, publishers } = createFactory(); + const store = createReactiveStoreFromDataPublisherFactory({ + abortSignal: abortController.signal, + createDataPublisher: mockRequest, + dataChannelName: 'data', + errorChannelName: 'error', + }); + await jest.runAllTimersAsync(); + publishers[0].publish('error', new Error('fail')); + abortController.abort(); + const subscriber = jest.fn(); + store.subscribe(subscriber); + store.retry(); + await jest.runAllTimersAsync(); + expect(subscriber).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/subscribable/src/reactive-store.ts b/packages/subscribable/src/reactive-store.ts index 992db73f3..1f6e799ab 100644 --- a/packages/subscribable/src/reactive-store.ts +++ b/packages/subscribable/src/reactive-store.ts @@ -1,3 +1,4 @@ +import { SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED, SolanaError } from '@solana/errors'; import { AbortController } from '@solana/event-target-impl'; import { DataPublisher } from './data-publisher'; @@ -24,41 +25,99 @@ type Config = Readonly<{ errorChannelName: string; }>; +type FactoryConfig = Readonly<{ + /** + * Triggering this abort signal will cause the store to stop updating and will disconnect it from + * any active {@link DataPublisher}. Subsequent calls to {@link ReactiveStore.retry | `retry()`} + * are no-ops once this signal has fired. + */ + abortSignal: AbortSignal; + /** + * An async factory that produces a fresh {@link DataPublisher} each time it is invoked. Called + * once on construction and again on every {@link ReactiveStore.retry | `retry()`}. Rejections + * surface as a store error. + */ + createDataPublisher: () => Promise; + /** + * Messages from this channel of the produced `DataPublisher` will be used to update the store's + * state. + */ + dataChannelName: string; + /** + * Messages from this channel of the produced `DataPublisher` will transition the store to an + * error state, preserving the last known value. + */ + errorChannelName: string; +}>; + +/** + * The lifecycle state of a {@link ReactiveStore} as a single snapshot. + * + * - `loading`: the store is waiting for its first value. `data` is `undefined`. + * - `loaded`: a value has been received and no error is active. `data` is defined. + * - `error`: the stream failed. `data` is the last known value (or `undefined` if no value ever + * arrived), and `error` holds the failure. + * - `retrying`: a {@link ReactiveStore.retry | `retry()`} is in progress after a previous error. + * `error` is cleared; `data` is preserved from before the failure if present. + */ +export type ReactiveState = + | { readonly data: T | undefined; readonly error: undefined; readonly status: 'retrying' } + | { readonly data: T | undefined; readonly error: unknown; readonly status: 'error' } + | { readonly data: T; readonly error: undefined; readonly status: 'loaded' } + | { readonly data: undefined; readonly error: undefined; readonly status: 'loading' }; + +const LOADING_STATE: ReactiveState = Object.freeze({ + data: undefined, + error: undefined, + status: 'loading', +}); + /** * A reactive store that holds the latest value published to a data channel and allows external * systems to subscribe to changes. Compatible with `useSyncExternalStore`, Svelte stores, Solid's - * `from()`, and other reactive primitives that expect a `{ subscribe, getState }` contract. + * `from()`, and other reactive primitives that expect a `{ subscribe, getUnifiedState }` contract. * * @example * ```ts - * // React — throw error from snapshot function to surface via Error Boundary - * const state = useSyncExternalStore(store.subscribe, () => { - * if (store.getError()) throw store.getError(); - * return store.getState(); - * }); - * - * // Vue — check error reactively in a composable - * const data = shallowRef(store.getState()); - * const error = shallowRef(store.getError()); - * store.subscribe(() => { - * data.value = store.getState(); - * error.value = store.getError(); - * }); + * // React — the unified state snapshot has stable identity per update, making it suitable as + * // the second argument to `useSyncExternalStore`. + * const state = useSyncExternalStore(store.subscribe, store.getUnifiedState); + * if (state.status === 'error') return ; + * if (state.status === 'loading') return ; + * return ; * ``` * - * @see {@link createReactiveStoreFromDataPublisher} + * @see {@link createReactiveStoreFromDataPublisherFactory} */ export type ReactiveStore = { /** * Returns the error published to the error channel, or `undefined` if no error has occurred. - * Once set, the error is preserved — subsequent errors do not overwrite it. + * + * @deprecated Use {@link ReactiveStore.getUnifiedState | `getUnifiedState()`} instead. This + * getter returns only the error field and cannot narrow the relationship between the current + * value, error, and status. */ getError(): unknown; /** * Returns the most recent value published to the data channel, or `undefined` if no * notification has arrived yet. On error, continues to return the last known value. + * + * @deprecated Use {@link ReactiveStore.getUnifiedState | `getUnifiedState()`} instead. This + * getter returns only the value field and does not surface lifecycle status (e.g. `retrying`). */ getState(): T | undefined; + /** + * Returns the current lifecycle snapshot: `{ data, error, status }`. The returned object has + * stable identity between state changes, making it safe to pass directly as the + * `getSnapshot` argument to React's `useSyncExternalStore`. + * + * @see {@link ReactiveState} + */ + getUnifiedState(): ReactiveState; + /** + * Re-opens the stream after an error. No-op when the store is not in the `error` state. + */ + retry(): void; /** * Registers a callback to be called whenever the state changes or an error is received. * Returns an unsubscribe function. Safe to call multiple times. @@ -76,25 +135,19 @@ export type ReactiveStore = { * * Things to note: * - * - `getState()` returns `undefined` until the first notification arrives. - * - On error, `getState()` continues to return the last known value and `getError()` returns the - * error. Only the first error is captured. + * - `getUnifiedState()` starts in `status: 'loading'` until the first notification arrives. + * - On error, `getUnifiedState().data` continues to return the last known value and `error` holds + * the failure. Only the first error is captured. * - The function returned by `subscribe` is idempotent — calling it multiple times is safe. + * - Because a `DataPublisher` instance cannot be restarted, {@link ReactiveStore.retry | `retry()`} + * on the returned store throws a + * {@link SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED | `SolanaError`}. * * @param config * - * @example - * ```ts - * const store = createReactiveStoreFromDataPublisher({ - * abortSignal: AbortSignal.timeout(10_000), - * dataChannelName: 'notification', - * dataPublisher, - * errorChannelName: 'error', - * }); - * const unsubscribe = store.subscribe(() => { - * console.log('State updated:', store.getState()); - * }); - * ``` + * @deprecated Use {@link createReactiveStoreFromDataPublisherFactory} instead. That variant accepts + * a factory function for the underlying {@link DataPublisher} and can therefore support + * {@link ReactiveStore.retry | `retry()`}. */ export function createReactiveStoreFromDataPublisher({ abortSignal, @@ -102,40 +155,172 @@ export function createReactiveStoreFromDataPublisher({ dataPublisher, errorChannelName, }: Config): ReactiveStore { - let currentState: TData | undefined; - let currentError: unknown; + let currentState: ReactiveState = LOADING_STATE; const subscribers = new Set<() => void>(); const abortController = new AbortController(); abortSignal.addEventListener('abort', () => abortController.abort(abortSignal.reason)); + function notify() { + subscribers.forEach(cb => cb()); + } + dataPublisher.on( dataChannelName, data => { - currentState = data as TData; - subscribers.forEach(cb => cb()); + currentState = { data: data as TData, error: undefined, status: 'loaded' }; + notify(); }, { signal: abortController.signal }, ); dataPublisher.on( errorChannelName, err => { - if (currentError !== undefined) return; - currentError = err; - // Abort the signal passed to dataPublisher, which stops the subscriptions + if (currentState.status === 'error') return; + currentState = { data: currentState.data, error: err, status: 'error' }; abortController.abort(err); - subscribers.forEach(cb => cb()); + notify(); }, { signal: abortController.signal }, ); return { getError(): unknown { - return currentError; + return currentState.error; + }, + getState(): TData | undefined { + return currentState.data; + }, + getUnifiedState(): ReactiveState { + return currentState; + }, + retry(): void { + throw new SolanaError(SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED); + }, + subscribe(callback: () => void): () => void { + subscribers.add(callback); + return () => { + subscribers.delete(callback); + }; + }, + }; +} + +/** + * Returns a {@link ReactiveStore} that wires itself to a fresh {@link DataPublisher} on + * construction and on every {@link ReactiveStore.retry | `retry()`}. + * + * Unlike {@link createReactiveStoreFromDataPublisher}, this variant accepts a `createDataPublisher` + * factory rather than a ready-made publisher. That lets the store tear down a broken stream and + * open a new one without losing subscribers or the last known value. + * + * Things to note: + * + * - `getUnifiedState()` starts in `status: 'loading'` until the first notification arrives. + * - On error, the store transitions to `status: 'error'` preserving the last known value. Only the + * first error per connection window is captured — a subsequent `retry()` resets that window. + * - `retry()` is a no-op unless the store is currently in `status: 'error'`. When it fires, the + * store transitions to `status: 'retrying'` (preserving stale data), invokes + * `createDataPublisher()`, and wires up a fresh connection. If the factory rejects, the store + * transitions to `status: 'error'` with the rejection reason. + * - Triggering the caller's `abortSignal` disconnects the store permanently; subsequent `retry()` + * calls are no-ops. + * + * @param config + * + * @example + * ```ts + * const store = createReactiveStoreFromDataPublisherFactory({ + * abortSignal, + * async createDataPublisher() { + * return getDataPublisherFromEventEmitter(new WebSocket(url)); + * }, + * dataChannelName: 'message', + * errorChannelName: 'error', + * }); + * const unsubscribe = store.subscribe(() => { + * const snapshot = store.getUnifiedState(); + * if (snapshot.status === 'error') console.error('Connection failed:', snapshot.error); + * else if (snapshot.status === 'loaded') console.log('Latest:', snapshot.data); + * }); + * // Call `store.retry()` to recover after an error — e.g. from a user-triggered "Retry" button. + * ``` + */ +export function createReactiveStoreFromDataPublisherFactory({ + abortSignal, + createDataPublisher, + dataChannelName, + errorChannelName, +}: FactoryConfig): ReactiveStore { + let currentState: ReactiveState = LOADING_STATE; + const subscribers = new Set<() => void>(); + + const outerController = new AbortController(); + abortSignal.addEventListener('abort', () => outerController.abort(abortSignal.reason)); + + function notify() { + subscribers.forEach(cb => cb()); + } + + function connect() { + if (outerController.signal.aborted) return; + // Inner signal is passed to data publisher + const innerController = new AbortController(); + // Forward an abort from the outer signal to the inner one, so that when the caller aborts, we disconnect + // Scope this forwarder to the inner signal so it's removed on reconnection + // and we don't accumulate listeners on the outer signal across retries. + const forwardAbort = () => innerController.abort(outerController.signal.reason); + outerController.signal.addEventListener('abort', forwardAbort, { signal: innerController.signal }); + createDataPublisher().then( + publisher => { + if (innerController.signal.aborted) return; + publisher.on( + dataChannelName, + data => { + currentState = { data: data as TData, error: undefined, status: 'loaded' }; + notify(); + }, + { signal: innerController.signal }, + ); + publisher.on( + errorChannelName, + err => { + if (currentState.status === 'error') return; + currentState = { data: currentState.data, error: err, status: 'error' }; + innerController.abort(err); + notify(); + }, + { signal: innerController.signal }, + ); + }, + err => { + if (innerController.signal.aborted) return; + currentState = { data: currentState.data, error: err, status: 'error' }; + innerController.abort(err); + notify(); + }, + ); + } + + connect(); + + return { + getError(): unknown { + return currentState.error; }, getState(): TData | undefined { + return currentState.data; + }, + getUnifiedState(): ReactiveState { return currentState; }, + retry(): void { + if (outerController.signal.aborted) return; + if (currentState.status !== 'error') return; + currentState = { data: currentState.data, error: undefined, status: 'retrying' }; + notify(); + connect(); + }, subscribe(callback: () => void): () => void { subscribers.add(callback); return () => {