Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/brown-candles-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
'@solana/rpc-subscriptions-spec': minor
'@solana/kit': minor
---

Add a `reactiveStore()` method to `PendingRpcSubscriptionsRequest`. Unlike `reactive()`, this variant returns a `ReactiveStore` synchronously and supports `retry()` to reconnect after an error. `reactive()` is now `@deprecated` in favour of `reactiveStore()`.

```ts
const store = rpc.accountNotifications(address).reactiveStore({ abortSignal });
const state = useSyncExternalStore(store.subscribe, store.getUnifiedState);
if (state.status === 'error') return <ErrorMessage error={state.error} onRetry={store.retry} />;
```
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ function createMockSubscriptionRequest(): {
error,
mockRequest: {
reactive: jest.fn().mockRejectedValue(new Error('not implemented')),
reactiveStore: jest.fn().mockImplementation(() => {
throw new Error('not implemented');
}),
subscribe: jest.fn().mockResolvedValue(asyncIterable),
},
pushNotification,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ function createMockSubscriptionRequest(): {
error,
mockRequest: {
reactive: jest.fn().mockRejectedValue(new Error('not implemented')),
reactiveStore: jest.fn().mockImplementation(() => {
throw new Error('not implemented');
}),
subscribe: jest.fn().mockResolvedValue(asyncIterable),
},
pushNotification,
Expand Down Expand Up @@ -633,6 +636,9 @@ describe('createReactiveStoreWithInitialValueAndSlotTracking', () => {
};
const rpcSubscriptionRequest: PendingRpcSubscriptionsRequest<SolanaRpcResponse<TestValue>> = {
reactive: jest.fn().mockRejectedValue(new Error('not implemented')),
reactiveStore: jest.fn().mockImplementation(() => {
throw new Error('not implemented');
}),
subscribe: jest.fn().mockImplementation(() => {
const instance = createMockSubscriptionRequest();
subscriptionInstances.push({
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, SolanaError } from '@solana/errors';
import { DataPublisher } from '@solana/subscribable';

import { createSubscriptionRpc, type RpcSubscriptions } from '../rpc-subscriptions';
import { RpcSubscriptionsTransport } from '../rpc-subscriptions-transport';

interface TestRpcSubscriptionNotifications {
thingNotifications(...args: unknown[]): unknown;
thingNotifications(...args: unknown[]): { value: number };
}

describe('createSubscriptionRpc', () => {
Expand Down Expand Up @@ -31,3 +33,191 @@ describe('createSubscriptionRpc', () => {
expect(rpcSubscriptions).not.toHaveProperty('then');
});
});

describe('PendingRpcSubscriptionsRequest.reactive()', () => {
let mockTransport: jest.MockedFunction<RpcSubscriptionsTransport>;
let mockOn: jest.Mock;
let mockDataPublisher: DataPublisher;
let rpcSubscriptions: RpcSubscriptions<TestRpcSubscriptionNotifications>;
function publish(type: string, payload: unknown) {
mockOn.mock.calls.filter(([actualType]) => actualType === type).forEach(([_, listener]) => listener(payload));
}
beforeEach(() => {
mockOn = jest.fn().mockReturnValue(function unsubscribe() {});
mockDataPublisher = { on: mockOn };
mockTransport = jest.fn().mockResolvedValue(mockDataPublisher);
rpcSubscriptions = createSubscriptionRpc<TestRpcSubscriptionNotifications>({
api: {
thingNotifications(...args: unknown[]) {
return {
execute: jest.fn().mockResolvedValue(mockDataPublisher),
request: { methodName: 'thingNotifications', params: args },
};
},
},
transport: mockTransport,
});
});

it('passes the abort signal to the transport', async () => {
expect.assertions(1);
const abortController = new AbortController();
await rpcSubscriptions.thingNotifications().reactive({ abortSignal: abortController.signal });
expect(mockTransport).toHaveBeenCalledWith(expect.objectContaining({ signal: abortController.signal }));
});
it('returns a store whose getState() starts as undefined', async () => {
expect.assertions(1);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
expect(store.getState()).toBeUndefined();
});
it('returns a store whose getState() reflects incoming notifications', async () => {
expect.assertions(1);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
publish('notification', { value: 42 });
expect(store.getState()).toStrictEqual({ value: 42 });
});
it('calls store subscribers when a notification arrives', async () => {
expect.assertions(1);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
const subscriber = jest.fn();
store.subscribe(subscriber);
publish('notification', { value: 42 });
expect(subscriber).toHaveBeenCalledTimes(1);
});
it('surfaces errors via getError()', async () => {
expect.assertions(2);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
expect(store.getError()).toBeUndefined();
const error = new Error('o no');
publish('error', error);
expect(store.getError()).toBe(error);
});
});

describe('PendingRpcSubscriptionsRequest.reactiveStore()', () => {
let mockTransport: jest.MockedFunction<RpcSubscriptionsTransport>;
let mockOn: jest.Mock;
let mockDataPublisher: DataPublisher;
let rpcSubscriptions: RpcSubscriptions<TestRpcSubscriptionNotifications>;
function publish(type: string, payload: unknown) {
Comment thread
mcintyre94 marked this conversation as resolved.
mockOn.mock.calls.filter(([actualType]) => actualType === type).forEach(([_, listener]) => listener(payload));
}
// Two ticks: one for the `createDataPublisher()` promise to resolve inside `connect()`,
// one for the `.then` handler that wires up the `on(...)` listeners.
async function flushMicrotasks() {
await Promise.resolve();
await Promise.resolve();
}
beforeEach(() => {
mockOn = jest.fn().mockReturnValue(function unsubscribe() {});
mockDataPublisher = { on: mockOn };
mockTransport = jest.fn().mockResolvedValue(mockDataPublisher);
rpcSubscriptions = createSubscriptionRpc<TestRpcSubscriptionNotifications>({
api: {
thingNotifications(...args: unknown[]) {
return {
execute: jest.fn().mockResolvedValue(mockDataPublisher),
request: { methodName: 'thingNotifications', params: args },
};
},
},
transport: mockTransport,
});
});

it('passes the abort signal to the transport', async () => {
expect.assertions(1);
const abortController = new AbortController();
rpcSubscriptions.thingNotifications().reactiveStore({ abortSignal: abortController.signal });
await flushMicrotasks();
expect(mockTransport).toHaveBeenCalledWith(expect.objectContaining({ signal: abortController.signal }));
});
it('returns a store that starts in `loading` status before the transport resolves', () => {
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error: undefined,
status: 'loading',
});
});
it('returns a store whose state reflects incoming notifications', async () => {
expect.assertions(1);
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
await flushMicrotasks();
publish('notification', { value: 42 });
expect(store.getUnifiedState()).toStrictEqual({
data: { value: 42 },
error: undefined,
status: 'loaded',
});
});
it('calls store subscribers when a notification arrives', async () => {
expect.assertions(1);
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
await flushMicrotasks();
const subscriber = jest.fn();
store.subscribe(subscriber);
publish('notification', { value: 42 });
expect(subscriber).toHaveBeenCalledTimes(1);
});
it('surfaces errors via getUnifiedState()', async () => {
expect.assertions(1);
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
await flushMicrotasks();
const error = new Error('o no');
publish('error', error);
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error,
status: 'error',
});
});
it('re-invokes the transport on retry() after an error', async () => {
expect.assertions(1);
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
await flushMicrotasks();
publish('error', new Error('stream died'));
store.retry();
await flushMicrotasks();
expect(mockTransport).toHaveBeenCalledTimes(2);
});
it('aborts the signal forwarded to the data publisher listeners when the caller aborts', async () => {
expect.assertions(2);
const abortController = new AbortController();
rpcSubscriptions.thingNotifications().reactiveStore({ abortSignal: abortController.signal });
await flushMicrotasks();
const onCall = mockOn.mock.calls.find(([channel]: [string]) => channel === 'notification');
const listenerSignal = (onCall![2] as { signal: AbortSignal }).signal;
expect(listenerSignal.aborted).toBe(false);
abortController.abort();
expect(listenerSignal.aborted).toBe(true);
});
it('returns the same getUnifiedState() snapshot across consecutive calls when state has not changed', async () => {
expect.assertions(2);
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
await flushMicrotasks();
expect(store.getUnifiedState()).toBe(store.getUnifiedState());
publish('notification', { value: 42 });
expect(store.getUnifiedState()).toBe(store.getUnifiedState());
});
});
Comment thread
mcintyre94 marked this conversation as resolved.
28 changes: 25 additions & 3 deletions packages/rpc-subscriptions-spec/src/rpc-subscriptions-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import { ReactiveStore } from '@solana/subscribable';
* {@link PendingRpcSubscriptionsRequest | PendingRpcSubscriptionsRequest<TNotification>} will
* trigger the subscription and return a promise for an async iterable that vends `TNotifications`.
*
* Calling the {@link PendingRpcSubscriptionsRequest.reactive | `reactive(options)`} method will
* trigger the subscription and return a promise for a {@link ReactiveStore} compatible with
* `useSyncExternalStore`, Svelte stores, and other reactive primitives.
* Calling the {@link PendingRpcSubscriptionsRequest.reactiveStore | `reactiveStore(options)`}
* method will return a {@link ReactiveStore} compatible with `useSyncExternalStore`, Svelte stores,
* and other reactive primitives.
*/
export type PendingRpcSubscriptionsRequest<TNotification> = {
/**
Expand All @@ -28,8 +28,30 @@ export type PendingRpcSubscriptionsRequest<TNotification> = {
* return store.getState();
* });
* ```
*
* @deprecated Use {@link PendingRpcSubscriptionsRequest.reactiveStore | `reactiveStore()`}
* instead. The synchronous variant returns a store that reconnects on
* {@link ReactiveStore.retry | `retry()`} after an error, whereas the store returned by
* `reactive()` cannot recover once its underlying `DataPublisher` has failed.
*/
reactive(options: RpcSubscribeOptions): Promise<ReactiveStore<TNotification>>;
/**
* Synchronously returns a {@link ReactiveStore} that subscribes in the background and holds the
* latest notification. Compatible with `useSyncExternalStore` and other reactive primitives
* that expect a `{ subscribe, getUnifiedState }` contract. The store opens a fresh subscription
* on construction and on every {@link ReactiveStore.retry | `retry()`}.
*
* @example
* ```ts
* const store = rpc.accountNotifications(address).reactiveStore({ abortSignal });
* // React — the unified snapshot has stable identity per update.
* const state = useSyncExternalStore(store.subscribe, store.getUnifiedState);
* if (state.status === 'error') return <ErrorMessage error={state.error} onRetry={store.retry} />;
* if (state.status === 'loading') return <Spinner />;
* return <View data={state.data} />;
* ```
*/
reactiveStore(options: RpcSubscribeOptions): ReactiveStore<TNotification>;
Comment thread
lorisleiva marked this conversation as resolved.
/**
* Triggers the subscription and returns a promise for an async iterable of notifications.
* Use `for await...of` to consume notifications as they arrive. Abort the signal to
Expand Down
16 changes: 15 additions & 1 deletion packages/rpc-subscriptions-spec/src/rpc-subscriptions.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { SOLANA_ERROR__RPC_SUBSCRIPTIONS__CANNOT_CREATE_SUBSCRIPTION_PLAN, SolanaError } from '@solana/errors';
import { Callable, Flatten, OverloadImplementations, UnionToIntersection } from '@solana/rpc-spec-types';
import { createAsyncIterableFromDataPublisher, createReactiveStoreFromDataPublisher } from '@solana/subscribable';
import {
createAsyncIterableFromDataPublisher,
createReactiveStoreFromDataPublisher,
createReactiveStoreFromDataPublisherFactory,
} from '@solana/subscribable';

import { RpcSubscriptionsApi, RpcSubscriptionsPlan } from './rpc-subscriptions-api';
import { PendingRpcSubscriptionsRequest, RpcSubscribeOptions } from './rpc-subscriptions-request';
Expand Down Expand Up @@ -91,6 +95,16 @@ function createPendingRpcSubscription<TNotification>(
errorChannelName: 'error',
});
},
reactiveStore({ abortSignal }: RpcSubscribeOptions) {
return createReactiveStoreFromDataPublisherFactory<TNotification>({
abortSignal,
createDataPublisher() {
return transport({ signal: abortSignal, ...subscriptionsPlan });
},
dataChannelName: 'notification',
errorChannelName: 'error',
});
},
async subscribe({ abortSignal }: RpcSubscribeOptions): Promise<AsyncIterable<TNotification>> {
const notificationsDataPublisher = await transport({
signal: abortSignal,
Expand Down
Loading
Loading