diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b3c33de..eaf4404 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,6 +38,7 @@ jobs: steps: - uses: actions/checkout@v4 - uses: wyvox/action-setup-pnpm@v3 + with: { node-version: 22 } - run: pnpm install - run: pnpm build - run: pnpm vitest ${{ matrix.testenv.args }} diff --git a/README.md b/README.md index 715e23a..b0f1473 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ npm add signal-utils signal-polyfill - subtle utilities - [effect](#leaky-effect-via-queuemicrotask) - [reaction](#reaction) + - [Batched Effects](#batched-effects) + - [AsyncComputed](#asynccomputed) ### `@signal` @@ -543,6 +545,73 @@ batch(() => { Synchronous batched effects can be useful when abstracting over signals to use them as a backing storage mechanism. In some cases you may want the effect of a signal update to be synchronously observable, but also to allow batching when possible for the usual performacne and coherence reasons. +#### AsyncComputed + +The `AsyncComputed` class reprents an _async_ computation that consumes other signals. + +While computing a value based on other signals _synchronously_ is covered by the core signals API, computing a value _asynchronously_ is not. (There is an ongoing [discussion about how to handle async computations](https://github.com/tc39/proposal-signals/issues/30 however). + +`AsyncComputed` is similar to `Signal.Computed`, except that it takes an async (or Promise-returning) function as the computation function. + +All _synchronous_ access to signals within the function is tracked (by running it within a `Signal.Computed`), so that when the signal dependencies change, the computation is rerun. New runs of the async computation preempt pending runs of the computation. + +```ts +import {AsyncComputed} from 'signal-utils/async-computed'; + +const count = new Signal.State(1); + +const asyncDoubled = new AsyncComputed(async () => { + // Wait 10ms + await new Promise((res) => setTimeout(res, 10)); + + return count.get() * 2; +}); + +console.log(asyncDoubled.status); // Logs: pending +console.log(asyncDoubled.value); // Logs: undefined + +await asyncDoubled.complete; + +console.log(asyncDoubled.status); // Logs: complete +console.log(asyncDoubled.value); // Logs: 2 +``` + +An `AsyncComputed` instance tracks its "status", which is either `"initial"`, +`"pending"`, `"complete"`, or `"error"`. + +##### AsyncComputed API + +- `constructor(fn, options)` + - arguments: + - `fn: (abortSignal: AbortSignal) => Promise`: The compute function. + Synchronous signal access (before the first await) is tracked. + + If a run is preempted by another run because dependencies change, the + AbortSignal will abort. It's recomended to call `signal.throwIfAborted()` + after any `await`. + - `options?: AsyncComputedOptions`: + - `initialValue`: The initial value to return from `.value` before the + computation has yet run. +- `status: "initial" | "pending" | "complete" | "error"` +- `value: T | undefined`: The last value that the compute function resolved + with, or `undefined` if the last run of the compute function threw an error. + If the compute function has not yet been run `value` will be the value of the + `initialValue` or `undefined`. +- `error: unknown`: The last error that the compute function threw, or + `undefined` if the last run of the compute function resolved successfully, or + if the compute function has not yet been run. +- `complete: Promise`: A promise that resolves when the compute function has + completed, or rejects if the compute function throws an error. + + If a new run of the compute function is started before the previous run has + completed, the promise will resolve with the result of the new run. +- `run(): void`: Runs the compute function if it is not already running and its + dependencies have changed. +- `get(): T | undefined`: Retruns the current `value` or throws if the last + completion result was an error. This method is best used for accessing from + other computed signals, since it will propagate error states into the other + computed signals. + ## Contributing See: [./CONTRIBUTING.md](CONTRIBUTING.md) diff --git a/package.json b/package.json index 81414d9..d298714 100644 --- a/package.json +++ b/package.json @@ -80,7 +80,7 @@ }, "packageManager": "pnpm@9.0.6", "volta": { - "node": "20.12.2", + "node": "22.0.0", "pnpm": "9.0.6" } } diff --git a/src/async-computed.ts b/src/async-computed.ts new file mode 100644 index 0000000..08d912f --- /dev/null +++ b/src/async-computed.ts @@ -0,0 +1,229 @@ +import { Signal } from "signal-polyfill"; + +export type AsyncComputedStatus = "initial" | "pending" | "complete" | "error"; + +export interface AsyncComputedOptions { + /** + * The initial value of the AsyncComputed. + */ + initialValue?: T; +} + +/** + * A signal-like object that represents an asynchronous computation. + * + * AsyncComputed takes a compute function that performs an asynchronous + * computation and runs it inside a computed signal, while tracking the status + * of the computation, including its most recent completion value and error. + * + * Compute functions are run when the `value`, `error`, or `complete` properties + * are read, or when `get()` or `run()` are called, and are re-run when any + * signals that they read change. + * + * If a new run of the compute function is started before the previous run has + * completed, the previous run will have its AbortSignal aborted, and the result + * of the previous run will be ignored. + */ +export class AsyncComputed { + // Whether we have been notified of a pending update from the watcher. This is + // set synchronously when any dependencies of the compute function change. + #isNotified = false; + #status = new Signal.State("initial"); + + /** + * The current status of the AsyncComputed, which is one of 'initial', + * 'pending', 'complete', or 'error'. + * + * The status will be 'initial' until the compute function is first run. + * + * The status will be 'pending' while the compute function is running. If the + * status is 'pending', the `value` and `error` properties will be the result + * of the previous run of the compute function. + * + * The status will be 'complete' when the compute function has completed + * successfully. If the status is 'complete', the `value` property will be the + * result of the previous run of the compute function and the `error` property + * will be `undefined`. + * + * The status will be 'error' when the compute function has completed with an + * error. If the status is 'error', the `error` property will be the error + * that was thrown by the previous run of the compute function and the `value` + * property will be `undefined`. + * + * This value is read from a signal, so any signals that read it will be + * tracked as dependents of it. + * + * Accessing this property will cause the compute function to run if it hasn't + * already. + */ + get status() { + // Unconditionally read the status signal to ensure that any signals that + // read it are tracked as dependents. + const currentState = this.#status.get(); + // Read from the non-signal #isNotified field, which can be set by the + // watcher synchronously. + return this.#isNotified ? "pending" : currentState; + } + + #value: Signal.State; + + /** + * The last value that the compute function resolved with, or `undefined` if + * the last run of the compute function threw an error. If the compute + * function has not yet been run `value` will be the value of the + * `initialValue` or `undefined`. + * + * This value is read from a signal, so any signals that read it will be + * tracked as dependents of it. + * + * Accessing this property will cause the compute function to run if it hasn't + * already. + */ + get value() { + this.run(); + return this.#value.get(); + } + + #error = new Signal.State(undefined); + + /** + * The last error that the compute function threw, or `undefined` if the last + * run of the compute function resolved successfully, or if the compute + * function has not yet been run. + * + * This value is read from a signal, so any signals that read it will be + * tracked as dependents of it. + * + * Accessing this property will cause the compute function to run if it hasn't + * already. + */ + get error() { + this.run(); + return this.#error.get(); + } + + #deferred = new Signal.State | undefined>(undefined); + + /** + * A promise that resolves when the compute function has completed, or rejects + * if the compute function throws an error. + * + * If a new run of the compute function is started before the previous run has + * completed, the promise will resolve with the result of the new run. + * + * This value is read from a signal, so any signals that read it will be + * tracked as dependents of it. The identity of the promise will change if the + * compute function is re-run after having completed or errored. + * + * Accessing this property will cause the compute function to run if it hasn't + * already. + */ + get complete(): Promise { + this.run(); + // run() will have created a new deferred if needed. + return this.#deferred.get()!.promise; + } + + #computed: Signal.Computed; + + #watcher: Signal.subtle.Watcher; + + // A unique ID for the current run. This is used to ensure that runs that have + // been preempted by a new run do not update state or resolve the deferred + // with the wrong result. + #currentRunId = 0; + + #currentAbortController?: AbortController; + + /** + * Creates a new AsyncComputed signal. + * + * @param fn The function that performs the asynchronous computation. Any + * signals read synchronously - that is, before the first await - will be + * tracked as dependencies of the AsyncComputed, and cause the function to + * re-run when they change. + * + * @param options.initialValue The initial value of the AsyncComputed. + */ + constructor( + fn: (abortSignal: AbortSignal) => Promise, + options?: AsyncComputedOptions, + ) { + this.#value = new Signal.State(options?.initialValue); + this.#computed = new Signal.Computed(() => { + const runId = ++this.#currentRunId; + // Untrack reading the status signal to avoid triggering the computed when + // the status changes. + const status = Signal.subtle.untrack(() => this.#status.get()); + + // If we're not already pending, create a new deferred to track the + // completion of the run. + if (status !== "pending") { + this.#deferred.set(Promise.withResolvers()); + } + this.#isNotified = false; + this.#status.set("pending"); + + this.#currentAbortController?.abort(); + this.#currentAbortController = new AbortController(); + + fn(this.#currentAbortController.signal).then( + (result) => { + // If we've been preempted by a new run, don't update the status or + // resolve the deferred. + if (runId !== this.#currentRunId) { + return; + } + this.#status.set("complete"); + this.#value.set(result); + this.#error.set(undefined); + this.#deferred.get()!.resolve(result); + }, + (error) => { + // If we've been preempted by a new run, don't update the status or + // resolve the deferred. + if (runId !== this.#currentRunId) { + return; + } + this.#status.set("error"); + this.#error.set(error); + this.#value.set(undefined); + this.#deferred.get()!.reject(error); + }, + ); + }); + this.#watcher = new Signal.subtle.Watcher(async () => { + // Set the #isNotified flag synchronously when any dependencies change, so + // that it can be read synchronously by the status getter. + this.#isNotified = true; + this.#watcher.watch(); + }); + this.#watcher.watch(this.#computed); + } + + /** + * Returns the last value that the compute function resolved with, or + * the initial value if the compute function has not yet been run. + * + * @throws The last error that the compute function threw, is the last run of + * the compute function threw an error. + */ + get() { + const status = this.status; + if ( + status === "error" || + (status === "pending" && this.error !== undefined) + ) { + throw this.error; + } + return this.value; + } + + /** + * Runs the compute function if it is not already running and its dependencies + * have changed. + */ + run() { + this.#computed.get(); + } +} diff --git a/tests/async-computed.test.ts b/tests/async-computed.test.ts new file mode 100644 index 0000000..22ebabd --- /dev/null +++ b/tests/async-computed.test.ts @@ -0,0 +1,199 @@ +import { describe, test, assert } from "vitest"; +import { Signal } from "signal-polyfill"; +import { AsyncComputed } from "../src/async-computed.ts"; + +describe("AsyncComputed", () => { + test("initialValue", async () => { + const task = new AsyncComputed(async () => 1, { initialValue: 0 }); + assert.strictEqual(task.value, 0); + }); + + test("AsyncComputed runs", async () => { + const task = new AsyncComputed(async () => { + // Make the task take more than one microtask + await 0; + return 1; + }); + assert.equal(task.status, "initial"); + + // Getting the value starts the task + assert.strictEqual(task.value, undefined); + assert.strictEqual(task.error, undefined); + assert.equal(task.status, "pending"); + + const result = await task.complete; + + assert.equal(task.status, "complete"); + assert.strictEqual(task.value, 1); + assert.strictEqual(result, 1); + assert.strictEqual(task.error, undefined); + }); + + test("AsyncComputed re-runs when signal dependencies change", async () => { + const dep = new Signal.State("a"); + const task = new AsyncComputed(async () => { + // Read dependencies before first await + const value = dep.get(); + return value; + }); + + await task.complete; + assert.equal(task.status, "complete"); + assert.strictEqual(task.value, "a"); + assert.strictEqual(task.error, undefined); + + dep.set("b"); + assert.equal(task.status, "pending"); + + await task.complete; + assert.equal(task.status, "complete"); + assert.strictEqual(task.value, "b"); + assert.strictEqual(task.error, undefined); + + dep.set("c"); + assert.equal(task.status, "pending"); + }); + + test("Preemptive runs reuse the same completed promise", async () => { + const dep = new Signal.State("a"); + const deferredOne = Promise.withResolvers(); + let deferred = deferredOne; + const abortSignals: Array = []; + const task = new AsyncComputed(async (abortSignal) => { + // Read dependencies before first await + const value = dep.get(); + + abortSignals.push(abortSignal); + // Wait until we're told to go. The first run will wait so that the + // second run can preempt it. + await deferred.promise; + return value; + }); + + // Capture the promise that the task will complete + const firstRunComplete = task.complete; + + // Trigger a new run with a new deferred + const deferredTwo = Promise.withResolvers(); + deferred = deferredTwo; + dep.set("b"); + const secondRunComplete = task.complete; + + assert.equal(task.status, "pending"); + assert.strictEqual(abortSignals.length, 2); + assert.strictEqual(abortSignals[0]!.aborted, true); + assert.strictEqual(abortSignals[1]!.aborted, false); + + // We should not have created a new Promise. The first Promise should be + // resolved with the result of the second run. + assert.strictEqual(firstRunComplete, secondRunComplete); + + // Resolve the second run + deferredTwo.resolve(); + const result = await task.complete; + assert.equal(result, "b"); + }); + + test("AsyncComputed errors and can re-run", async () => { + const dep = new Signal.State("a"); + const task = new AsyncComputed(async () => { + // Read dependencies before first await + const value = dep.get(); + await 0; + if (value === "a") { + throw new Error("a"); + } + return value; + }); + + task.run(); + assert.equal(task.status, "pending"); + + try { + await task.complete; + assert.fail("Task should have thrown"); + } catch (error) { + assert.equal(task.status, "error"); + assert.strictEqual(task.value, undefined); + assert.strictEqual(task.error, error); + } + + // Check that the task can re-run after an error + + dep.set("b"); + assert.equal(task.status, "pending"); + await task.complete; + assert.strictEqual(task.value, "b"); + assert.strictEqual(task.error, undefined); + }); + + test("get() throws on error", async () => { + const task = new AsyncComputed(async () => { + throw new Error("A"); + }); + task.run(); + await task.complete.catch(() => {}); + assert.throws(() => task.get()); + }); + + test("can chain a computed signal", async () => { + const dep = new Signal.State("a"); + const task = new AsyncComputed(async () => { + // Read dependencies before first await + const value = dep.get(); + await 0; + if (value === "b") { + throw new Error("b"); + } + return value; + }); + const computed = new Signal.Computed(() => task.get()); + assert.strictEqual(computed.get(), undefined); + + await task.complete; + assert.strictEqual(computed.get(), "a"); + + dep.set("b"); + await task.complete.catch(() => {}); + assert.throws(() => computed.get()); + + dep.set("c"); + await task.complete; + assert.strictEqual(computed.get(), "c"); + }); + + test("can chain an AsyncComputed", async () => { + const dep = new Signal.State("a"); + const task1 = new AsyncComputed(async () => { + // Read dependencies before first await + const value = dep.get(); + await 0; + if (value === "b") { + throw new Error("b"); + } + return value; + }); + const task2 = new AsyncComputed(async () => { + return task1.complete; + }); + + assert.strictEqual(task2.get(), undefined); + assert.strictEqual(task2.status, "pending"); + + await task2.complete; + assert.strictEqual(task2.get(), "a"); + assert.strictEqual(task2.status, "complete"); + + dep.set("b"); + assert.strictEqual(task2.status, "pending"); + await task2.complete.catch(() => {}); + assert.throws(() => task2.get()); + assert.strictEqual(task2.status, "error"); + + dep.set("c"); + assert.strictEqual(task2.status, "pending"); + await task2.complete; + assert.strictEqual(task2.get(), "c"); + assert.strictEqual(task2.status, "complete"); + }); +});