Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/cool-coats-invent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
'@solana/subscribable': minor
---

Add `store.withSignal(signal)` on `ReactiveActionStore` for attaching a caller-provided `AbortSignal` to a dispatch. The method returns a thin wrapper exposing only `dispatch` / `dispatchAsync`; the supplied signal is composed with the store's internal per-dispatch controller via `AbortSignal.any`, so aborting either cancels the in-flight call and surfaces the abort reason on state. The bare `dispatch` / `dispatchAsync` signatures are unchanged — this is additive.

Two common patterns the wrapper enables:

- **Per-attempt timeout.** `store.withSignal(AbortSignal.timeout(5_000)).dispatch(args)` — a fresh clock per call. Different call sites can pass different timeouts.
- **Shared kill switch.** Hold one `AbortController`, bind the wrapper once (`const killable = store.withSignal(killCtrl.signal)`), and use `killable.dispatch(...)` everywhere. Aborting the controller cancels the current call and makes future calls on the wrapper start aborted.
12 changes: 6 additions & 6 deletions packages/rpc-spec/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ export type PendingRpcRequest<TResponse> = {
/**
* Synchronously returns a {@link ReactiveActionStore} in the `idle` state, ready to dispatch
* the underlying request. Compatible with `useSyncExternalStore` and other reactive primitives
* that expect a `{ subscribe, getState }` contract. Call `dispatch()` to fire the request,
* again on retry, or `reset()` to abort the in-flight call and return to `status: 'idle'`.
* that expect a `{ subscribe, getState }` contract. Call `dispatch()` to fire the request
* (again on retry), or `reset()` to abort the in-flight call and return to `status: 'idle'`.
*
* Unlike {@link PendingRpcRequest.send}, this method does not fire the request on creation —
* the caller is responsible for dispatching. This makes signal handling uniform between
* `reactiveStore`-derived stores and stores created directly from `createReactiveActionStore`
* (which also do not auto-fire).
* the caller is responsible for dispatching. This makes signal handling uniform: attach a
* caller-provided cancellation source per dispatch via
* `store.withSignal(signal).dispatch(...)`.
*
* @example
* ```ts
* const store = rpc.getAccountInfo(address).reactiveStore();
* store.dispatch(); // fire the initial request
* store.withSignal(AbortSignal.timeout(5_000)).dispatch(); // fire with a per-attempt timeout
* const state = useSyncExternalStore(store.subscribe, store.getState);
* if (state.status === 'error') return <ErrorMessage error={state.error} onRetry={store.dispatch} />;
* if (state.status === 'running' && !state.data) return <Spinner />;
Expand Down
16 changes: 16 additions & 0 deletions packages/subscribable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,22 @@ Things to note:
- Two ways to trigger the action:
- `dispatch(...)` — fire-and-forget. Returns `undefined` synchronously and never throws; safe to call from UI event handlers without a `.catch`. Failures surface on state as `{ status: 'error' }`.
- `dispatchAsync(...)` — returns a promise that resolves to the wrapped function's result. Rejects on failure and with an `AbortError` when superseded or `reset()`. Use from imperative code that needs the resolved value; pair with [`isAbortError`](../promises#isaborterrorerr) from `@solana/promises` to filter abort rejections.
- Attach a caller-provided `AbortSignal` to a `dispatch` or `dispatchAsync` call via `store.withSignal(signal)`:

```ts
// Per-attempt timeout — fresh signal per call:
store.withSignal(AbortSignal.timeout(5_000)).dispatch(someAccountId);

// Shared kill switch — bind the wrapper once, reuse everywhere:
const killCtrl = new AbortController();
const killable = store.withSignal(killCtrl.signal);
killable.dispatch(someAccountId);
killable.dispatch(someAccountId);
killCtrl.abort(); // cancels in-flight and short-circuits future calls
```

The wrapped signal is composed with the store's internal per-dispatch controller via `AbortSignal.any`, so aborting either cancels the in-flight call and surfaces the abort reason on state. The wrapper exposes only `dispatch` / `dispatchAsync` — `getState` / `subscribe` / `reset` stay on the parent store.

- Calling either dispatch while one is in flight aborts the previous call; its outcome is dropped from state regardless of which variant started it.
- `data` survives across transitions: a fresh `running` or `error` snapshot carries the last successful result so call sites can keep rendering stale content while a retry is in flight. Only `reset()` clears it.
- `reset()` aborts the in-flight dispatch and restores the idle snapshot, clearing both `data` and `error`.
Expand Down
101 changes: 101 additions & 0 deletions packages/subscribable/src/__tests__/reactive-action-store-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,107 @@ describe('createReactiveActionStore', () => {
});
});

describe('withSignal()', () => {
it('forwards a non-aborted internal signal to `fn` when called via the bare `dispatch`', async () => {
expect.assertions(1);
const fn = jest.fn((_signal: AbortSignal) => Promise.resolve('ok'));
const store = createReactiveActionStore(fn);
await store.dispatchAsync();
expect(fn.mock.calls[0][0].aborted).toBe(false);
});

it('aborts the in-flight dispatch and transitions to `error` when the caller-provided signal fires', async () => {
expect.assertions(2);
const ctrl = new AbortController();
const reason = new Error('per-request abort');
const store = createReactiveActionStore(() => new Promise<string>(() => {}));
const dispatched = store.withSignal(ctrl.signal).dispatchAsync();
ctrl.abort(reason);
await expect(dispatched).rejects.toBe(reason);
expect(store.getState()).toStrictEqual({
data: undefined,
error: reason,
status: 'error',
});
});

it('preserves stale data when the caller-provided signal aborts after a prior success', async () => {
expect.assertions(1);
const ctrl = new AbortController();
const reason = new Error('abort');
const { promise: second } = Promise.withResolvers<string>();
const results = [Promise.resolve('first'), second];
const store = createReactiveActionStore(() => results.shift()!);
await store.dispatchAsync();
const dispatched = store.withSignal(ctrl.signal).dispatchAsync();
ctrl.abort(reason);
await dispatched.catch(() => {});
expect(store.getState()).toStrictEqual({
data: 'first',
error: reason,
status: 'error',
});
});

it('lets later dispatches recover when a prior call started with an already-aborted signal', async () => {
expect.assertions(2);
const store = createReactiveActionStore(() => Promise.resolve('ok'));
await store
.withSignal(AbortSignal.abort(new Error('first')))
.dispatchAsync()
.catch(() => {});
expect(store.getState().status).toBe('error');
await store.dispatchAsync();
expect(store.getState()).toStrictEqual({
data: 'ok',
error: undefined,
status: 'success',
});
});

it('passes a combined signal to `fn` that fires when the caller-provided signal aborts', async () => {
expect.assertions(1);
const ctrl = new AbortController();
let captured: AbortSignal | undefined;
const store = createReactiveActionStore((signal: AbortSignal) => {
captured = signal;
return new Promise<string>(() => {});
});
store.withSignal(ctrl.signal).dispatch();
ctrl.abort(new Error('boom'));
await Promise.resolve();
expect(captured?.aborted).toBe(true);
});

it('lets the caller vary the signal across dispatches on the same store', async () => {
expect.assertions(2);
const fn = jest.fn((_signal: AbortSignal) => Promise.resolve('ok'));
const store = createReactiveActionStore(fn);
const ctrlA = new AbortController();
const ctrlB = new AbortController();
await store.withSignal(ctrlA.signal).dispatchAsync();
await store.withSignal(ctrlB.signal).dispatchAsync();
// Aborting one controller only fires its own dispatch's composed signal.
ctrlA.abort(new Error('only-A'));
expect(fn.mock.calls[0][0].aborted).toBe(true);
expect(fn.mock.calls[1][0].aborted).toBe(false);
});

it('lets a wrapper be reused as a "kill switch" across dispatches', async () => {
expect.assertions(2);
const killCtrl = new AbortController();
const fn = jest.fn((_signal: AbortSignal) => Promise.resolve('ok'));
const store = createReactiveActionStore(fn);
const killable = store.withSignal(killCtrl.signal);
await killable.dispatchAsync();
await killable.dispatchAsync();
killCtrl.abort(new Error('killed'));
// Both completed dispatches saw a signal that's now aborted.
expect(fn.mock.calls[0][0].aborted).toBe(true);
expect(fn.mock.calls[1][0].aborted).toBe(true);
});
});

describe('reset()', () => {
it('returns the store to idle from a success state', async () => {
expect.assertions(1);
Expand Down
71 changes: 59 additions & 12 deletions packages/subscribable/src/reactive-action-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export type ReactiveActionStore<TArgs extends readonly unknown[], TResult> = {
* no state update. Use from UI event handlers; there's no promise to handle or `.catch`.
*
* @see {@link ReactiveActionStore.dispatchAsync} when you need the resolved value or propagated errors.
* @see {@link ReactiveActionStore.withSignal} to attach a caller-provided `AbortSignal` to a dispatch.
*/
readonly dispatch: (...args: TArgs) => void;
/**
Expand All @@ -46,20 +47,49 @@ export type ReactiveActionStore<TArgs extends readonly unknown[], TResult> = {
readonly reset: () => void;
/** Registers a listener called on every state change. Returns an unsubscribe function. */
readonly subscribe: (listener: () => void) => () => void;
/**
* Returns a thin wrapper exposing `dispatch` / `dispatchAsync` that compose `signal` with the
* store's internal per-dispatch controller via `AbortSignal.any` — aborting either cancels
* the in-flight call. Aborting the caller-provided signal surfaces the abort reason on state
* as `{ status: 'error' }`; the internal controller path (supersession by a newer dispatch or
* `reset()`) is silent by design so the newer dispatch owns state. Use this to attach a
* caller-provided cancellation source (per-attempt timeout, shared kill switch, parent-context
* signal) without touching the bare `dispatch` / `dispatchAsync` API.
*
* - Per-attempt timeout: `store.withSignal(AbortSignal.timeout(5_000)).dispatch(args)` — fresh
* clock per call.
* - Permanent kill switch: hold one `AbortController`, bind the wrapper once
* (`const killable = store.withSignal(killCtrl.signal)`), and use `killable.dispatch(...)`
* everywhere; aborting the controller cancels in-flight and short-circuits future calls.
*
* The wrapper exposes only `dispatch` / `dispatchAsync` — `getState` / `subscribe` / `reset`
* remain store-level concerns on the parent.
*/
readonly withSignal: (signal: AbortSignal) => {
readonly dispatch: (...args: TArgs) => void;
readonly dispatchAsync: (...args: TArgs) => Promise<TResult>;
};
};

