Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/clever-spies-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@solana/subscribable': minor
'@solana/errors': minor
'@solana/kit': minor
---

Add `retry()` and `getUnifiedState()` to `ReactiveStore`. The new `getUnifiedState()` returns a discriminated `{ data, error, status }` snapshot with stable identity, so stores can be passed directly to `useSyncExternalStore` without an intermediate wrapper. `getState()` and `getError()` remain on the type but are now `@deprecated` in favour of the unified snapshot.

A new `createReactiveStoreFromDataPublisherFactory` function is also introduced. It accepts a `createDataPublisher: () => Promise<DataPublisher>` factory rather than a ready-made publisher, which lets the store reconnect via `retry()` after an error. The existing `createReactiveStoreFromDataPublisher` is now `@deprecated`; calling `retry()` on a store it produced throws a new `SolanaError` with code `SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED`.

`createReactiveStoreWithInitialValueAndSlotTracking` (from `@solana/kit`) now supports `retry()`, which re-sends the RPC request and re-subscribes to the subscription with a fresh abort signal while preserving the last known slot and value.
5 changes: 5 additions & 0 deletions packages/errors/src/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CLOSED_BEFORE_MESSAGE_BUFF
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED = 8190003;
export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_FAILED_TO_CONNECT = 8190004;

// Subscribable-related errors.
// Reserve error codes in the range [8195000-8195999].
export const SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED = 8195000;

// Program-client-related errors.
// Reserve error codes in the range [8500000-8500999].
export const SOLANA_ERROR__PROGRAM_CLIENTS__INSUFFICIENT_ACCOUNT_METAS = 8500000;
Expand Down Expand Up @@ -641,6 +645,7 @@ export type SolanaErrorCode =
| typeof SOLANA_ERROR__SIGNER__TRANSACTION_SENDING_SIGNER_MISSING
| typeof SOLANA_ERROR__SIGNER__WALLET_ACCOUNT_CANNOT_SIGN_TRANSACTION
| typeof SOLANA_ERROR__SIGNER__WALLET_MULTISIGN_UNIMPLEMENTED
| typeof SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED
| typeof SOLANA_ERROR__SUBTLE_CRYPTO__CANNOT_EXPORT_NON_EXTRACTABLE_KEY
| typeof SOLANA_ERROR__SUBTLE_CRYPTO__DIGEST_UNIMPLEMENTED
| typeof SOLANA_ERROR__SUBTLE_CRYPTO__DISALLOWED_IN_INSECURE_CONTEXT
Expand Down
4 changes: 4 additions & 0 deletions packages/errors/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ import {
SOLANA_ERROR__SIGNER__TRANSACTION_SENDING_SIGNER_MISSING,
SOLANA_ERROR__SIGNER__WALLET_ACCOUNT_CANNOT_SIGN_TRANSACTION,
SOLANA_ERROR__SIGNER__WALLET_MULTISIGN_UNIMPLEMENTED,
SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED,
SOLANA_ERROR__SUBTLE_CRYPTO__CANNOT_EXPORT_NON_EXTRACTABLE_KEY,
SOLANA_ERROR__SUBTLE_CRYPTO__DIGEST_UNIMPLEMENTED,
SOLANA_ERROR__SUBTLE_CRYPTO__DISALLOWED_IN_INSECURE_CONTEXT,
Expand Down Expand Up @@ -711,6 +712,9 @@ export const SolanaErrorMessages: Readonly<{
'The account supports the following features: $supportedFeatures.',
[SOLANA_ERROR__SIGNER__WALLET_MULTISIGN_UNIMPLEMENTED]:
'Wallet account signers do not support signing multiple messages/transactions in a single operation',
[SOLANA_ERROR__SUBSCRIBABLE__RETRY_NOT_SUPPORTED]:
'This `ReactiveStore` does not support retry. Use `createReactiveStoreFromDataPublisherFactory` ' +
'to construct a retryable store.',
[SOLANA_ERROR__SUBTLE_CRYPTO__CANNOT_EXPORT_NON_EXTRACTABLE_KEY]: 'Cannot export a non-extractable key.',
[SOLANA_ERROR__SUBTLE_CRYPTO__DIGEST_UNIMPLEMENTED]: 'No digest implementation could be found.',
[SOLANA_ERROR__SUBTLE_CRYPTO__DISALLOWED_IN_INSECURE_CONTEXT]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,4 +530,292 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => {
expect(subscriber).not.toHaveBeenCalled();
});
});

describe('getUnifiedState()', () => {
it('starts in `loading` status', () => {
const { mockRequest: rpcRequest } = createMockRpcRequest();
const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error: undefined,
status: 'loading',
});
});
it('transitions to `loaded` after the RPC response arrives', async () => {
expect.assertions(1);
const { mockRequest: rpcRequest, resolve } = createMockRpcRequest();
const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
resolve(rpcResponse(100, { count: 42 }));
await jest.runAllTimersAsync();
expect(store.getUnifiedState()).toStrictEqual({
data: { context: { slot: 100n }, value: 42 },
error: undefined,
status: 'loaded',
});
});
it('transitions to `error` on RPC failure, preserving nothing (no prior data)', async () => {
expect.assertions(1);
const { mockRequest: rpcRequest, reject } = createMockRpcRequest();
const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
const failure = new Error('rpc failed');
reject(failure);
await jest.runAllTimersAsync();
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error: failure,
status: 'error',
});
});
it('transitions to `error` on subscription failure, preserving the RPC value', async () => {
expect.assertions(1);
const { mockRequest: rpcRequest, resolve } = createMockRpcRequest();
const { mockRequest: rpcSubscriptionRequest, error } = createMockSubscriptionRequest();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
resolve(rpcResponse(100, { count: 42 }));
await jest.runAllTimersAsync();
const failure = new Error('subscription failed');
error(failure);
await jest.runAllTimersAsync();
expect(store.getUnifiedState()).toStrictEqual({
data: { context: { slot: 100n }, value: 42 },
error: failure,
status: 'error',
});
});
});

