From 4d72b50498e8ae5a01c6f9188cb99d0c3230a179 Mon Sep 17 00:00:00 2001 From: Callum Date: Tue, 12 May 2026 17:45:25 +0000 Subject: [PATCH] Add `withSignal()` to `ReactiveActionStore` for per-dispatch cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ReactiveActionStore` now exposes `withSignal(signal)` — a thin wrapper that returns `{ dispatch, dispatchAsync }` bindings composing the caller's signal with the store's internal per-dispatch controller via `AbortSignal.any`. Aborting either cancels the in-flight call and surfaces the abort reason on state. The bare `dispatch` / `dispatchAsync` signatures are unchanged, so this is additive — no existing caller breaks. The wrapper supports two patterns. Per-attempt timeout: `store.withSignal(AbortSignal.timeout(5_000)).dispatch(args)` — a fresh clock per call, with different call sites free to pass different timeouts without rebuilding the store. Shared kill switch: hold one `AbortController`, bind the wrapper once (`const killable = store.withSignal(killCtrl.signal)`), use `killable.dispatch(...)` everywhere; aborting the controller cancels the in-flight call and short-circuits future calls on that wrapper. `PendingRpcRequest.reactiveStore()` (and the `ReactiveActionSource` duck-type) also accepts an optional `{ signal?: AbortSignal }` so a caller-provided cancellation source can be attached to the initial dispatch fired implicitly by `reactiveStore()` — same role `abortSignal` plays for `send()`. The `signal` is not a store-level setting; subsequent dispatches on the returned store go through `store.withSignal(...).dispatch(...)` like any other store. The shared `@solana/test-config` browser environment polyfills `AbortSignal.any` because jsdom 22 (the version pinned here) doesn't ship it. Replacing the AbortSignal class wholesale would break jsdom's brand checks for `addEventListener({ signal })`, so the patch is limited to the missing static method. --- .changeset/cool-coats-invent.md | 10 ++ packages/rpc-spec/src/rpc.ts | 12 +-- packages/subscribable/README.md | 16 +++ .../__tests__/reactive-action-store-test.ts | 101 ++++++++++++++++++ .../subscribable/src/reactive-action-store.ts | 71 +++++++++--- packages/test-config/browser-environment.ts | 30 ++++++ 6 files changed, 222 insertions(+), 18 deletions(-) create mode 100644 .changeset/cool-coats-invent.md diff --git a/.changeset/cool-coats-invent.md b/.changeset/cool-coats-invent.md new file mode 100644 index 000000000..49c88a5f4 --- /dev/null +++ b/.changeset/cool-coats-invent.md @@ -0,0 +1,10 @@ +--- +'@solana/subscribable': minor +--- + +Add `store.withSignal(signal)` on `ReactiveActionStore` for attaching a caller-provided `AbortSignal` to a dispatch. The method returns a thin wrapper exposing only `dispatch` / `dispatchAsync`; the supplied signal is composed with the store's internal per-dispatch controller via `AbortSignal.any`, so aborting either cancels the in-flight call and surfaces the abort reason on state. The bare `dispatch` / `dispatchAsync` signatures are unchanged — this is additive. + +Two common patterns the wrapper enables: + +- **Per-attempt timeout.** `store.withSignal(AbortSignal.timeout(5_000)).dispatch(args)` — a fresh clock per call. Different call sites can pass different timeouts. +- **Shared kill switch.** Hold one `AbortController`, bind the wrapper once (`const killable = store.withSignal(killCtrl.signal)`), and use `killable.dispatch(...)` everywhere. Aborting the controller cancels the current call and makes future calls on the wrapper start aborted. diff --git a/packages/rpc-spec/src/rpc.ts b/packages/rpc-spec/src/rpc.ts index 7d14e4565..0a1c3eea0 100644 --- a/packages/rpc-spec/src/rpc.ts +++ b/packages/rpc-spec/src/rpc.ts @@ -37,18 +37,18 @@ export type PendingRpcRequest = { /** * Synchronously returns a {@link ReactiveActionStore} in the `idle` state, ready to dispatch * the underlying request. Compatible with `useSyncExternalStore` and other reactive primitives - * that expect a `{ subscribe, getState }` contract. Call `dispatch()` to fire the request, - * again on retry, or `reset()` to abort the in-flight call and return to `status: 'idle'`. + * that expect a `{ subscribe, getState }` contract. Call `dispatch()` to fire the request + * (again on retry), or `reset()` to abort the in-flight call and return to `status: 'idle'`. * * Unlike {@link PendingRpcRequest.send}, this method does not fire the request on creation — - * the caller is responsible for dispatching. This makes signal handling uniform between - * `reactiveStore`-derived stores and stores created directly from `createReactiveActionStore` - * (which also do not auto-fire). + * the caller is responsible for dispatching. This makes signal handling uniform: attach a + * caller-provided cancellation source per dispatch via + * `store.withSignal(signal).dispatch(...)`. * * @example * ```ts * const store = rpc.getAccountInfo(address).reactiveStore(); - * store.dispatch(); // fire the initial request + * store.withSignal(AbortSignal.timeout(5_000)).dispatch(); // fire with a per-attempt timeout * const state = useSyncExternalStore(store.subscribe, store.getState); * if (state.status === 'error') return ; * if (state.status === 'running' && !state.data) return ; diff --git a/packages/subscribable/README.md b/packages/subscribable/README.md index c7f192189..1a0a2ac05 100644 --- a/packages/subscribable/README.md +++ b/packages/subscribable/README.md @@ -134,6 +134,22 @@ Things to note: - Two ways to trigger the action: - `dispatch(...)` — fire-and-forget. Returns `undefined` synchronously and never throws; safe to call from UI event handlers without a `.catch`. Failures surface on state as `{ status: 'error' }`. - `dispatchAsync(...)` — returns a promise that resolves to the wrapped function's result. Rejects on failure and with an `AbortError` when superseded or `reset()`. Use from imperative code that needs the resolved value; pair with [`isAbortError`](../promises#isaborterrorerr) from `@solana/promises` to filter abort rejections. +- Attach a caller-provided `AbortSignal` to a `dispatch` or `dispatchAsync` call via `store.withSignal(signal)`: + + ```ts + // Per-attempt timeout — fresh signal per call: + store.withSignal(AbortSignal.timeout(5_000)).dispatch(someAccountId); + + // Shared kill switch — bind the wrapper once, reuse everywhere: + const killCtrl = new AbortController(); + const killable = store.withSignal(killCtrl.signal); + killable.dispatch(someAccountId); + killable.dispatch(someAccountId); + killCtrl.abort(); // cancels in-flight and short-circuits future calls + ``` + + The wrapped signal is composed with the store's internal per-dispatch controller via `AbortSignal.any`, so aborting either cancels the in-flight call and surfaces the abort reason on state. The wrapper exposes only `dispatch` / `dispatchAsync` — `getState` / `subscribe` / `reset` stay on the parent store. + - Calling either dispatch while one is in flight aborts the previous call; its outcome is dropped from state regardless of which variant started it. - `data` survives across transitions: a fresh `running` or `error` snapshot carries the last successful result so call sites can keep rendering stale content while a retry is in flight. Only `reset()` clears it. - `reset()` aborts the in-flight dispatch and restores the idle snapshot, clearing both `data` and `error`. diff --git a/packages/subscribable/src/__tests__/reactive-action-store-test.ts b/packages/subscribable/src/__tests__/reactive-action-store-test.ts index 531ca380e..029cc9d39 100644 --- a/packages/subscribable/src/__tests__/reactive-action-store-test.ts +++ b/packages/subscribable/src/__tests__/reactive-action-store-test.ts @@ -174,6 +174,107 @@ describe('createReactiveActionStore', () => { }); }); + describe('withSignal()', () => { + it('forwards a non-aborted internal signal to `fn` when called via the bare `dispatch`', async () => { + expect.assertions(1); + const fn = jest.fn((_signal: AbortSignal) => Promise.resolve('ok')); + const store = createReactiveActionStore(fn); + await store.dispatchAsync(); + expect(fn.mock.calls[0][0].aborted).toBe(false); + }); + + it('aborts the in-flight dispatch and transitions to `error` when the caller-provided signal fires', async () => { + expect.assertions(2); + const ctrl = new AbortController(); + const reason = new Error('per-request abort'); + const store = createReactiveActionStore(() => new Promise(() => {})); + const dispatched = store.withSignal(ctrl.signal).dispatchAsync(); + ctrl.abort(reason); + await expect(dispatched).rejects.toBe(reason); + expect(store.getState()).toStrictEqual({ + data: undefined, + error: reason, + status: 'error', + }); + }); + + it('preserves stale data when the caller-provided signal aborts after a prior success', async () => { + expect.assertions(1); + const ctrl = new AbortController(); + const reason = new Error('abort'); + const { promise: second } = Promise.withResolvers(); + const results = [Promise.resolve('first'), second]; + const store = createReactiveActionStore(() => results.shift()!); + await store.dispatchAsync(); + const dispatched = store.withSignal(ctrl.signal).dispatchAsync(); + ctrl.abort(reason); + await dispatched.catch(() => {}); + expect(store.getState()).toStrictEqual({ + data: 'first', + error: reason, + status: 'error', + }); + }); + + it('lets later dispatches recover when a prior call started with an already-aborted signal', async () => { + expect.assertions(2); + const store = createReactiveActionStore(() => Promise.resolve('ok')); + await store + .withSignal(AbortSignal.abort(new Error('first'))) + .dispatchAsync() + .catch(() => {}); + expect(store.getState().status).toBe('error'); + await store.dispatchAsync(); + expect(store.getState()).toStrictEqual({ + data: 'ok', + error: undefined, + status: 'success', + }); + }); + + it('passes a combined signal to `fn` that fires when the caller-provided signal aborts', async () => { + expect.assertions(1); + const ctrl = new AbortController(); + let captured: AbortSignal | undefined; + const store = createReactiveActionStore((signal: AbortSignal) => { + captured = signal; + return new Promise(() => {}); + }); + store.withSignal(ctrl.signal).dispatch(); + ctrl.abort(new Error('boom')); + await Promise.resolve(); + expect(captured?.aborted).toBe(true); + }); + + it('lets the caller vary the signal across dispatches on the same store', async () => { + expect.assertions(2); + const fn = jest.fn((_signal: AbortSignal) => Promise.resolve('ok')); + const store = createReactiveActionStore(fn); + const ctrlA = new AbortController(); + const ctrlB = new AbortController(); + await store.withSignal(ctrlA.signal).dispatchAsync(); + await store.withSignal(ctrlB.signal).dispatchAsync(); + // Aborting one controller only fires its own dispatch's composed signal. + ctrlA.abort(new Error('only-A')); + expect(fn.mock.calls[0][0].aborted).toBe(true); + expect(fn.mock.calls[1][0].aborted).toBe(false); + }); + + it('lets a wrapper be reused as a "kill switch" across dispatches', async () => { + expect.assertions(2); + const killCtrl = new AbortController(); + const fn = jest.fn((_signal: AbortSignal) => Promise.resolve('ok')); + const store = createReactiveActionStore(fn); + const killable = store.withSignal(killCtrl.signal); + await killable.dispatchAsync(); + await killable.dispatchAsync(); + killCtrl.abort(new Error('killed')); + // Both completed dispatches saw a signal that's now aborted. + expect(fn.mock.calls[0][0].aborted).toBe(true); + expect(fn.mock.calls[1][0].aborted).toBe(true); + }); + }); + describe('reset()', () => { it('returns the store to idle from a success state', async () => { expect.assertions(1); diff --git a/packages/subscribable/src/reactive-action-store.ts b/packages/subscribable/src/reactive-action-store.ts index cc3a8b8a6..d6802e437 100644 --- a/packages/subscribable/src/reactive-action-store.ts +++ b/packages/subscribable/src/reactive-action-store.ts @@ -31,6 +31,7 @@ export type ReactiveActionStore = { * no state update. Use from UI event handlers; there's no promise to handle or `.catch`. * * @see {@link ReactiveActionStore.dispatchAsync} when you need the resolved value or propagated errors. + * @see {@link ReactiveActionStore.withSignal} to attach a caller-provided `AbortSignal` to a dispatch. */ readonly dispatch: (...args: TArgs) => void; /** @@ -46,20 +47,49 @@ export type ReactiveActionStore = { readonly reset: () => void; /** Registers a listener called on every state change. Returns an unsubscribe function. */ readonly subscribe: (listener: () => void) => () => void; + /** + * Returns a thin wrapper exposing `dispatch` / `dispatchAsync` that compose `signal` with the + * store's internal per-dispatch controller via `AbortSignal.any` — aborting either cancels + * the in-flight call. Aborting the caller-provided signal surfaces the abort reason on state + * as `{ status: 'error' }`; the internal controller path (supersession by a newer dispatch or + * `reset()`) is silent by design so the newer dispatch owns state. Use this to attach a + * caller-provided cancellation source (per-attempt timeout, shared kill switch, parent-context + * signal) without touching the bare `dispatch` / `dispatchAsync` API. + * + * - Per-attempt timeout: `store.withSignal(AbortSignal.timeout(5_000)).dispatch(args)` — fresh + * clock per call. + * - Permanent kill switch: hold one `AbortController`, bind the wrapper once + * (`const killable = store.withSignal(killCtrl.signal)`), and use `killable.dispatch(...)` + * everywhere; aborting the controller cancels in-flight and short-circuits future calls. + * + * The wrapper exposes only `dispatch` / `dispatchAsync` — `getState` / `subscribe` / `reset` + * remain store-level concerns on the parent. + */ + readonly withSignal: (signal: AbortSignal) => { + readonly dispatch: (...args: TArgs) => void; + readonly dispatchAsync: (...args: TArgs) => Promise; + }; }; /** - * Duck-type for objects that build a {@link ReactiveActionStore} on demand via a zero-argument - * `reactiveStore()` method. Satisfied by `PendingRpcRequest`. The `[]` argument tuple is - * intentional — the operation's arguments are already baked into the pending request, so each - * `dispatch()` re-fires the same call. + * Duck-type for objects that build a {@link ReactiveActionStore} on demand via `reactiveStore()`. + * Satisfied by `PendingRpcRequest`. The `[]` argument tuple is intentional — the operation's + * arguments are already baked into the pending request, so each `dispatch()` re-fires the same + * call. + * + * The returned store is in the `idle` state — the caller is responsible for calling `dispatch()` + * to fire the first attempt. Attach a caller-provided cancellation source per dispatch via + * `store.withSignal(signal).dispatch(...)` — see {@link ReactiveActionStore.withSignal}. * * @typeParam T - The value type resolved by the wrapped operation. * * @example * ```ts * function bind(source: ReactiveActionSource) { - * return source.reactiveStore(); + * const store = source.reactiveStore(); + * // Per-attempt timeout, fresh signal per call: + * store.withSignal(AbortSignal.timeout(30_000)).dispatch(); + * return store; * } * ``` * @@ -82,13 +112,16 @@ const IDLE_STATE: ReactiveActionState = Object.freeze({ * so only the most recent dispatch can mutate state. * * The wrapped function receives the `AbortSignal` as its first argument, followed by whatever - * arguments were passed to `dispatch`. + * arguments were passed to `dispatch`. Callers attach their own cancellation source per-call via + * {@link ReactiveActionStore.withSignal} — `store.withSignal(signal).dispatch(...)`. The caller's + * signal is composed with the per-dispatch controller via `AbortSignal.any`, so aborting it + * cancels the in-flight call and surfaces the abort reason on state. * * @typeParam TArgs - Argument tuple forwarded from `dispatch` to `fn`. * @typeParam TResult - Resolved value type of `fn`. * @param fn - Async function to wrap. Receives an {@link AbortSignal} plus the dispatch arguments. * @return A {@link ReactiveActionStore} exposing `dispatch`, `dispatchAsync`, `getState`, `subscribe`, - * and `reset`. + * `reset`, and `withSignal`. * * @example * ```ts @@ -100,7 +133,10 @@ const IDLE_STATE: ReactiveActionState = Object.freeze({ * store.subscribe(() => console.log(store.getState())); * store.dispatch(someAccountId); // fire-and-forget; state is the source of truth * - * // Or, when you need the resolved value imperatively: + * // Per-attempt timeout — fresh signal per call: + * store.withSignal(AbortSignal.timeout(30_000)).dispatch(someAccountId); + * + * // Imperative call with the resolved value: * const account = await store.dispatchAsync(someAccountId); * ``` * @@ -121,11 +157,11 @@ export function createReactiveActionStore listener()); } - const dispatchAsync = async (...args: TArgs): Promise => { + const dispatchAsyncWithSignal = async (userSignal: AbortSignal | undefined, ...args: TArgs): Promise => { currentController?.abort(); const controller = new AbortController(); currentController = controller; - const { signal } = controller; + const signal = userSignal ? AbortSignal.any([controller.signal, userSignal]) : controller.signal; const previousData = state.data; setState({ data: previousData, error: undefined, status: 'running' }); try { @@ -136,14 +172,19 @@ export function createReactiveActionStore => dispatchAsyncWithSignal(undefined, ...args); const dispatch = (...args: TArgs): void => { dispatchAsync(...args).catch(() => {}); }; @@ -163,5 +204,11 @@ export function createReactiveActionStore ({ + dispatch: (...args: TArgs): void => { + dispatchAsyncWithSignal(signal, ...args).catch(() => {}); + }, + dispatchAsync: (...args: TArgs): Promise => dispatchAsyncWithSignal(signal, ...args), + }), }; } diff --git a/packages/test-config/browser-environment.ts b/packages/test-config/browser-environment.ts index d0835c115..fb7d1e7c3 100644 --- a/packages/test-config/browser-environment.ts +++ b/packages/test-config/browser-environment.ts @@ -10,5 +10,35 @@ export default class BrowserEnvironment extends TestEnvironment { */ this.global.ArrayBuffer = globalThis.ArrayBuffer; this.global.Uint8Array = globalThis.Uint8Array; + /** + * Polyfill `AbortSignal.any` on jsdom's `AbortSignal` class if missing. jsdom 22 doesn't + * implement it (added natively in jsdom 24, in Node 20.3, and shipped in all current + * browsers). Cross-realm replacement of the whole class would break jsdom's internal + * brand checks elsewhere, so this patches just the static method. + */ + const JsdomAbortSignal = this.global.AbortSignal as typeof globalThis.AbortSignal & { + any?: typeof globalThis.AbortSignal.any; + }; + const JsdomAbortController = this.global.AbortController as typeof globalThis.AbortController; + if (typeof JsdomAbortSignal.any !== 'function') { + JsdomAbortSignal.any = function any(signals: readonly AbortSignal[]): AbortSignal { + const controller = new JsdomAbortController(); + const alreadyAborted = signals.find(s => s.aborted); + if (alreadyAborted) { + controller.abort(alreadyAborted.reason); + return controller.signal; + } + for (const inputSignal of signals) { + inputSignal.addEventListener( + 'abort', + () => { + if (!controller.signal.aborted) controller.abort(inputSignal.reason); + }, + { once: true, signal: controller.signal }, + ); + } + return controller.signal; + }; + } } }