/**
* Duck-type for objects that build a {@link ReactiveActionStore} on demand via a zero-argument
* `reactiveStore()` method. Satisfied by `PendingRpcRequest<T>`. The `[]` argument tuple is
* intentional — the operation's arguments are already baked into the pending request, so each
* `dispatch()` re-fires the same call.
* Duck-type for objects that build a {@link ReactiveActionStore} on demand via `reactiveStore()`.
* Satisfied by `PendingRpcRequest<T>`. The `[]` argument tuple is intentional — the operation's
* arguments are already baked into the pending request, so each `dispatch()` re-fires the same
* call.
*
* The returned store is in the `idle` state — the caller is responsible for calling `dispatch()`
* to fire the first attempt. Attach a caller-provided cancellation source per dispatch via
* `store.withSignal(signal).dispatch(...)` — see {@link ReactiveActionStore.withSignal}.
*
* @typeParam T - The value type resolved by the wrapped operation.
*
* @example
* ```ts
* function bind<T>(source: ReactiveActionSource<T>) {
* return source.reactiveStore();
* const store = source.reactiveStore();
* // Per-attempt timeout, fresh signal per call:
* store.withSignal(AbortSignal.timeout(30_000)).dispatch();
* return store;
* }
* ```
*
Expand All @@ -82,13 +112,16 @@ const IDLE_STATE: ReactiveActionState<never> = Object.freeze({
* so only the most recent dispatch can mutate state.
*
* The wrapped function receives the `AbortSignal` as its first argument, followed by whatever
* arguments were passed to `dispatch`.
* arguments were passed to `dispatch`. Callers attach their own cancellation source per-call via
* {@link ReactiveActionStore.withSignal} — `store.withSignal(signal).dispatch(...)`. The caller's
* signal is composed with the per-dispatch controller via `AbortSignal.any`, so aborting it
* cancels the in-flight call and surfaces the abort reason on state.
*
* @typeParam TArgs - Argument tuple forwarded from `dispatch` to `fn`.
* @typeParam TResult - Resolved value type of `fn`.
* @param fn - Async function to wrap. Receives an {@link AbortSignal} plus the dispatch arguments.
* @return A {@link ReactiveActionStore} exposing `dispatch`, `dispatchAsync`, `getState`, `subscribe`,
* and `reset`.
* `reset`, and `withSignal`.
*
* @example
* ```ts
Expand All @@ -100,7 +133,10 @@ const IDLE_STATE: ReactiveActionState<never> = Object.freeze({
* store.subscribe(() => console.log(store.getState()));
* store.dispatch(someAccountId); // fire-and-forget; state is the source of truth
*
* // Or, when you need the resolved value imperatively:
* // Per-attempt timeout — fresh signal per call:
* store.withSignal(AbortSignal.timeout(30_000)).dispatch(someAccountId);
*
* // Imperative call with the resolved value:
* const account = await store.dispatchAsync(someAccountId);
* ```
*
Expand All @@ -121,11 +157,11 @@ export function createReactiveActionStore<TArgs extends readonly unknown[], TRes
listeners.forEach(listener => listener());
}

const dispatchAsync = async (...args: TArgs): Promise<TResult> => {
const dispatchAsyncWithSignal = async (userSignal: AbortSignal | undefined, ...args: TArgs): Promise<TResult> => {
currentController?.abort();
const controller = new AbortController();
currentController = controller;
const { signal } = controller;
const signal = userSignal ? AbortSignal.any([controller.signal, userSignal]) : controller.signal;
const previousData = state.data;
setState({ data: previousData, error: undefined, status: 'running' });
try {
Expand All @@ -136,14 +172,19 @@ export function createReactiveActionStore<TArgs extends readonly unknown[], TRes
setState({ data: result, error: undefined, status: 'success' });
return result;
} catch (error) {
if (signal.aborted) {
throw signal.reason;
// Superseded by a newer dispatch or `reset()` — drop silently so only the most recent
// dispatch mutates state, and reject with the abort reason rather than any underlying
// failure that happened to race the abort.
if (controller.signal.aborted) {
throw controller.signal.reason;
}
// Real failure or the caller-provided signal firing — surface as error state.
setState({ data: previousData, error, status: 'error' });
throw error;
}
};

const dispatchAsync = (...args: TArgs): Promise<TResult> => dispatchAsyncWithSignal(undefined, ...args);
const dispatch = (...args: TArgs): void => {
dispatchAsync(...args).catch(() => {});
};
Expand All @@ -163,5 +204,11 @@ export function createReactiveActionStore<TArgs extends readonly unknown[], TRes
listeners.delete(listener);
};
},
withSignal: (signal: AbortSignal) => ({
dispatch: (...args: TArgs): void => {
dispatchAsyncWithSignal(signal, ...args).catch(() => {});
},
dispatchAsync: (...args: TArgs): Promise<TResult> => dispatchAsyncWithSignal(signal, ...args),
}),
};
}
30 changes: 30 additions & 0 deletions packages/test-config/browser-environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,35 @@ export default class BrowserEnvironment extends TestEnvironment {
*/
this.global.ArrayBuffer = globalThis.ArrayBuffer;
this.global.Uint8Array = globalThis.Uint8Array;
/**
* Polyfill `AbortSignal.any` on jsdom's `AbortSignal` class if missing. jsdom 22 doesn't
* implement it (added natively in jsdom 24, in Node 20.3, and shipped in all current
* browsers). Cross-realm replacement of the whole class would break jsdom's internal
* brand checks elsewhere, so this patches just the static method.
*/
const JsdomAbortSignal = this.global.AbortSignal as typeof globalThis.AbortSignal & {
any?: typeof globalThis.AbortSignal.any;
};
const JsdomAbortController = this.global.AbortController as typeof globalThis.AbortController;
Comment on lines +19 to +22
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider bumping jsdom to version 24 in a subsequent PR?

if (typeof JsdomAbortSignal.any !== 'function') {
JsdomAbortSignal.any = function any(signals: readonly AbortSignal[]): AbortSignal {
const controller = new JsdomAbortController();
const alreadyAborted = signals.find(s => s.aborted);
if (alreadyAborted) {
controller.abort(alreadyAborted.reason);
return controller.signal;
}
for (const inputSignal of signals) {
inputSignal.addEventListener(
'abort',
() => {
if (!controller.signal.aborted) controller.abort(inputSignal.reason);
},
{ once: true, signal: controller.signal },
);
}
Comment on lines +24 to +39
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Polyfill looks correct. Two small things:

  1. Inside the listener, this.reason relies on this being bound to the input signal that fired the event. That works because addEventListener invokes listeners with this === currentTarget, but it's the kind of thing TypeScript-strict reviewers tend to flag. event.currentTarget (or capturing the input signal in the closure) is slightly more explicit. Style call.
  2. JsdomAbortController is typed as typeof globalThis.AbortController — fine for the polyfill, but note that the controller.signal returned here is a jsdom AbortSignal. That's correct (we want it to pass jsdom's brand checks), just worth a mental note if anyone debugs realm issues later.

return controller.signal;
};
}
}
}
Loading