describe('retry()', () => {
// Helper: returns mocks where each invocation of rpcRequest.send() /
// rpcSubscriptionRequest.subscribe() yields a fresh controllable instance — needed to
// exercise retry, which re-invokes both.
function createRetryableMocks() {
const rpcInstances: {
reject(error: unknown): void;
resolve(response: SolanaRpcResponse<TestValue>): void;
}[] = [];
const subscriptionInstances: {
error(err: unknown): void;
pushNotification(notification: SolanaRpcResponse<TestValue>): void;
}[] = [];
const rpcRequest: PendingRpcRequest<SolanaRpcResponse<TestValue>> = {
send: jest.fn().mockImplementation(() => {
const { promise, resolve, reject } = Promise.withResolvers<SolanaRpcResponse<TestValue>>();
rpcInstances.push({ reject, resolve });
return promise;
}),
};
const rpcSubscriptionRequest: PendingRpcSubscriptionsRequest<SolanaRpcResponse<TestValue>> = {
reactive: jest.fn().mockRejectedValue(new Error('not implemented')),
subscribe: jest.fn().mockImplementation(() => {
const instance = createMockSubscriptionRequest();
subscriptionInstances.push({
error: instance.error,
pushNotification: instance.pushNotification,
});
return (instance.mockRequest.subscribe as jest.Mock)();
}),
};
return { rpcInstances, rpcRequest, rpcSubscriptionRequest, subscriptionInstances };
}

it('is a no-op when the store is not in error state', async () => {
expect.assertions(1);
const { rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
await jest.runAllTimersAsync();
store.retry();
expect(rpcRequest.send).toHaveBeenCalledTimes(1);
});
it('transitions to `retrying` with preserved data and clears the error', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest, subscriptionInstances } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
rpcInstances[0].resolve(rpcResponse(100, { count: 42 }));
await jest.runAllTimersAsync();
subscriptionInstances[0].error(new Error('stream died'));
await jest.runAllTimersAsync();
store.retry();
expect(store.getUnifiedState()).toStrictEqual({
data: { context: { slot: 100n }, value: 42 },
error: undefined,
status: 'retrying',
});
});
it('re-invokes the RPC request and subscription on retry', async () => {
expect.assertions(2);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
rpcInstances[0].reject(new Error('boom'));
await jest.runAllTimersAsync();
store.retry();
await jest.runAllTimersAsync();
expect(rpcRequest.send).toHaveBeenCalledTimes(2);
expect(rpcSubscriptionRequest.subscribe).toHaveBeenCalledTimes(2);
});
it('recovers to `loaded` when the retried RPC succeeds', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
rpcInstances[0].reject(new Error('first failure'));
await jest.runAllTimersAsync();
store.retry();
await jest.runAllTimersAsync();
rpcInstances[1].resolve(rpcResponse(200, { count: 99 }));
await jest.runAllTimersAsync();
expect(store.getUnifiedState()).toStrictEqual({
data: { context: { slot: 200n }, value: 99 },
error: undefined,
status: 'loaded',
});
});
it('transitions to `error` again when the retried RPC also fails', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
rpcInstances[0].reject(new Error('first'));
await jest.runAllTimersAsync();
store.retry();
await jest.runAllTimersAsync();
const secondFailure = new Error('second');
rpcInstances[1].reject(secondFailure);
await jest.runAllTimersAsync();
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error: secondFailure,
status: 'error',
});
});
it('notifies subscribers on the retrying transition', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
rpcInstances[0].reject(new Error('fail'));
await jest.runAllTimersAsync();
const subscriber = jest.fn();
store.subscribe(subscriber);
store.retry();
expect(subscriber).toHaveBeenCalledTimes(1);
});
it('does not re-invoke the RPC request after the caller has aborted', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
rpcInstances[0].reject(new Error('fail'));
await jest.runAllTimersAsync();
abortController.abort();
store.retry();
await jest.runAllTimersAsync();
expect(rpcRequest.send).toHaveBeenCalledTimes(1);
});
it('leaves the store in `error` state after the caller has aborted', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
const failure = new Error('fail');
rpcInstances[0].reject(failure);
await jest.runAllTimersAsync();
abortController.abort();
store.retry();
await jest.runAllTimersAsync();
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error: failure,
status: 'error',
});
});
it('does not notify subscribers after the caller has aborted', async () => {
expect.assertions(1);
const { rpcInstances, rpcRequest, rpcSubscriptionRequest } = createRetryableMocks();
const store = createReactiveStoreWithInitialValueAndSlotTracking({
abortSignal: abortController.signal,
rpcRequest,
rpcSubscriptionRequest,
rpcSubscriptionValueMapper: v => v.count,
rpcValueMapper: v => v.count,
});
rpcInstances[0].reject(new Error('fail'));
await jest.runAllTimersAsync();
abortController.abort();
const subscriber = jest.fn();
store.subscribe(subscriber);
store.retry();
await jest.runAllTimersAsync();
expect(subscriber).not.toHaveBeenCalled();
});
});
});
Loading
Loading