Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .changeset/bumpy-seas-melt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
'@solana/subscribable': major
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createReactiveStoreWithInitialValueAndSlotTracking in @solana/kit is also breaking — it no longer auto-connects, and every caller now needs store.connect() after construction. Kit will bump via the fixed config anyway, but the changelog entry won't surface the migration to consumers.

Suggest adding '@solana/kit': major to the frontmatter and a short paragraph explaining the migration (e.g. "createReactiveStoreWithInitialValueAndSlotTracking no longer fires the RPC request on construction — call store.connect() to start it, or wrap in a useEffect that calls connect() on mount and reset() on cleanup.").

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, added @solana/kit: major

'@solana/rpc-subscriptions-spec': major
'@solana/kit': major
---

Drop auto-connect from `ReactiveStreamStore`; callers explicitly invoke `connect()` to open the underlying stream. Mirrors the action store's caller-driven `dispatch()` pattern — the store is a state machine that callers orchestrate, not a self-starting subscription.

The factory variant returned by `createReactiveStoreFromDataPublisherFactory` now starts in `status: 'idle'`. Call `store.connect()` to open the stream; from `idle`, the store transitions through `loading` → `loaded` (or `error`). A subsequent `connect()` from any non-idle status transitions through `retrying` while preserving the last known value. A new `reset()` method aborts the current connection and returns the store to `idle` without permanently killing it — natural for React effect cleanup.

```ts
const store = createReactiveStoreFromDataPublisherFactory({
abortSignal,
createDataPublisher,
dataChannelName: 'notification',
errorChannelName: 'error',
});
store.connect(); // opens the stream — previously this happened on construction
```

`retry()` is now deprecated; it remains as an error-only alias for `connect()`. Migrate to calling `connect()` directly. Code that previously relied on `retry()` being a no-op when the store was not in `error` state should add an explicit `if (status === 'error') store.connect();` guard at the call site.

`createReactiveStoreFromDataPublisher` (the deprecated non-factory variant accepting a ready-made `DataPublisher`) is removed. Its only documented use was as a backwards-compatibility alias behind `PendingRpcSubscriptionsRequest.reactive()`, which is also removed in this release. Migrate to the factory variant — wrap a ready-made publisher in `() => Promise.resolve(publisher)` if needed — and use `reactiveStore()` for RPC subscriptions.

