diff --git a/code/.storybook/open-service-debug-service.ts b/code/.storybook/open-service-debug-service.ts index 627856b79700..79f7ff38d751 100644 --- a/code/.storybook/open-service-debug-service.ts +++ b/code/.storybook/open-service-debug-service.ts @@ -97,8 +97,8 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise { logger.warn(`[open-service debug] command addActivity(${input.message})`); - ctx.self.setState((draft) => { - draft.activity.push(input.message); + ctx.self.setState((state) => { + state.activity.push(input.message); }); return undefined; @@ -115,10 +115,10 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise ${Object.keys(storyIndex.entries).length} entries` ); - ctx.self.setState((draft) => { - draft.storyIndexEntryCount = Object.keys(storyIndex.entries).length; - draft.storyIndexSampleIds = sampleIds; - draft.activity.push(`syncStoryIndex:${input.reason}:${sampleIds.length}`); + ctx.self.setState((state) => { + state.storyIndexEntryCount = Object.keys(storyIndex.entries).length; + state.storyIndexSampleIds = sampleIds; + state.activity.push(`syncStoryIndex:${input.reason}:${sampleIds.length}`); }); return undefined; @@ -137,10 +137,10 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise ${value}` ); - ctx.self.setState((draft) => { - draft.preloadedByEntryId[input.entryId] = value; - draft.lastObservedValue = value; - draft.activity.push(`recordPreloadVisit:${input.entryId}:${input.source}`); + ctx.self.setState((state) => { + state.preloadedByEntryId[input.entryId] = value; + state.lastObservedValue = value; + state.activity.push(`recordPreloadVisit:${input.entryId}:${input.source}`); }); return undefined; diff --git a/code/core/package.json b/code/core/package.json index 1a2f7147248b..ecfa95be61d3 100644 --- a/code/core/package.json +++ b/code/core/package.json @@ -233,6 +233,7 @@ "!src/**/*" ], "dependencies": { + "@preact/signals-core": "^1.14.2", "@storybook/global": "^5.0.0", "@storybook/icons": "^2.0.2", "@testing-library/dom": "^10.4.1", @@ -241,6 +242,7 @@ "@vitest/expect": "3.2.4", "@vitest/spy": "3.2.4", "@webcontainer/env": "^1.1.1", + "deepsignal": "^1.6.0", "esbuild": "^0.18.0 || ^0.19.0 || ^0.20.0 || ^0.21.0 || ^0.22.0 || ^0.23.0 || ^0.24.0 || ^0.25.0 || ^0.26.0 || ^0.27.0", "open": "^10.2.0", "oxc-parser": "^0.127.0", @@ -304,7 +306,6 @@ "@yarnpkg/libzip": "2.3.0", "acorn": "^8.15.0", "acorn-jsx": "^5.3.2", - "alien-signals": "^3.2.0", "ansi-to-html": "^0.7.2", "browser-dtector": "^3.4.0", "bundle-require": "^5.1.0", diff --git a/code/core/src/shared/open-service/README.md b/code/core/src/shared/open-service/README.md index c7fdf4fec00b..5b00cc64ff10 100644 --- a/code/core/src/shared/open-service/README.md +++ b/code/core/src/shared/open-service/README.md @@ -6,8 +6,9 @@ Its goals are: - define stateful services in one declarative object - expose synchronous queries and async commands with strong TypeScript inference -- validate all query and command input/output through Standard Schema -- support reactive query subscriptions through `alien-signals` +- validate all query and command input/output through Standard Schema (schemas may transform/coerce) +- support fine-grained reactive query subscriptions through deep signals (`deepsignal` + + `@preact/signals-core`) - support server-side static state snapshots driven by query `load` hooks The main audience for this README is agents and maintainers who need to understand how the pieces @@ -94,6 +95,15 @@ Query handlers do **not** receive `commands` or `setState`. Mutations belong in `load` mutations must go through commands. Cross-service `getService(...).queries.*` calls inside a load body are not auto-tracked for the drain; use `await ctx.getService(id).queries.foo.loaded(input)` when you need a cross-service dependency awaited before your own load completes. +**`load` is a reactive, idempotent warming step.** For an active subscription, `load` re-fires whenever the external signals it reads synchronously change — same-service fields and cross-service reads via `getService(...).queries.*` alike — turning a query into a reactive async resource (like a TanStack Query / SolidJS `createResource` / Vue async `watchEffect`). This means: + +- **`load` must be idempotent.** Re-running it with the same dependencies must produce the same state. Any genuinely one-shot side effect belongs in a command invoked conditionally, never in `load` itself. +- **Read dependencies synchronously, up front.** Only reads in the load's synchronous prefix (before the first `await`) are tracked. Read the values you depend on first, then do async work — the same idiom every signal-based resource uses. +- **Loads that read no external signal fire exactly once** (the common case: `await ctx.self.commands.x(input)`), so existing loads are unaffected. +- Direct `query()` / `.loaded()` calls are **not** reactive — they keep one-shot-per-call semantics. Reactivity is scoped to subscriptions and is torn down when the last subscriber unsubscribes. + +The runtime guards re-firing: a superseded run (its dependencies changed again before it finished) cannot overwrite a newer run's state, and changes batched together produce a single re-load. + **Keep `load` bodies as small as possible.** Almost always, `load` should be a one-liner that calls a command — the real work (input resolution, side effects, validation, state mutation) belongs in the command. This pays off for three reasons: - **Reusability.** Anyone can call the command directly (other services, tests, integrations) without going through the query's load path. Logic stuck inside a load is unreachable from outside the drain. @@ -158,7 +168,12 @@ Both must be Standard Schema compatible. The runtime validates: - caller input before a handler runs -- handler output before the result is returned or emitted +- handler output when a value is produced for a consumer — a direct `query()` call, `query.loaded()`, + the static build, and a subscription emission + +Output validation reads the whole value, so it is kept out of the part of a subscription that +determines reactive dependencies: for a `selector` subscriber it runs without tracking, so it cannot +expand the deep-signal dependency footprint. (See "Subscription Flow".) Queries validate **synchronously**. Their input and output schemas must produce sync results. If a Standard Schema returns a Promise during a query validation, the runtime throws `OpenServiceAsyncSchemaError` immediately. @@ -250,15 +265,37 @@ When an **async** `load` body runs, it instead gets a *wrapped* `ctx.self.querie Cross-service `ctx.getService(id).queries.*` calls inside a load body are **not** wrapped; authors must use `.loaded()` explicitly when they need a cross-service dep awaited from inside a load. From a sync handler, cross-service queries are tracked because they consult the module-scoped session like any other call. +## State and reactivity + +State is a **deep reactive proxy** (`deepSignal` from `deepsignal`, backed by `@preact/signals-core`) +created in [service-runtime.ts](./service-runtime.ts). There is no top-level state atom and no Immer: + +- Reading a field through `ctx.self.state` tracks a fine-grained signal for exactly that field + (including not-yet-present record keys, which fire when the key is later added). +- `setState((state) => …)` mutates the proxy **in place** inside a batch, so one command notifies + subscribers once, and only the fields it actually changed are invalidated. +- The proxy is internal and does not escape: + - Query/`.loaded()` results are the schema-validated value. For object and array schemas that + rebuild a plain value, this also detaches the result from the proxy. + - Subscription emissions are detached to plain values (validated for whole-value subscribers, or + JSON-stripped for `selector` slices). + - The whole-state snapshot for the static build uses `structuredClone` of the plain backing + object. (`structuredClone` cannot clone a proxy, so proxy-slice stripping uses a JSON round-trip; + state must be JSON-serializable, the same constraint the static-build pipeline relies on.) + ## Subscription Flow -Subscriptions are implemented with `alien-signals` in [service-runtime.ts](./service-runtime.ts): +Subscriptions are implemented in [service-runtime.ts](./service-runtime.ts): -1. `subscribe(input, callback)` defers all work to a microtask. -2. The microtask validates the input synchronously and fires the dependency's `load` in the background. -3. A `computed()` value wraps the synchronous handler. An `effect()` runs the handler immediately (delivering the current value to the callback) and re-runs whenever the handler's tracked state dependencies change. -4. Subscribers receive the current state right away, then a follow-up emission once the load settles and state changes. UI consumers that want to suppress the pre-load emission should branch on the value (e.g. show a spinner for `null`). -5. Each emitted value is output-validated before the subscriber callback runs. +1. `subscribe(input, callback)` (or `subscribe(input, selector, callback)`) defers all work to a microtask. +2. The microtask validates the input synchronously. If the query has a `load`, it is run inside its own `effect()` so the external signals it reads synchronously are tracked: when they change, the effect re-runs and the load re-fires (see "Load"). Writes from a superseded run are dropped (each run carries an epoch; `setState` is gated on it), so a slow stale load can't clobber a newer result. The effect is torn down with the subscription. +3. A `computed()` runs the synchronous handler against the deep-signal proxy, so its dependency footprint is exactly what it reads. The output is always validated, but where validation runs depends on the subscription: + - **No selector:** the value is validated here and emitted. Reading the whole value to validate it is the correct footprint for a whole-value subscriber, and it keeps the emitted value identical to a direct `query()` pull. + - **With a selector:** validation runs untracked (so it does not register dependencies) and only `selector(value)` is read (then detached to a plain snapshot), so a sibling field the selector ignores never re-runs the handler. +4. An `effect()` runs the computed immediately (delivering the current value) and re-runs only when the computed's tracked fields change. A write to an unrelated key or field never re-runs the handler. +5. Subscribers receive the current state right away, then a follow-up emission once the load settles and state changes. UI consumers that want to suppress the pre-load emission should branch on the value (e.g. show a spinner for `null`). +6. Emissions are deduped by value: the effect compares the new value with the last emitted one via `es-toolkit` `isEqual` and skips the callback when they are equal. So a load that rewrites a deeply-equal value does not re-fire subscribers. +7. The optional `selector` is the `universal-store` pattern: the callback receives the selected slice and fires only when that slice changes by value — and, because the selector drives the computed's reads, an unselected field change does not even re-run the handler. Tests should use `vi.waitFor(...)` when asserting the first emission or follow-up emissions. @@ -363,8 +400,8 @@ export const exampleServiceDef = defineService({ input: entryIdSchema, output: v.void(), handler: async (input, ctx) => { - ctx.self.setState((draft) => { - draft.values[input.entryId] = 'ready'; + ctx.self.setState((state) => { + state.values[input.entryId] = 'ready'; }); }, }, diff --git a/code/core/src/shared/open-service/fixtures.ts b/code/core/src/shared/open-service/fixtures.ts index c9ed3a4ca78e..9551811b03f1 100644 --- a/code/core/src/shared/open-service/fixtures.ts +++ b/code/core/src/shared/open-service/fixtures.ts @@ -44,9 +44,9 @@ export const mutableRecordLookupServiceDef = defineService({ input: assignEntryFieldInputSchema, output: voidOutputSchema, handler: (input, ctx) => { - ctx.self.setState((draft) => { - draft[input.entryId] ??= {}; - draft[input.entryId]![input.fieldKey] = input.fieldValue; + ctx.self.setState((state) => { + state[input.entryId] ??= {}; + state[input.entryId]![input.fieldKey] = input.fieldValue; }); }, }, @@ -82,8 +82,8 @@ export const awaitedPreloadValueServiceDef = defineService({ output: voidOutputSchema, handler: async (input, ctx) => { await Promise.resolve(); - ctx.self.setState((draft) => { - draft[input.entryId] = 'preloaded'; + ctx.self.setState((state) => { + state[input.entryId] = 'preloaded'; }); }, }, @@ -116,14 +116,63 @@ export const fireAndForgetPreloadValueServiceDef = defineService({ output: voidOutputSchema, handler: async (input, ctx) => { await Promise.resolve(); - ctx.self.setState((draft) => { - draft[input.entryId] = 'preloaded'; + ctx.self.setState((state) => { + state[input.entryId] = 'preloaded'; }); }, }, }, }); +export type RebuiltValue = { marker: string; count: number }; +export type RebuiltValueState = Record; + +/** Shared schema for the object value used by the rebuilt-equal-value fixture. */ +export const rebuiltValueOutputSchema = v.nullable( + v.object({ marker: v.string(), count: v.number() }) +); + +/** + * Service fixture whose `load` rebuilds a deeply-equal but freshly-allocated object value every + * time it runs, then writes it back via a command. + * + * It exercises the case where re-subscribing to an already-populated entry would emit a redundant + * second value (the immediate emission plus a load-driven emission carrying an equal-but-not- + * identical object) unless the runtime dedups by value. + */ +export const rebuiltEqualValueOnLoadServiceDef = defineService({ + id: 'internal-fixture/rebuilt-equal-value-on-load', + description: 'Rewrites a deeply-equal but freshly-allocated object value on every load.', + initialState: {} as RebuiltValueState, + queries: { + getRebuiltValue: { + description: 'Returns the value for an entry; load always rewrites a fresh-but-equal value.', + input: entryIdInputSchema, + output: rebuiltValueOutputSchema, + handler: (input, ctx) => ctx.self.state[input.entryId] ?? null, + load: async (input, ctx) => { + await ctx.self.commands.rebuildValue(input); + }, + }, + }, + commands: { + rebuildValue: { + description: 'Allocates a brand-new object with a stable value and stores it.', + input: entryIdInputSchema, + output: rebuiltValueOutputSchema, + handler: async (input, ctx) => { + await Promise.resolve(); + // A new object literal every call: deeply equal to any prior value, never `===` to it. + const value: RebuiltValue = { marker: 'stable', count: 1 }; + ctx.self.setState((state) => { + state[input.entryId] = value; + }); + return value; + }, + }, + }, +}); + export type SharedStaticFileState = { left?: string; right?: string }; /** Creates a fixture where multiple queries contribute state to one shared static file. */ @@ -162,8 +211,8 @@ export function createSharedStaticFileServiceDef() { input: noInputSchema, output: voidOutputSchema, handler: (_input, ctx) => { - ctx.self.setState((draft) => { - draft.left = 'preloaded'; + ctx.self.setState((state) => { + state.left = 'preloaded'; }); }, }, @@ -172,8 +221,8 @@ export function createSharedStaticFileServiceDef() { input: noInputSchema, output: voidOutputSchema, handler: (_input, ctx) => { - ctx.self.setState((draft) => { - draft.right = 'preloaded'; + ctx.self.setState((state) => { + state.right = 'preloaded'; }); }, }, diff --git a/code/core/src/shared/open-service/index.test-d.ts b/code/core/src/shared/open-service/index.test-d.ts index 023320d87026..04140f8e606d 100644 --- a/code/core/src/shared/open-service/index.test-d.ts +++ b/code/core/src/shared/open-service/index.test-d.ts @@ -69,9 +69,9 @@ const openServiceDef = defineService({ output: v.void(), handler: (input, ctx) => { expectTypeOf(input).toEqualTypeOf(); - ctx.self.setState((draft) => { - expectTypeOf(draft).toEqualTypeOf(); - draft.count += input; + ctx.self.setState((state) => { + expectTypeOf(state).toEqualTypeOf(); + state.count += input; }); }, }, @@ -80,9 +80,9 @@ const openServiceDef = defineService({ output: v.void(), handler: async (input, ctx) => { expectTypeOf(input).toEqualTypeOf<{ entryId: string }>(); - ctx.self.setState((draft) => { - expectTypeOf(draft.valuesById[input.entryId]).toEqualTypeOf(); - draft.valuesById[input.entryId] = 'ready'; + ctx.self.setState((state) => { + expectTypeOf(state.valuesById[input.entryId]).toEqualTypeOf(); + state.valuesById[input.entryId] = 'ready'; }); }, }, diff --git a/code/core/src/shared/open-service/server.test-d.ts b/code/core/src/shared/open-service/server.test-d.ts index aab109588488..603b865129f5 100644 --- a/code/core/src/shared/open-service/server.test-d.ts +++ b/code/core/src/shared/open-service/server.test-d.ts @@ -65,16 +65,16 @@ const registeredService = registerService(registrationOnlyServiceDef, { increment: { handler: (input, ctx) => { expectTypeOf(input).toEqualTypeOf(); - ctx.self.setState((draft) => { - draft.count += input; + ctx.self.setState((state) => { + state.count += input; }); }, }, preloadValue: { handler: async (input, ctx) => { expectTypeOf(input).toEqualTypeOf<{ entryId: string }>(); - ctx.self.setState((draft) => { - draft.valuesById[input.entryId] = 'ready'; + ctx.self.setState((state) => { + state.valuesById[input.entryId] = 'ready'; }); }, }, diff --git a/code/core/src/shared/open-service/server.test.ts b/code/core/src/shared/open-service/server.test.ts index 0ef1d7187aae..40befb30e243 100644 --- a/code/core/src/shared/open-service/server.test.ts +++ b/code/core/src/shared/open-service/server.test.ts @@ -121,8 +121,8 @@ describe('server static builds', () => { entryId: 'entry-a', }); - ctx.self.setState((draft) => { - draft.value = record?.marker ?? null; + ctx.self.setState((state) => { + state.value = record?.marker ?? null; }); return undefined; @@ -167,8 +167,8 @@ describe('server static builds', () => { output: v.undefined(), handler: async (_input, ctx) => { readyEntryIds.splice(0, readyEntryIds.length, 'entry-a'); - ctx.self.setState((draft) => { - draft.built = true; + ctx.self.setState((state) => { + state.built = true; }); return undefined; @@ -200,8 +200,8 @@ describe('server static builds', () => { input: v.object({ entryId: v.string() }), output: v.undefined(), handler: async (input, ctx) => { - ctx.self.setState((draft) => { - draft.value = input.entryId; + ctx.self.setState((state) => { + state.value = input.entryId; }); return undefined; @@ -281,8 +281,8 @@ describe('server static builds', () => { }), output: v.undefined(), handler: async (input, ctx) => { - ctx.self.setState((draft) => { - draft.value = input.value; + ctx.self.setState((state) => { + state.value = input.value; }); return undefined; @@ -370,8 +370,8 @@ describe('server static builds', () => { input: v.undefined(), output: v.undefined(), handler: async (_input, ctx) => { - ctx.self.setState((draft) => { - draft.value = 'invalid'; + ctx.self.setState((state) => { + state.value = 'invalid'; }); return undefined; @@ -427,8 +427,8 @@ describe('server static builds', () => { }), output: v.undefined(), handler: async (input, ctx) => { - ctx.self.setState((draft) => { - draft.value = input.value; + ctx.self.setState((state) => { + state.value = input.value; }); return undefined; diff --git a/code/core/src/shared/open-service/server.ts b/code/core/src/shared/open-service/server.ts index f5a8a362adc4..222e4c8689f4 100644 --- a/code/core/src/shared/open-service/server.ts +++ b/code/core/src/shared/open-service/server.ts @@ -88,7 +88,7 @@ export async function buildStaticFiles(): Promise { await buildRuntime.runLoadOnce(queryName, validatedInput); - return { path, state: buildRuntime.stateSignal() }; + return { path, state: buildRuntime.getStateSnapshot() }; }) ); })() diff --git a/code/core/src/shared/open-service/service-registration.test.ts b/code/core/src/shared/open-service/service-registration.test.ts index 0b4e8c0abcbe..3fad6bfa7a27 100644 --- a/code/core/src/shared/open-service/service-registration.test.ts +++ b/code/core/src/shared/open-service/service-registration.test.ts @@ -168,8 +168,8 @@ describe('service registration', () => { commands: { increment: { handler: async (_input, ctx) => { - ctx.self.setState((draft) => { - draft.count += 1; + ctx.self.setState((state) => { + state.count += 1; }); }, }, @@ -184,8 +184,8 @@ describe('service registration', () => { const record = lookup.queries.getRecordFields({ entryId: input.entryId, }); - ctx.self.setState((draft) => { - draft.count = record?.marker === input.fieldValue ? 1 : 0; + ctx.self.setState((state) => { + state.count = record?.marker === input.fieldValue ? 1 : 0; }); }, }, @@ -252,8 +252,8 @@ describe('service registration', () => { commands: { setValue: { handler: async (_input, ctx) => { - ctx.self.setState((draft) => { - draft.value = 'built-at-registration'; + ctx.self.setState((state) => { + state.value = 'built-at-registration'; }); }, }, diff --git a/code/core/src/shared/open-service/service-registration.ts b/code/core/src/shared/open-service/service-registration.ts index aad03d396d15..939dde0864dd 100644 --- a/code/core/src/shared/open-service/service-registration.ts +++ b/code/core/src/shared/open-service/service-registration.ts @@ -192,7 +192,13 @@ export function registerService< } const resolvedDefinition = applyRegistration(definition, registration); - const runtime = createServiceRuntime(resolvedDefinition, { registryApi: serviceRegistryApi }); + // The runtime mutates its state object in place, so give it a copy rather than the definition's + // shared `initialState` (which would otherwise leak state across registrations). + const runtime = createServiceRuntime( + resolvedDefinition, + { registryApi: serviceRegistryApi }, + structuredClone(resolvedDefinition.initialState) + ); const registeredRuntime = { queries: runtime.queries, commands: runtime.commands, diff --git a/code/core/src/shared/open-service/service-runtime.test.ts b/code/core/src/shared/open-service/service-runtime.test.ts index 884a95dcc9ae..c2788b1531d1 100644 --- a/code/core/src/shared/open-service/service-runtime.test.ts +++ b/code/core/src/shared/open-service/service-runtime.test.ts @@ -6,13 +6,13 @@ import { serviceRegistryApi } from './service-registration.ts'; import { createServiceRuntime } from './service-runtime.ts'; import { clearRegistry, registerService } from './server.ts'; import { + type RebuiltValue, awaitedPreloadValueServiceDef, createDerivedBooleanFromChildQueryServiceDef, - entryIdInputSchema, + createInvalidQueryOutputServiceDef, fireAndForgetPreloadValueServiceDef, mutableRecordLookupServiceDef, - preloadedValueOutputSchema, - voidOutputSchema, + rebuiltEqualValueOnLoadServiceDef, } from './fixtures.ts'; afterEach(() => { @@ -80,38 +80,6 @@ describe('service runtime', () => { unsubscribe(); }); - it('does not notify subscribers for a different record', async () => { - const service = registerService(mutableRecordLookupServiceDef); - const callsA: Array | null> = []; - const callsB: Array | null> = []; - - const unsubscribeA = service.queries.getRecordFields.subscribe( - { entryId: 'entry-a' }, - (value) => { - callsA.push(value); - } - ); - const unsubscribeB = service.queries.getRecordFields.subscribe( - { entryId: 'entry-b' }, - (value) => { - callsB.push(value); - } - ); - - await vi.waitFor(() => expect(callsA).toEqual([null])); - await vi.waitFor(() => expect(callsB).toEqual([null])); - await service.commands.assignRecordField({ - entryId: 'entry-b', - fieldKey: 'marker', - fieldValue: 'match', - }); - - expect(callsA).toEqual([null]); - expect(callsB).toEqual([null, { marker: 'match' }]); - unsubscribeA(); - unsubscribeB(); - }); - it('stops notifying after unsubscribe', async () => { const service = registerService(mutableRecordLookupServiceDef); const calls: Array | null> = []; @@ -200,8 +168,8 @@ describe('service runtime', () => { input: v.string(), output: v.void(), handler: (input, ctx) => { - ctx.self.setState((draft) => { - draft.value = input; + ctx.self.setState((state) => { + state.value = input; }); }, }, @@ -251,6 +219,33 @@ describe('service runtime', () => { queueMicrotaskSpy.mockRestore(); } }); + + // A `selector` narrows the reactive footprint but must not skip output validation: the handler + // output is still validated (untracked) before the selector runs. + it('still validates query output for a subscriber that passes a selector', async () => { + const queuedCallbacks: Array<() => void> = []; + const queueMicrotaskSpy = vi + .spyOn(globalThis, 'queueMicrotask') + .mockImplementation((callback: VoidFunction) => { + queuedCallbacks.push(callback); + }); + const service = registerService(createInvalidQueryOutputServiceDef()); + + try { + service.queries.getBrokenValue.subscribe( + undefined, + (value) => value, + () => {} + ); + + await vi.waitFor(() => expect(queuedCallbacks).toHaveLength(1)); + expect(() => queuedCallbacks[0]()).toThrow( + 'Invalid output for query "internal-fixture/invalid-query-output.getBrokenValue"' + ); + } finally { + queueMicrotaskSpy.mockRestore(); + } + }); }); describe('background load', () => { @@ -301,6 +296,137 @@ describe('service runtime', () => { unsubscribe(); }); + // A load that re-runs and rewrites a deeply-equal value produces a new state slice, so the + // subscription computed re-runs — but the emitted value is value-equal to the last one, so the + // `isEqual` emit gate suppresses the redundant callback. + it('does not re-emit when a load rewrites a deeply-equal but freshly-allocated value', async () => { + const service = registerService(rebuiltEqualValueOnLoadServiceDef); + + // First subscription: null -> populated. After this, state holds value #1. + const firstCalls: Array = []; + const unsubscribeFirst = service.queries.getRebuiltValue.subscribe( + { entryId: 'entry-a' }, + (value) => { + firstCalls.push(value); + } + ); + await vi.waitFor(() => expect(firstCalls).toEqual([null, { marker: 'stable', count: 1 }])); + unsubscribeFirst(); + + // Second subscription, entry already populated: the immediate emission carries the stored + // value, then load reruns and stores a brand-new object that is deeply equal but not `===`. + // The redundant emission must be suppressed. + const secondCalls: Array = []; + const unsubscribeSecond = service.queries.getRebuiltValue.subscribe( + { entryId: 'entry-a' }, + (value) => { + secondCalls.push(value); + } + ); + + await vi.waitFor(() => expect(secondCalls).toEqual([{ marker: 'stable', count: 1 }])); + // Give the background load time to run and (not) notify. + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(secondCalls).toEqual([{ marker: 'stable', count: 1 }]); + + unsubscribeSecond(); + }); + + // True fine-grained reactivity: the deep-signal proxy tracks reads per field, so writing an + // unrelated key must not re-run the subscriber's handler at all (not merely suppress the + // emission). The handler spy proves the computed did not re-evaluate. Before the deep-signal + // migration this assertion failed — the handler re-ran on every write and the test only passed + // because the emitted value happened to be value-equal. + it('does not re-run a subscriber handler when an unrelated key changes', async () => { + const handlerSpy = vi.spyOn(mutableRecordLookupServiceDef.queries.getRecordFields, 'handler'); + try { + const service = registerService(mutableRecordLookupServiceDef); + + await service.commands.assignRecordField({ + entryId: 'entry-a', + fieldKey: 'marker', + fieldValue: 'match', + }); + + const callsA: Array | null> = []; + const unsubscribe = service.queries.getRecordFields.subscribe( + { entryId: 'entry-a' }, + (value) => { + callsA.push(value); + } + ); + await vi.waitFor(() => expect(callsA).toEqual([{ marker: 'match' }])); + const handlerRunsAfterSubscribe = handlerSpy.mock.calls.length; + + await service.commands.assignRecordField({ + entryId: 'entry-b', + fieldKey: 'marker', + fieldValue: 'other', + }); + await new Promise((resolve) => setTimeout(resolve, 30)); + + // No emission and — crucially — no handler re-run for entry-a. + expect(callsA).toEqual([{ marker: 'match' }]); + expect(handlerSpy.mock.calls.length).toBe(handlerRunsAfterSubscribe); + + unsubscribe(); + } finally { + handlerSpy.mockRestore(); + } + }); + + // A `selector` narrows the subscriber to one slice of the value. Changing a sibling field the + // selector ignores must neither fire the callback nor re-run the handler (the handler spy proves + // the dependency footprint is narrowed, not just the emission); changing the selected field + // fires once with the new slice. + it('re-emits and re-runs only for the selected slice of a query value', async () => { + const handlerSpy = vi.spyOn(mutableRecordLookupServiceDef.queries.getRecordFields, 'handler'); + try { + const service = registerService(mutableRecordLookupServiceDef); + + await service.commands.assignRecordField({ + entryId: 'entry-a', + fieldKey: 'selected', + fieldValue: 'first', + }); + + const selectedCalls: Array = []; + const unsubscribe = service.queries.getRecordFields.subscribe( + { entryId: 'entry-a' }, + (record) => record?.selected, + (selected) => { + selectedCalls.push(selected); + } + ); + await vi.waitFor(() => expect(selectedCalls).toEqual(['first'])); + const handlerRunsAfterSubscribe = handlerSpy.mock.calls.length; + + // Changing a sibling field re-runs nothing and emits nothing: the selector reads only + // `selected`, so the sibling is outside the tracked footprint. + await service.commands.assignRecordField({ + entryId: 'entry-a', + fieldKey: 'sibling', + fieldValue: 'ignored', + }); + await new Promise((resolve) => setTimeout(resolve, 30)); + expect(selectedCalls).toEqual(['first']); + expect(handlerSpy.mock.calls.length).toBe(handlerRunsAfterSubscribe); + + // Changing the selected field fires the callback once with the new slice. + await service.commands.assignRecordField({ + entryId: 'entry-a', + fieldKey: 'selected', + fieldValue: 'second', + }); + await vi.waitFor(() => expect(selectedCalls).toEqual(['first', 'second'])); + + unsubscribe(); + } finally { + handlerSpy.mockRestore(); + } + }); + it('preloads distinct values independently by input', async () => { const service = registerService(awaitedPreloadValueServiceDef); const callsA: Array = []; @@ -392,6 +518,301 @@ describe('service runtime', () => { }); }); + describe('reactive load', () => { + // Same-service: the load reads an external field (`source`) and writes a derived field; + // changing `source` must re-fire the load and refresh the value. + function createReactiveDerivedServiceDef() { + return defineService({ + id: 'internal-fixture/reactive-derived-same-service', + initialState: { source: 1, derived: null as number | null }, + queries: { + getDerived: { + input: v.void(), + output: v.nullable(v.number()), + handler: (_input, ctx) => ctx.self.state.derived, + load: async (_input, ctx) => { + const source = ctx.self.state.source; // external read -> tracked + await Promise.resolve(); + await ctx.self.commands.setDerived(source * 10); + }, + }, + }, + commands: { + setSource: { + input: v.number(), + output: v.void(), + handler: (next, ctx) => + ctx.self.setState((state) => { + state.source = next; + }), + }, + bumpSourceTwice: { + input: v.void(), + output: v.void(), + // Two writes in one command share one batch, so the load coalesces them into one re-fire. + handler: (_input, ctx) => + ctx.self.setState((state) => { + state.source = 2; + state.source = 3; + }), + }, + setDerived: { + input: v.number(), + output: v.void(), + handler: (next, ctx) => + ctx.self.setState((state) => { + state.derived = next; + }), + }, + }, + }); + } + + it('re-fires load when a same-service external dependency changes', async () => { + const service = registerService(createReactiveDerivedServiceDef()); + const calls: Array = []; + + const unsubscribe = service.queries.getDerived.subscribe(undefined, (value) => { + calls.push(value); + }); + + await vi.waitFor(() => expect(calls).toEqual([null, 10])); + await service.commands.setSource(5); + await vi.waitFor(() => expect(calls).toEqual([null, 10, 50])); + + unsubscribe(); + }); + + it('re-fires load when a cross-service dependency changes (via getService)', async () => { + const sourceDef = defineService({ + id: 'internal-fixture/reactive-cross-source', + initialState: { value: 1 }, + queries: { + getValue: { + input: v.void(), + output: v.number(), + handler: (_input, ctx) => ctx.self.state.value, + }, + }, + commands: { + setValue: { + input: v.number(), + output: v.void(), + handler: (next, ctx) => + ctx.self.setState((state) => { + state.value = next; + }), + }, + }, + }); + const derivedDef = defineService({ + id: 'internal-fixture/reactive-cross-derived', + initialState: { derived: null as number | null }, + queries: { + getDerived: { + input: v.void(), + output: v.nullable(v.number()), + handler: (_input, ctx) => ctx.self.state.derived, + load: async (_input, ctx) => { + const value = ctx.getService(sourceDef.id).queries.getValue(undefined) as number; + await Promise.resolve(); + await ctx.self.commands.setDerived(value * 10); + }, + }, + }, + commands: { + setDerived: { + input: v.number(), + output: v.void(), + handler: (next, ctx) => + ctx.self.setState((state) => { + state.derived = next; + }), + }, + }, + }); + + const sourceService = registerService(sourceDef); + const derivedService = registerService(derivedDef); + const calls: Array = []; + + const unsubscribe = derivedService.queries.getDerived.subscribe(undefined, (value) => { + calls.push(value); + }); + + await vi.waitFor(() => expect(calls).toEqual([null, 10])); + await sourceService.commands.setValue(5); + await vi.waitFor(() => expect(calls).toEqual([null, 10, 50])); + + unsubscribe(); + }); + + it('does not infinite-loop when the load writes the state its handler reads', async () => { + const def = createReactiveDerivedServiceDef(); + const loadSpy = vi.spyOn(def.queries.getDerived, 'load'); + try { + const service = registerService(def); + const calls: Array = []; + + const unsubscribe = service.queries.getDerived.subscribe(undefined, (value) => { + calls.push(value); + }); + + await vi.waitFor(() => expect(calls).toEqual([null, 10])); + await new Promise((resolve) => setTimeout(resolve, 40)); + + // The load writes `derived` (read by the handler) but only reads `source`, so writing + // `derived` does not re-trigger it: it fires exactly once and settles. + expect(loadSpy).toHaveBeenCalledTimes(1); + expect(calls).toEqual([null, 10]); + + unsubscribe(); + } finally { + loadSpy.mockRestore(); + } + }); + + it('coalesces rapid dependency changes in one batch into a single re-load', async () => { + const def = createReactiveDerivedServiceDef(); + const loadSpy = vi.spyOn(def.queries.getDerived, 'load'); + try { + const service = registerService(def); + const calls: Array = []; + + const unsubscribe = service.queries.getDerived.subscribe(undefined, (value) => { + calls.push(value); + }); + await vi.waitFor(() => expect(calls).toEqual([null, 10])); + expect(loadSpy).toHaveBeenCalledTimes(1); + + // Two writes to `source` within one batched command -> one re-load (not two). + await service.commands.bumpSourceTwice(); + await vi.waitFor(() => expect(calls).toEqual([null, 10, 30])); + + expect(loadSpy).toHaveBeenCalledTimes(2); + + unsubscribe(); + } finally { + loadSpy.mockRestore(); + } + }); + + it('supersedes an in-flight load when dependencies change again', async () => { + const gates = new Map void>(); + const waitForGate = (source: number) => + new Promise((resolve) => { + gates.set(source, resolve); + }); + const releaseGate = async (source: number) => { + await vi.waitFor(() => expect(gates.has(source)).toBe(true)); + gates.get(source)!(); + }; + + const def = defineService({ + id: 'internal-fixture/reactive-superseding', + initialState: { source: 0, derived: null as number | null }, + queries: { + getDerived: { + input: v.void(), + output: v.nullable(v.number()), + handler: (_input, ctx) => ctx.self.state.derived, + load: async (_input, ctx) => { + const source = ctx.self.state.source; + await waitForGate(source); // hold the load open until the test releases it + await ctx.self.commands.setDerived(source); + }, + }, + }, + commands: { + setSource: { + input: v.number(), + output: v.void(), + handler: (next, ctx) => + ctx.self.setState((state) => { + state.source = next; + }), + }, + setDerived: { + input: v.number(), + output: v.void(), + handler: (next, ctx) => + ctx.self.setState((state) => { + state.derived = next; + }), + }, + }, + }); + + const service = registerService(def); + const calls: Array = []; + const unsubscribe = service.queries.getDerived.subscribe(undefined, (value) => { + calls.push(value); + }); + + // Initial load (source 0) settles first. + await vi.waitFor(() => expect(calls).toEqual([null])); + await releaseGate(0); + await vi.waitFor(() => expect(calls).toEqual([null, 0])); + + // Start two loads back-to-back; release the stale one (1) first, then the newest (2). + await service.commands.setSource(1); + await service.commands.setSource(2); + await releaseGate(1); + await releaseGate(2); + + await vi.waitFor(() => expect(calls).toEqual([null, 0, 2])); + // The superseded load (source 1) must never have written `derived`. + await new Promise((resolve) => setTimeout(resolve, 40)); + expect(calls).toEqual([null, 0, 2]); + + unsubscribe(); + }); + + it('does not re-fire load for non-subscription query() calls', async () => { + const def = createReactiveDerivedServiceDef(); + const loadSpy = vi.spyOn(def.queries.getDerived, 'load'); + try { + const service = registerService(def); + + // A plain query() call fires load once (fire-and-forget) and never sets up reactivity. + service.queries.getDerived(undefined); + await vi.waitFor(() => expect(loadSpy).toHaveBeenCalledTimes(1)); + + await service.commands.setSource(9); + await new Promise((resolve) => setTimeout(resolve, 40)); + + expect(loadSpy).toHaveBeenCalledTimes(1); + } finally { + loadSpy.mockRestore(); + } + }); + + it('fires an existing self-contained load exactly once for a subscription', async () => { + const loadSpy = vi.spyOn(awaitedPreloadValueServiceDef.queries.getPreloadedValue, 'load'); + try { + const service = registerService(awaitedPreloadValueServiceDef); + const calls: Array = []; + + const unsubscribe = service.queries.getPreloadedValue.subscribe( + { entryId: 'entry-a' }, + (value) => { + calls.push(value); + } + ); + + await vi.waitFor(() => expect(calls).toEqual([null, 'preloaded'])); + await new Promise((resolve) => setTimeout(resolve, 40)); + + // The load reads/writes only its own state (no external read), so it fires once. + expect(loadSpy).toHaveBeenCalledTimes(1); + + unsubscribe(); + } finally { + loadSpy.mockRestore(); + } + }); + }); + describe('cross-service query composition', () => { it('reads a child query synchronously from another service', async () => { const sourceService = registerService(mutableRecordLookupServiceDef); @@ -520,8 +941,8 @@ describe('service runtime', () => { input: v.undefined(), output: v.void(), handler: (_input, ctx) => { - ctx.self.setState((draft) => { - draft.aDone = true; + ctx.self.setState((state) => { + state.aDone = true; }); }, }, @@ -529,8 +950,8 @@ describe('service runtime', () => { input: v.undefined(), output: v.void(), handler: (_input, ctx) => { - ctx.self.setState((draft) => { - draft.bDone = true; + ctx.self.setState((state) => { + state.bDone = true; }); }, }, @@ -572,8 +993,8 @@ describe('service runtime', () => { input: v.undefined(), output: v.void(), handler: (_input, ctx) => { - ctx.self.setState((draft) => { - draft.counter += 1; + ctx.self.setState((state) => { + state.counter += 1; }); }, }, @@ -630,8 +1051,8 @@ describe('service runtime', () => { output: v.void(), handler: (_input, ctx) => { bumpCommandSpy(); - ctx.self.setState((draft) => { - draft.count += 1; + ctx.self.setState((state) => { + state.count += 1; }); }, }, @@ -645,7 +1066,7 @@ describe('service runtime', () => { await buildRuntime.runLoadOnce(queryName, undefined); // A duplicate load body would run bump twice and leave count at 2. - expect(buildRuntime.stateSignal().count).toBe(1); + expect(buildRuntime.getStateSnapshot().count).toBe(1); expect(loadBodySpy).toHaveBeenCalledTimes(1); expect(bumpCommandSpy).toHaveBeenCalledTimes(1); }); diff --git a/code/core/src/shared/open-service/service-runtime.ts b/code/core/src/shared/open-service/service-runtime.ts index 86d316b4a7cf..8651d8906fe1 100644 --- a/code/core/src/shared/open-service/service-runtime.ts +++ b/code/core/src/shared/open-service/service-runtime.ts @@ -58,8 +58,9 @@ * path because handler reads are tracked by `activeHandlerLoadSession`, which is module-scoped * and stable for the duration of a sync handler call. */ -import { produce } from 'immer'; -import { computed, effect, endBatch, signal, startBatch } from 'alien-signals'; +import { batch, computed, effect, untracked } from '@preact/signals-core'; +import { deepSignal } from 'deepsignal/core'; +import { isEqual } from 'es-toolkit/predicate'; import { OpenServiceInvalidStaticPathError, @@ -87,7 +88,6 @@ import type { ServiceRegistryApi, } from './types.ts'; -type ServiceSignal = ReturnType>; type RuntimeQueryDefinition = QueryDefinition; /** @@ -101,7 +101,8 @@ export type ServiceRuntime< TQueries extends Queries, TCommands extends Commands, > = { - stateSignal: ServiceSignal; + /** Returns a plain, detached snapshot of the current state for serialization. */ + getStateSnapshot(): TState; commandSelf: CommandSelf; queryCtx: QueryCtx; loadCtxForStatic: LoadCtx; @@ -293,28 +294,44 @@ async function drainCollector( /** * Creates the writable `self` object that backs every runtime ctx for one service instance. * - * State writes are wrapped in an alien-signals batch so one command can update multiple fields - * without causing intermediate reactive notifications between writes. + * State is a deep reactive proxy: mutations applied to `state` notify only the fine-grained signals + * for the fields that actually changed. Writes are wrapped in a batch so one command only notifies + * subscribers after the full mutation completes. */ -function createCommandSelf(stateSignal: ServiceSignal): CommandSelf { +function createCommandSelf(state: TState): CommandSelf { return { get state() { - return stateSignal(); + return state; }, setState(mutate) { - // Batch signal writes so one command only triggers subscribers after the full draft update. - startBatch(); - try { - stateSignal(produce(stateSignal(), mutate)); - } finally { - endBatch(); - } + batch(() => { + mutate(state); + }); }, queries: {}, commands: {}, }; } +/** + * Detaches a value from the reactive deep-signal proxy into a plain snapshot. + * + * Used for `selector` results on the subscription path: a selector can return a live proxy slice, + * which we strip so consumers get plain, comparable data (and reads inside the `computed` register + * only the fields the selector actually touched). Primitives pass through untouched. + * + * `structuredClone` cannot clone a `Proxy`, so this uses a JSON round-trip. That is sufficient + * because open-service state is required to be JSON-serializable (the same constraint the + * static-build pipeline relies on). For plain (already-detached) values such as the whole-state + * snapshot, the runtime uses `structuredClone` directly instead. + */ +function detachSnapshot(value: TValue): TValue { + if (value === null || typeof value !== 'object') { + return value; + } + return JSON.parse(JSON.stringify(value)) as TValue; +} + /** * Builds the runtime command map from the declarative command definitions. * @@ -368,10 +385,17 @@ function buildCommands( type QueryRuntimeRefs = { serviceId: ServiceId; commandSelf: CommandSelf; - stateSignal: ServiceSignal; + /** Deep reactive proxy backing this service's state; reads inside a computed track fine-grained. */ + state: TState; registryApi: ServiceRegistryApi; queryDefinitions: Map>; defaultQueries: Record>; + /** + * Builds a command map whose `setState` writes are dropped once `isCurrent()` returns false. + * Used by reactive subscription loads so a superseded (stale) re-run cannot overwrite the state + * produced by a newer run. + */ + buildGatedCommands: (isCurrent: () => boolean) => CommandSelf['commands']; }; /** @@ -407,7 +431,7 @@ function validateQueryOutput( } /** - * Runs the query handler synchronously and validates the resolved value. + * Runs the query handler synchronously and returns its raw result (no output validation). * * The `selfQueries` parameter lets the caller swap in load-aware wrappers when running inside a * load body or a `.loaded()` discovery pass; ordinary handler calls pass the default queries. @@ -430,14 +454,17 @@ function runHandlerSync( const handlerSelf: QuerySelf = { get state() { - return refs.stateSignal(); + return refs.state; }, queries: selfQueries, }; const handlerCtx: QueryCtx = { self: handlerSelf, getService }; - const result = queryDef.handler(validatedInput, handlerCtx); - return validateQueryOutput(refs, queryName, queryDef, result); + // The handler result is returned raw. Output validation is intentionally *not* run here: this + // function executes inside the subscription `computed`, where reading the whole value to validate + // it would broaden the reactive dependency footprint. Validation runs at the call sites instead — + // see `validateQueryOutput` usages. + return queryDef.handler(validatedInput, handlerCtx); } /** @@ -540,7 +567,7 @@ async function runLoadBody( const wrappedQueries = buildLoadWrappedQueries(refs, ancestorChain, collector); const loadSelf: LoadSelf = { get state() { - return refs.stateSignal(); + return refs.state; }, queries: wrappedQueries, commands: refs.commandSelf.commands as LoadSelf['commands'], @@ -551,6 +578,43 @@ async function runLoadBody( await drainCollector(collector, undefined, refs.serviceId, queryName); } +/** + * Runs a query's `load` as the body of a subscription's reactive effect. + * + * Unlike {@link runLoadBody}, this is invoked synchronously inside an `effect()`, so the external + * signals the load reads in its synchronous prefix are tracked — when they later change, the effect + * re-runs and the load re-fires, turning it into a reactive async resource. Loads are therefore an + * idempotent warming step (the documented guideline, now a hard contract); one-shot side effects + * belong in a command. + * + * Writes go through gated commands: if `isCurrent()` flips to false because a newer run started + * (deps changed again), this run's later writes are dropped, so a slow stale load cannot clobber the + * newer result. Sibling reads use the default queries — their loads fire-and-forget — so transitive + * dependencies stay warm without the `.loaded()` drain, which is only meaningful for awaited pulls. + */ +async function runReactiveLoad( + refs: QueryRuntimeRefs, + queryName: string, + queryDef: RuntimeQueryDefinition, + validatedInput: unknown, + isCurrent: () => boolean +): Promise { + if (!queryDef.load) { + return; + } + + const loadSelf: LoadSelf = { + get state() { + return refs.state; + }, + queries: refs.defaultQueries, + commands: refs.buildGatedCommands(isCurrent) as LoadSelf['commands'], + }; + const loadCtx: LoadCtx = { self: loadSelf, getService: refs.registryApi.getService }; + + await Promise.resolve(queryDef.load(validatedInput, loadCtx)); +} + /** * Builds the wrapped `self.queries` map exposed inside one load body. * @@ -605,13 +669,18 @@ function buildLoadWrappedQueries( } } - return runHandlerSync( + return validateQueryOutput( refs, name, queryDef, - validatedInput, - wrappedQueries, - refs.registryApi.getService + runHandlerSync( + refs, + name, + queryDef, + validatedInput, + wrappedQueries, + refs.registryApi.getService + ) ); }) as Query; @@ -763,17 +832,25 @@ async function runLoaded( hasMoreWork = session.collector.size > 0; } + // Run the final handler call under the session so already-settled dependency loads are not + // refired during this last evaluation, and validate the output at this pull boundary (validation + // is intentionally kept out of the reactive `runHandlerSync` path). const previousSession = activeHandlerLoadSession; activeHandlerLoadSession = session; try { - return runHandlerSync( + return validateQueryOutput( refs, queryName, queryDef, - validatedInput, - refs.defaultQueries, - refs.registryApi.getService + runHandlerSync( + refs, + queryName, + queryDef, + validatedInput, + refs.defaultQueries, + refs.registryApi.getService + ) ); } finally { activeHandlerLoadSession = previousSession; @@ -844,40 +921,71 @@ function createDefaultQuery( } } - return runHandlerSync( + // Validate the output on this pull boundary. Validation runs off the reactive path, so it never + // affects the deep-signal dependency graph; the validated value is what the consumer receives. + return validateQueryOutput( refs, queryName, queryDef, - validatedInput, - refs.defaultQueries, - refs.registryApi.getService + runHandlerSync( + refs, + queryName, + queryDef, + validatedInput, + refs.defaultQueries, + refs.registryApi.getService + ) ); }) as Query; query.loaded = (input: unknown) => runLoaded(refs, queryName, queryDef, input); - query.subscribe = (input: unknown, callback: (value: unknown) => void): (() => void) => - subscribeToQuery(refs, queryName, queryDef, input, callback); + query.subscribe = (( + input: unknown, + selectorOrCallback: ((value: unknown) => unknown) | ((value: unknown) => void), + maybeCallback?: (value: unknown) => void + ): (() => void) => + subscribeToQuery( + refs, + queryName, + queryDef, + input, + maybeCallback ? (selectorOrCallback as (value: unknown) => unknown) : undefined, + maybeCallback ?? (selectorOrCallback as (value: unknown) => void) + )) as Query['subscribe']; return query; } /** - * Subscribes to a query by running its handler under an alien-signals `computed()` and `effect()`. + * Subscribes to a query by running its handler inside a deep-signal-aware `computed()` and + * notifying through an `effect()`. * * The first emission is deferred to a microtask so callers always receive their unsubscribe handle * before the callback fires. The runtime kicks `load` off in the background but does not wait for - * it — subscribers see the current state immediately and a follow-up emission once the load - * settles and tracked state changes. + * it — subscribers see the current state immediately and a follow-up emission once the load settles + * and tracked state changes. + * + * Two layers keep emissions precise: + * + * 1. **Fine-grained reads** — the handler (and the optional `selector`) read through the deep-signal + * proxy, so the computed only re-runs when the exact fields it touched change. A write to an + * unrelated key or field never re-runs the handler. + * 2. **Value dedup** — the emitted value is detached into a plain snapshot and compared with the + * previously emitted snapshot via `isEqual`. A re-run that produces a deeply-equal value (e.g. a + * load that rewrites an equal payload) does not fire the callback. With a `selector`, only the + * selected slice is snapshotted and compared, so subscribers depend on exactly the data they use. */ function subscribeToQuery( refs: QueryRuntimeRefs, queryName: string, queryDef: RuntimeQueryDefinition, rawInput: unknown, + selector: ((value: unknown) => unknown) | undefined, callback: (value: unknown) => void ): () => void { let active = true; let teardown: (() => void) | undefined; + let loadTeardown: (() => void) | undefined; Promise.resolve().then(() => { if (!active) { @@ -893,47 +1001,79 @@ function subscribeToQuery( } if (queryDef.load) { - const loadKey = makeLoadKey(refs.serviceId, queryName, validatedInput); - const pendingLoad = triggerLoad( - refs, - queryName, - queryDef, - validatedInput, - loadKey, - EMPTY_SET - ); - // Subscribers do not block on rejections, but we still want them visible to global handlers. - pendingLoad.catch(rethrowAsync); + // Reactive load: run the load inside an effect so the external signals it reads synchronously + // are tracked. When they change, the effect re-runs and the load re-fires, keeping an + // asynchronously-produced value fresh. `epoch` gates writes so a superseded run can't clobber + // a newer one. Loads with no external synchronous reads (the existing ones) track nothing and + // therefore fire exactly once, so this is non-breaking. + let epoch = 0; + loadTeardown = effect(() => { + const myEpoch = ++epoch; + runReactiveLoad(refs, queryName, queryDef, validatedInput, () => myEpoch === epoch).catch( + rethrowAsync + ); + }); } - const comp = computed(() => - runHandlerSync( + // The output is always validated, but the computed's dependency footprint must match only what + // the subscriber consumes: + // - With a `selector`, validation runs untracked (so reading the whole value to validate it + // does not register dependencies), and only the selected fields the selector reads are + // tracked. A sibling field the selector ignores never re-runs this computed. + // - Without a selector, validation runs tracked: reading the whole value is the correct + // footprint for a whole-output subscriber, and the validated value is emitted (identical to + // what a direct `query()` / `.loaded()` pull returns). + const comp = computed(() => { + const output = runHandlerSync( refs, queryName, queryDef, validatedInput, refs.defaultQueries, refs.registryApi.getService - ) - ); + ); + if (selector) { + const validated = untracked(() => validateQueryOutput(refs, queryName, queryDef, output)); + // Read the live handler output so the selector's field accesses stay on the reactive + // proxy; validation returns a plain parsed value that cannot carry those dependencies. + selector(output); + return detachSnapshot(selector(validated)); + } + return validateQueryOutput(refs, queryName, queryDef, output); + }); + + let hasEmitted = false; + let lastEmitted: unknown; teardown = effect(() => { let value: unknown; try { - value = comp(); + value = comp.value; } catch (error) { rethrowAsync(error); return; } - if (active) { - callback(value); + if (!active) { + return; + } + + // Skip re-runs that did not change the (selected) value. The computed already gates on + // fine-grained reads; this gates on the rarer case where tracked state changed but the + // emitted value is deeply equal (e.g. a load rewriting an equal payload). + if (hasEmitted && isEqual(value, lastEmitted)) { + return; } + + hasEmitted = true; + lastEmitted = value; + callback(value); }); }); return () => { active = false; teardown?.(); + loadTeardown?.(); }; } @@ -966,9 +1106,16 @@ export function createServiceRuntime< }, initialState: TState = def.initialState ): ServiceRuntime { - // The signal is the single source of truth that query computations subscribe to. - const stateSignal = signal(initialState); - const commandSelf = createCommandSelf(stateSignal); + // `initialState` is the plain backing object that the deep-signal proxy writes through to; it + // stays in sync with every mutation and is the source for serialization snapshots. The runtime + // mutates it in place, so callers that share an object (e.g. a definition's `initialState`) must + // pass their own copy — `registerService` and the static build each do. + const rawState = initialState; + // The deep reactive proxy is the single source of truth that query computations track, at + // per-field granularity. + const state = deepSignal(rawState as object) as TState; + const getStateSnapshot = (): TState => structuredClone(rawState); + const commandSelf = createCommandSelf(state); const { registryApi } = runtimeOptions; const createCommandCtx = (): CommandCtx => ({ self: commandSelf, @@ -986,13 +1133,41 @@ export function createServiceRuntime< Object.entries(def.queries) as [string, RuntimeQueryDefinition][] ); const defaultQueries: Record> = {}; + + // Gated commands for reactive subscription loads: a stale run's writes are dropped once a newer + // run has started (`isCurrent()` returns false), so superseded loads cannot clobber fresh state. + const buildGatedCommands = (isCurrent: () => boolean): CommandSelf['commands'] => { + const gatedSelf: CommandSelf = { + get state() { + return state; + }, + setState(mutate) { + if (!isCurrent()) { + return; + } + batch(() => { + mutate(state); + }); + }, + queries: defaultQueries, + commands: {}, + }; + const gated = buildCommands(def.id, def.commands, () => ({ + self: gatedSelf, + getService: registryApi.getService, + })); + gatedSelf.commands = gated as CommandSelf['commands']; + return gated as CommandSelf['commands']; + }; + const refs: QueryRuntimeRefs = { serviceId: def.id, commandSelf, - stateSignal, + state, registryApi, queryDefinitions, defaultQueries, + buildGatedCommands, }; // Build queries after commands so handler/load ctx surfaces resolve the same command map. @@ -1005,7 +1180,7 @@ export function createServiceRuntime< const queries = defaultQueries as ServiceInstance['queries']; const queryCtxSelf: QuerySelf = { get state() { - return stateSignal(); + return state; }, queries: defaultQueries, }; @@ -1013,7 +1188,7 @@ export function createServiceRuntime< const loadCtxForStatic: LoadCtx = { self: { get state() { - return stateSignal(); + return state; }, queries: defaultQueries, commands: commands as LoadSelf['commands'], @@ -1057,7 +1232,7 @@ export function createServiceRuntime< }; return { - stateSignal, + getStateSnapshot, commandSelf, queryCtx, loadCtxForStatic, diff --git a/code/core/src/shared/open-service/services/docgen/server.ts b/code/core/src/shared/open-service/services/docgen/server.ts index 6989b21c2df4..04a7b25e8931 100644 --- a/code/core/src/shared/open-service/services/docgen/server.ts +++ b/code/core/src/shared/open-service/services/docgen/server.ts @@ -66,8 +66,8 @@ export function registerDocgenService(options: RegisterDocgenServiceOptions) { return undefined; } - ctx.self.setState((draft) => { - draft.components[input.componentId] = payload; + ctx.self.setState((state) => { + state.components[input.componentId] = payload; }); return payload; }, diff --git a/code/core/src/shared/open-service/types.ts b/code/core/src/shared/open-service/types.ts index 3fe77efe6a6f..3203d560cb74 100644 --- a/code/core/src/shared/open-service/types.ts +++ b/code/core/src/shared/open-service/types.ts @@ -80,7 +80,17 @@ export type CommandFunctions< export type Query = { (input: TInput): TOutput; loaded(input: TInput): Promise; + /** + * Subscribe to a query. The callback fires once with the current value and again whenever the + * tracked state it reads changes. An optional `selector` narrows what the subscriber depends on: + * the callback receives the selected slice and only fires when that slice changes by value. + */ subscribe(input: TInput, callback: (value: TOutput) => void): () => void; + subscribe( + input: TInput, + selector: (value: TOutput) => TSelected, + callback: (selected: TSelected) => void + ): () => void; }; /** @@ -114,7 +124,7 @@ export type LoadSelf< /** * Mutable service handle exposed to command handlers. * - * Commands receive both `setState` for direct draft mutation and `commands` so one command can + * Commands receive both `setState` for direct state mutation and `commands` so one command can * delegate to another within the same service. */ export type CommandSelf< @@ -123,7 +133,7 @@ export type CommandSelf< TCommandOutputSchemas extends MatchingOutputSchemas = MatchingOutputSchemas, > = LoadSelf & { - setState(mutate: (draft: TState) => void): void; + setState(mutate: (state: TState) => void): void; }; export type ServiceSummary = { diff --git a/yarn.lock b/yarn.lock index 63f7c3e28951..47d72ef923b8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5504,6 +5504,13 @@ __metadata: languageName: node linkType: hard +"@preact/signals-core@npm:^1.14.2": + version: 1.14.2 + resolution: "@preact/signals-core@npm:1.14.2" + checksum: 10c0/898e6e22a5d2a11bd3d5b109c8d9bacff0e9bc9f23c01455901b7feab8c441dc03fdd53cfd7b5f9275b0df052fcaeb8c8928f537cebb190d9e02a6f2a3ac441e + languageName: node + linkType: hard + "@radix-ui/number@npm:1.1.0": version: 1.1.0 resolution: "@radix-ui/number@npm:1.1.0" @@ -12542,7 +12549,7 @@ __metadata: languageName: node linkType: hard -"alien-signals@npm:^3.0.0, alien-signals@npm:^3.2.0": +"alien-signals@npm:^3.0.0": version: 3.2.1 resolution: "alien-signals@npm:3.2.1" checksum: 10c0/4c4064faa208126177224d1ed6a2310687d452dec0771994e276d9af4c72e853fcb969ae4a7fcd034b1d1b9accb9500f4941178326eeea1cb8f64ec612853ef8 @@ -15768,6 +15775,27 @@ __metadata: languageName: node linkType: hard +"deepsignal@npm:^1.6.0": + version: 1.6.0 + resolution: "deepsignal@npm:1.6.0" + peerDependencies: + "@preact/signals": ^1.1.4 || ^2.0.0 + "@preact/signals-core": ^1.5.1 + "@preact/signals-react": ^1.3.8 || ^2.0.0 || ^3.0.0 + preact: ^10.16.0 + peerDependenciesMeta: + "@preact/signals": + optional: true + "@preact/signals-core": + optional: true + "@preact/signals-react": + optional: true + preact: + optional: true + checksum: 10c0/76664442ba8b9e33d54bcbf6131e19712d4b0b9d5705213c27a09bff9fb32c7772053047f143450f158728d95de09ead31ed5fe6917a70c30b70fdd1e3c2e60b + languageName: node + linkType: hard + "default-browser-id@npm:3.0.0": version: 3.0.0 resolution: "default-browser-id@npm:3.0.0" @@ -29599,6 +29627,7 @@ __metadata: "@happy-dom/global-registrator": "npm:^20.0.11" "@ngard/tiny-isequal": "npm:^1.1.0" "@polka/compression": "npm:^1.0.0-next.28" + "@preact/signals-core": "npm:^1.14.2" "@radix-ui/react-scroll-area": "npm:1.2.0-rc.7" "@radix-ui/react-slot": "npm:^1.0.2" "@react-aria/interactions": "npm:^3.25.5" @@ -29643,7 +29672,6 @@ __metadata: "@yarnpkg/libzip": "npm:2.3.0" acorn: "npm:^8.15.0" acorn-jsx: "npm:^5.3.2" - alien-signals: "npm:^3.2.0" ansi-to-html: "npm:^0.7.2" browser-dtector: "npm:^3.4.0" bundle-require: "npm:^5.1.0" @@ -29654,6 +29682,7 @@ __metadata: copy-to-clipboard: "npm:^3.3.1" cross-spawn: "npm:^7.0.6" deep-object-diff: "npm:^1.1.0" + deepsignal: "npm:^1.6.0" dequal: "npm:^2.0.2" detect-indent: "npm:^7.0.1" detect-port: "npm:^1.6.1"