`createReactiveStoreWithInitialValueAndSlotTracking` in `@solana/kit` no longer fires the RPC request on construction — call `store.connect()` to start it, or wrap in a `useEffect` that calls `connect()` on mount and `reset()` on cleanup. The store starts in `status: 'idle'` and follows the same lifecycle as the underlying stream store.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ function createMockSubscriptionRequest(): {
complete,
error,
mockRequest: {
reactive: jest.fn().mockRejectedValue(new Error('not implemented')),
reactiveStore: jest.fn().mockImplementation(() => {
throw new Error('not implemented');
}),
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import type { ReactiveState, ReactiveStreamStore } from '@solana/subscribable';
*/
export type CreateReactiveStoreWithInitialValueAndSlotTrackingConfig<TRpcValue, TSubscriptionValue, TItem> = Readonly<{
/**
* Triggering this abort signal will cancel the pending RPC request and subscription, and
* disconnect the store from further updates.
* Triggering this abort signal will cancel any in-flight RPC request and subscription, and
* permanently disconnect the store from further updates. Subsequent
* {@link ReactiveStreamStore.connect | `connect()`} calls become no-ops.
*/
abortSignal: AbortSignal;
/**
Expand All @@ -42,10 +43,10 @@ export type CreateReactiveStoreWithInitialValueAndSlotTrackingConfig<TRpcValue,
rpcValueMapper: (value: TRpcValue) => TItem;
}>;

const LOADING_STATE: ReactiveState<never> = Object.freeze({
const IDLE_STATE: ReactiveState<never> = Object.freeze({
data: undefined,
error: undefined,
status: 'loading',
status: 'idle',
});

/**
Expand All @@ -59,15 +60,20 @@ const LOADING_STATE: ReactiveState<never> = Object.freeze({
*
* Things to note:
*
* - `getUnifiedState()` starts in `status: 'loading'` until the first response or notification
* arrives. Once data arrives it transitions to `status: 'loaded'` with a
* {@link SolanaRpcResponse} containing the value and the slot context at which it was observed.
* - The returned store starts in `status: 'idle'`. Call
* {@link ReactiveStreamStore.connect | `connect()`} to fire the RPC request and open the
* subscription.
* - From `idle`, the store transitions through `loading` until the first response or notification
* arrives, then to `loaded` with a {@link SolanaRpcResponse} containing the value and the slot
* context at which it was observed.
* - On error from either source, the store transitions to `status: 'error'` preserving the last
* known value. Only the first error per connection window is captured.
* - Calling {@link ReactiveStreamStore.retry | `retry()`} while in `status: 'error'` re-sends the RPC
* request and re-subscribes to the subscription using a fresh inner abort signal. The store
* transitions through `status: 'retrying'` back to `loaded`/`error`.
* - Triggering the caller's abort signal disconnects the store permanently; subsequent `retry()`
* - A subsequent `connect()` after `loaded` or `error` aborts the current connection, transitions
* through `status: 'retrying'` (preserving stale data), and re-fires the RPC request and
* subscription with a fresh inner abort signal.
* - {@link ReactiveStreamStore.reset | `reset()`} aborts the current connection and returns the
* store to `idle`, clearing `data` and `error`.
* - Triggering the caller's `abortSignal` disconnects the store permanently; subsequent `connect()`
* calls are no-ops.
*
* @param config
Expand Down Expand Up @@ -97,11 +103,12 @@ const LOADING_STATE: ReactiveState<never> = Object.freeze({
* const state = balanceStore.getUnifiedState();
* if (state.status === 'error') {
* console.error('Error:', state.error);
* balanceStore.retry();
* balanceStore.connect();
* } else if (state.status === 'loaded') {
* console.log(`Balance at slot ${state.data.context.slot}:`, state.data.value);
* }
* });
* balanceStore.connect();
* ```
*
* @see {@link ReactiveStreamStore}
Expand All @@ -115,8 +122,9 @@ export function createReactiveStoreWithInitialValueAndSlotTracking<TRpcValue, TS
}: CreateReactiveStoreWithInitialValueAndSlotTrackingConfig<TRpcValue, TSubscriptionValue, TItem>): ReactiveStreamStore<
SolanaRpcResponse<TItem>
> {
let currentState: ReactiveState<SolanaRpcResponse<TItem>> = LOADING_STATE;
let currentState: ReactiveState<SolanaRpcResponse<TItem>> = IDLE_STATE;
let lastUpdateSlot = -1n;
let currentInnerController: AbortController | undefined;
const subscribers = new Set<() => void>();

const outerController = new AbortController();
Expand All @@ -126,24 +134,44 @@ export function createReactiveStoreWithInitialValueAndSlotTracking<TRpcValue, TS
subscribers.forEach(cb => cb());
}

function connect() {
function setState(next: ReactiveState<SolanaRpcResponse<TItem>>) {
if (
currentState.status === next.status &&
currentState.data === next.data &&
currentState.error === next.error
) {
return;
}
currentState = next;
notify();
}

function performConnect() {
if (outerController.signal.aborted) return;
// Abort any currently active connection before starting a fresh one.
currentInnerController?.abort();
// Transition based on whether we have prior data/error to preserve.
if (currentState.status === 'idle') {
setState({ data: undefined, error: undefined, status: 'loading' });
} else {
setState({ data: currentState.data, error: undefined, status: 'retrying' });
}

const innerController = new AbortController();
currentInnerController = innerController;
const forwardAbort = () => innerController.abort(outerController.signal.reason);
outerController.signal.addEventListener('abort', forwardAbort, { signal: innerController.signal });
const innerSignal = innerController.signal;

function handleError(err: unknown) {
if (innerSignal.aborted) return;
if (currentState.status === 'error') return;
currentState = { data: currentState.data, error: err, status: 'error' };
setState({ data: currentState.data, error: err, status: 'error' });
innerController.abort(err);
notify();
}

function handleValue(value: SolanaRpcResponse<TItem>) {
currentState = { data: value, error: undefined, status: 'loaded' };
notify();
setState({ data: value, error: undefined, status: 'loaded' });
}

rpcRequest
Expand Down Expand Up @@ -175,9 +203,16 @@ export function createReactiveStoreWithInitialValueAndSlotTracking<TRpcValue, TS
.catch(handleError);
}

connect();
function performReset() {
currentInnerController?.abort();
currentInnerController = undefined;
// `lastUpdateSlot` resets too — a fresh connect() starts a new slot window.
lastUpdateSlot = -1n;
setState(IDLE_STATE);
}

return {
connect: performConnect,
getError(): unknown {
return currentState.error;
},
Expand All @@ -187,12 +222,10 @@ export function createReactiveStoreWithInitialValueAndSlotTracking<TRpcValue, TS
getUnifiedState(): ReactiveState<SolanaRpcResponse<TItem>> {
return currentState;
},
reset: performReset,
retry(): void {
if (outerController.signal.aborted) return;
if (currentState.status !== 'error') return;
currentState = { data: currentState.data, error: undefined, status: 'retrying' };
notify();
connect();
performConnect();
},
subscribe(callback: () => void): () => void {
subscribers.add(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,74 +34,6 @@ describe('createSubscriptionRpc', () => {
});
});

describe('PendingRpcSubscriptionsRequest.reactive()', () => {
let mockTransport: jest.MockedFunction<RpcSubscriptionsTransport>;
let mockOn: jest.Mock;
let mockDataPublisher: DataPublisher;
let rpcSubscriptions: RpcSubscriptions<TestRpcSubscriptionNotifications>;
function publish(type: string, payload: unknown) {
mockOn.mock.calls.filter(([actualType]) => actualType === type).forEach(([_, listener]) => listener(payload));
}
beforeEach(() => {
mockOn = jest.fn().mockReturnValue(function unsubscribe() {});
mockDataPublisher = { on: mockOn };
mockTransport = jest.fn().mockResolvedValue(mockDataPublisher);
rpcSubscriptions = createSubscriptionRpc<TestRpcSubscriptionNotifications>({
api: {
thingNotifications(...args: unknown[]) {
return {
execute: jest.fn().mockResolvedValue(mockDataPublisher),
request: { methodName: 'thingNotifications', params: args },
};
},
},
transport: mockTransport,
});
});

it('passes the abort signal to the transport', async () => {
expect.assertions(1);
const abortController = new AbortController();
await rpcSubscriptions.thingNotifications().reactive({ abortSignal: abortController.signal });
expect(mockTransport).toHaveBeenCalledWith(expect.objectContaining({ signal: abortController.signal }));
});
it('returns a store whose getState() starts as undefined', async () => {
expect.assertions(1);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
expect(store.getState()).toBeUndefined();
});
it('returns a store whose getState() reflects incoming notifications', async () => {
expect.assertions(1);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
publish('notification', { value: 42 });
expect(store.getState()).toStrictEqual({ value: 42 });
});
it('calls store subscribers when a notification arrives', async () => {
expect.assertions(1);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
const subscriber = jest.fn();
store.subscribe(subscriber);
publish('notification', { value: 42 });
expect(subscriber).toHaveBeenCalledTimes(1);
});
it('surfaces errors via getError()', async () => {
expect.assertions(2);
const store = await rpcSubscriptions
.thingNotifications()
.reactive({ abortSignal: new AbortController().signal });
expect(store.getError()).toBeUndefined();
const error = new Error('o no');
publish('error', error);
expect(store.getError()).toBe(error);
});
});

describe('PendingRpcSubscriptionsRequest.reactiveStore()', () => {
let mockTransport: jest.MockedFunction<RpcSubscriptionsTransport>;
let mockOn: jest.Mock;
Expand Down Expand Up @@ -133,28 +65,42 @@ describe('PendingRpcSubscriptionsRequest.reactiveStore()', () => {
});
});

it('passes the abort signal to the transport', async () => {
it('returns a store that starts in `idle` status and does not call the transport before connect()', () => {
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error: undefined,
status: 'idle',
});
expect(mockTransport).not.toHaveBeenCalled();
});
it('passes the abort signal to the transport on connect()', async () => {
expect.assertions(1);
const abortController = new AbortController();
rpcSubscriptions.thingNotifications().reactiveStore({ abortSignal: abortController.signal });
const store = rpcSubscriptions.thingNotifications().reactiveStore({ abortSignal: abortController.signal });
store.connect();
await flushMicrotasks();
expect(mockTransport).toHaveBeenCalledWith(expect.objectContaining({ signal: abortController.signal }));
});
it('returns a store that starts in `loading` status before the transport resolves', () => {
it('transitions to `loading` after connect() before the transport resolves', () => {
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
store.connect();
expect(store.getUnifiedState()).toStrictEqual({
data: undefined,
error: undefined,
status: 'loading',
});
});
it('returns a store whose state reflects incoming notifications', async () => {
it('reflects incoming notifications after connect()', async () => {
expect.assertions(1);
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
store.connect();
await flushMicrotasks();
publish('notification', { value: 42 });
expect(store.getUnifiedState()).toStrictEqual({
Expand All @@ -168,6 +114,7 @@ describe('PendingRpcSubscriptionsRequest.reactiveStore()', () => {
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
store.connect();
await flushMicrotasks();
const subscriber = jest.fn();
store.subscribe(subscriber);
Expand All @@ -179,6 +126,7 @@ describe('PendingRpcSubscriptionsRequest.reactiveStore()', () => {
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
store.connect();
await flushMicrotasks();
const error = new Error('o no');
publish('error', error);
Expand All @@ -188,21 +136,23 @@ describe('PendingRpcSubscriptionsRequest.reactiveStore()', () => {
status: 'error',
});
});
it('re-invokes the transport on retry() after an error', async () => {
it('re-invokes the transport on connect() after an error', async () => {
expect.assertions(1);
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
store.connect();
await flushMicrotasks();
publish('error', new Error('stream died'));
store.retry();
store.connect();
await flushMicrotasks();
expect(mockTransport).toHaveBeenCalledTimes(2);
});
it('aborts the signal forwarded to the data publisher listeners when the caller aborts', async () => {
expect.assertions(2);
const abortController = new AbortController();
rpcSubscriptions.thingNotifications().reactiveStore({ abortSignal: abortController.signal });
const store = rpcSubscriptions.thingNotifications().reactiveStore({ abortSignal: abortController.signal });
store.connect();
await flushMicrotasks();
const onCall = mockOn.mock.calls.find(([channel]: [string]) => channel === 'notification');
const listenerSignal = (onCall![2] as { signal: AbortSignal }).signal;
Expand All @@ -215,6 +165,7 @@ describe('PendingRpcSubscriptionsRequest.reactiveStore()', () => {
const store = rpcSubscriptions
.thingNotifications()
.reactiveStore({ abortSignal: new AbortController().signal });
store.connect();
await flushMicrotasks();
expect(store.getUnifiedState()).toBe(store.getUnifiedState());
publish('notification', { value: 42 });
Expand Down
Loading
Loading