Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions code/.storybook/open-service-debug-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise<StoryIndexGen
output: v.undefined(),
handler: async (input, ctx) => {
logger.warn(`[open-service debug] command addActivity(${input.message})`);
ctx.self.setState((draft) => {
draft.activity.push(input.message);
ctx.self.setState((state) => {
state.activity.push(input.message);
});

return undefined;
Expand All @@ -115,10 +115,10 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise<StoryIndexGen
logger.warn(
`[open-service debug] command syncStoryIndex(${input.reason}) => ${Object.keys(storyIndex.entries).length} entries`
);
ctx.self.setState((draft) => {
draft.storyIndexEntryCount = Object.keys(storyIndex.entries).length;
draft.storyIndexSampleIds = sampleIds;
draft.activity.push(`syncStoryIndex:${input.reason}:${sampleIds.length}`);
ctx.self.setState((state) => {
state.storyIndexEntryCount = Object.keys(storyIndex.entries).length;
state.storyIndexSampleIds = sampleIds;
state.activity.push(`syncStoryIndex:${input.reason}:${sampleIds.length}`);
});

return undefined;
Expand All @@ -137,10 +137,10 @@ function createDebugServiceDef(storyIndexGeneratorPromise: Promise<StoryIndexGen
logger.warn(
`[open-service debug] command recordPreloadVisit(${input.entryId}, ${input.source}) => ${value}`
);
ctx.self.setState((draft) => {
draft.preloadedByEntryId[input.entryId] = value;
draft.lastObservedValue = value;
draft.activity.push(`recordPreloadVisit:${input.entryId}:${input.source}`);
ctx.self.setState((state) => {
state.preloadedByEntryId[input.entryId] = value;
state.lastObservedValue = value;
state.activity.push(`recordPreloadVisit:${input.entryId}:${input.source}`);
});

return undefined;
Expand Down
3 changes: 2 additions & 1 deletion code/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@
"!src/**/*"
],
"dependencies": {
"@preact/signals-core": "^1.14.2",
"@storybook/global": "^5.0.0",
"@storybook/icons": "^2.0.2",
"@testing-library/dom": "^10.4.1",
Expand All @@ -241,6 +242,7 @@
"@vitest/expect": "3.2.4",
"@vitest/spy": "3.2.4",
"@webcontainer/env": "^1.1.1",
"deepsignal": "^1.6.0",
"esbuild": "^0.18.0 || ^0.19.0 || ^0.20.0 || ^0.21.0 || ^0.22.0 || ^0.23.0 || ^0.24.0 || ^0.25.0 || ^0.26.0 || ^0.27.0",
"open": "^10.2.0",
"oxc-parser": "^0.127.0",
Expand Down Expand Up @@ -304,7 +306,6 @@
"@yarnpkg/libzip": "2.3.0",
"acorn": "^8.15.0",
"acorn-jsx": "^5.3.2",
"alien-signals": "^3.2.0",
"ansi-to-html": "^0.7.2",
"browser-dtector": "^3.4.0",
"bundle-require": "^5.1.0",
Expand Down
59 changes: 48 additions & 11 deletions code/core/src/shared/open-service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ Its goals are:

- define stateful services in one declarative object
- expose synchronous queries and async commands with strong TypeScript inference
- validate all query and command input/output through Standard Schema
- support reactive query subscriptions through `alien-signals`
- validate all query and command input/output through Standard Schema (schemas may transform/coerce)
- support fine-grained reactive query subscriptions through deep signals (`deepsignal` +
`@preact/signals-core`)
- support server-side static state snapshots driven by query `load` hooks

The main audience for this README is agents and maintainers who need to understand how the pieces
Expand Down Expand Up @@ -94,6 +95,15 @@ Query handlers do **not** receive `commands` or `setState`. Mutations belong in

`load` mutations must go through commands. Cross-service `getService(...).queries.*` calls inside a load body are not auto-tracked for the drain; use `await ctx.getService(id).queries.foo.loaded(input)` when you need a cross-service dependency awaited before your own load completes.

**`load` is a reactive, idempotent warming step.** For an active subscription, `load` re-fires whenever the external signals it reads synchronously change — same-service fields and cross-service reads via `getService(...).queries.*` alike — turning a query into a reactive async resource (like a TanStack Query / SolidJS `createResource` / Vue async `watchEffect`). This means:

- **`load` must be idempotent.** Re-running it with the same dependencies must produce the same state. Any genuinely one-shot side effect belongs in a command invoked conditionally, never in `load` itself.
- **Read dependencies synchronously, up front.** Only reads in the load's synchronous prefix (before the first `await`) are tracked. Read the values you depend on first, then do async work — the same idiom every signal-based resource uses.
- **Loads that read no external signal fire exactly once** (the common case: `await ctx.self.commands.x(input)`), so existing loads are unaffected.
- Direct `query()` / `.loaded()` calls are **not** reactive — they keep one-shot-per-call semantics. Reactivity is scoped to subscriptions and is torn down when the last subscriber unsubscribes.

The runtime guards re-firing: a superseded run (its dependencies changed again before it finished) cannot overwrite a newer run's state, and changes batched together produce a single re-load.

**Keep `load` bodies as small as possible.** Almost always, `load` should be a one-liner that calls a command — the real work (input resolution, side effects, validation, state mutation) belongs in the command. This pays off for three reasons:

- **Reusability.** Anyone can call the command directly (other services, tests, integrations) without going through the query's load path. Logic stuck inside a load is unreachable from outside the drain.
Expand Down Expand Up @@ -158,7 +168,12 @@ Both must be Standard Schema compatible.
The runtime validates:

- caller input before a handler runs
- handler output before the result is returned or emitted
- handler output when a value is produced for a consumer — a direct `query()` call, `query.loaded()`,
the static build, and a subscription emission

Output validation reads the whole value, so it is kept out of the part of a subscription that
determines reactive dependencies: for a `selector` subscriber it runs without tracking, so it cannot
expand the deep-signal dependency footprint. (See "Subscription Flow".)

Queries validate **synchronously**. Their input and output schemas must produce sync results. If a Standard Schema returns a Promise during a query validation, the runtime throws `OpenServiceAsyncSchemaError` immediately.

Expand Down Expand Up @@ -250,15 +265,37 @@ When an **async** `load` body runs, it instead gets a *wrapped* `ctx.self.querie

Cross-service `ctx.getService(id).queries.*` calls inside a load body are **not** wrapped; authors must use `.loaded()` explicitly when they need a cross-service dep awaited from inside a load. From a sync handler, cross-service queries are tracked because they consult the module-scoped session like any other call.

## State and reactivity

State is a **deep reactive proxy** (`deepSignal` from `deepsignal`, backed by `@preact/signals-core`)
created in [service-runtime.ts](./service-runtime.ts). There is no top-level state atom and no Immer:

- Reading a field through `ctx.self.state` tracks a fine-grained signal for exactly that field
(including not-yet-present record keys, which fire when the key is later added).
- `setState((state) => …)` mutates the proxy **in place** inside a batch, so one command notifies
subscribers once, and only the fields it actually changed are invalidated.
- The proxy is internal and does not escape:
- Query/`.loaded()` results are the schema-validated value. For object and array schemas that
rebuild a plain value, this also detaches the result from the proxy.
- Subscription emissions are detached to plain values (validated for whole-value subscribers, or
JSON-stripped for `selector` slices).
- The whole-state snapshot for the static build uses `structuredClone` of the plain backing
object. (`structuredClone` cannot clone a proxy, so proxy-slice stripping uses a JSON round-trip;
state must be JSON-serializable, the same constraint the static-build pipeline relies on.)

## Subscription Flow

Subscriptions are implemented with `alien-signals` in [service-runtime.ts](./service-runtime.ts):
Subscriptions are implemented in [service-runtime.ts](./service-runtime.ts):

1. `subscribe(input, callback)` defers all work to a microtask.
2. The microtask validates the input synchronously and fires the dependency's `load` in the background.
3. A `computed()` value wraps the synchronous handler. An `effect()` runs the handler immediately (delivering the current value to the callback) and re-runs whenever the handler's tracked state dependencies change.
4. Subscribers receive the current state right away, then a follow-up emission once the load settles and state changes. UI consumers that want to suppress the pre-load emission should branch on the value (e.g. show a spinner for `null`).
5. Each emitted value is output-validated before the subscriber callback runs.
1. `subscribe(input, callback)` (or `subscribe(input, selector, callback)`) defers all work to a microtask.
2. The microtask validates the input synchronously. If the query has a `load`, it is run inside its own `effect()` so the external signals it reads synchronously are tracked: when they change, the effect re-runs and the load re-fires (see "Load"). Writes from a superseded run are dropped (each run carries an epoch; `setState` is gated on it), so a slow stale load can't clobber a newer result. The effect is torn down with the subscription.
3. A `computed()` runs the synchronous handler against the deep-signal proxy, so its dependency footprint is exactly what it reads. The output is always validated, but where validation runs depends on the subscription:
- **No selector:** the value is validated here and emitted. Reading the whole value to validate it is the correct footprint for a whole-value subscriber, and it keeps the emitted value identical to a direct `query()` pull.
- **With a selector:** validation runs untracked (so it does not register dependencies) and only `selector(value)` is read (then detached to a plain snapshot), so a sibling field the selector ignores never re-runs the handler.
4. An `effect()` runs the computed immediately (delivering the current value) and re-runs only when the computed's tracked fields change. A write to an unrelated key or field never re-runs the handler.
5. Subscribers receive the current state right away, then a follow-up emission once the load settles and state changes. UI consumers that want to suppress the pre-load emission should branch on the value (e.g. show a spinner for `null`).
6. Emissions are deduped by value: the effect compares the new value with the last emitted one via `es-toolkit` `isEqual` and skips the callback when they are equal. So a load that rewrites a deeply-equal value does not re-fire subscribers.
7. The optional `selector` is the `universal-store` pattern: the callback receives the selected slice and fires only when that slice changes by value — and, because the selector drives the computed's reads, an unselected field change does not even re-run the handler.

Tests should use `vi.waitFor(...)` when asserting the first emission or follow-up emissions.

Expand Down Expand Up @@ -363,8 +400,8 @@ export const exampleServiceDef = defineService({
input: entryIdSchema,
output: v.void(),
handler: async (input, ctx) => {
ctx.self.setState((draft) => {
draft.values[input.entryId] = 'ready';
ctx.self.setState((state) => {
state.values[input.entryId] = 'ready';
});
},
},
Expand Down
71 changes: 60 additions & 11 deletions code/core/src/shared/open-service/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ export const mutableRecordLookupServiceDef = defineService({
input: assignEntryFieldInputSchema,
output: voidOutputSchema,
handler: (input, ctx) => {
ctx.self.setState((draft) => {
draft[input.entryId] ??= {};
draft[input.entryId]![input.fieldKey] = input.fieldValue;
ctx.self.setState((state) => {
state[input.entryId] ??= {};
state[input.entryId]![input.fieldKey] = input.fieldValue;
});
},
},
Expand Down Expand Up @@ -82,8 +82,8 @@ export const awaitedPreloadValueServiceDef = defineService({
output: voidOutputSchema,
handler: async (input, ctx) => {
await Promise.resolve();
ctx.self.setState((draft) => {
draft[input.entryId] = 'preloaded';
ctx.self.setState((state) => {
state[input.entryId] = 'preloaded';
});
},
},
Expand Down Expand Up @@ -116,14 +116,63 @@ export const fireAndForgetPreloadValueServiceDef = defineService({
output: voidOutputSchema,
handler: async (input, ctx) => {
await Promise.resolve();
ctx.self.setState((draft) => {
draft[input.entryId] = 'preloaded';
ctx.self.setState((state) => {
state[input.entryId] = 'preloaded';
});
},
},
},
});

export type RebuiltValue = { marker: string; count: number };
export type RebuiltValueState = Record<string, RebuiltValue | undefined>;

/** Shared schema for the object value used by the rebuilt-equal-value fixture. */
export const rebuiltValueOutputSchema = v.nullable(
v.object({ marker: v.string(), count: v.number() })
);

/**
* Service fixture whose `load` rebuilds a deeply-equal but freshly-allocated object value every
* time it runs, then writes it back via a command.
*
* It exercises the case where re-subscribing to an already-populated entry would emit a redundant
* second value (the immediate emission plus a load-driven emission carrying an equal-but-not-
* identical object) unless the runtime dedups by value.
*/
export const rebuiltEqualValueOnLoadServiceDef = defineService({
id: 'internal-fixture/rebuilt-equal-value-on-load',
description: 'Rewrites a deeply-equal but freshly-allocated object value on every load.',
initialState: {} as RebuiltValueState,
queries: {
getRebuiltValue: {
description: 'Returns the value for an entry; load always rewrites a fresh-but-equal value.',
input: entryIdInputSchema,
output: rebuiltValueOutputSchema,
handler: (input, ctx) => ctx.self.state[input.entryId] ?? null,
load: async (input, ctx) => {
await ctx.self.commands.rebuildValue(input);
},
},
},
commands: {
rebuildValue: {
description: 'Allocates a brand-new object with a stable value and stores it.',
input: entryIdInputSchema,
output: rebuiltValueOutputSchema,
handler: async (input, ctx) => {
await Promise.resolve();
// A new object literal every call: deeply equal to any prior value, never `===` to it.
const value: RebuiltValue = { marker: 'stable', count: 1 };
ctx.self.setState((state) => {
state[input.entryId] = value;
});
return value;
},
},
},
});

export type SharedStaticFileState = { left?: string; right?: string };

/** Creates a fixture where multiple queries contribute state to one shared static file. */
Expand Down Expand Up @@ -162,8 +211,8 @@ export function createSharedStaticFileServiceDef() {
input: noInputSchema,
output: voidOutputSchema,
handler: (_input, ctx) => {
ctx.self.setState((draft) => {
draft.left = 'preloaded';
ctx.self.setState((state) => {
state.left = 'preloaded';
});
},
},
Expand All @@ -172,8 +221,8 @@ export function createSharedStaticFileServiceDef() {
input: noInputSchema,
output: voidOutputSchema,
handler: (_input, ctx) => {
ctx.self.setState((draft) => {
draft.right = 'preloaded';
ctx.self.setState((state) => {
state.right = 'preloaded';
});
},
},
Expand Down
12 changes: 6 additions & 6 deletions code/core/src/shared/open-service/index.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ const openServiceDef = defineService({
output: v.void(),
handler: (input, ctx) => {
expectTypeOf(input).toEqualTypeOf<number>();
ctx.self.setState((draft) => {
expectTypeOf(draft).toEqualTypeOf<OpenServiceState>();
draft.count += input;
ctx.self.setState((state) => {
expectTypeOf(state).toEqualTypeOf<OpenServiceState>();
state.count += input;
});
},
},
Expand All @@ -80,9 +80,9 @@ const openServiceDef = defineService({
output: v.void(),
handler: async (input, ctx) => {
expectTypeOf(input).toEqualTypeOf<{ entryId: string }>();
ctx.self.setState((draft) => {
expectTypeOf(draft.valuesById[input.entryId]).toEqualTypeOf<string | undefined>();
draft.valuesById[input.entryId] = 'ready';
ctx.self.setState((state) => {
expectTypeOf(state.valuesById[input.entryId]).toEqualTypeOf<string | undefined>();
state.valuesById[input.entryId] = 'ready';
});
},
},
Expand Down
8 changes: 4 additions & 4 deletions code/core/src/shared/open-service/server.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ const registeredService = registerService(registrationOnlyServiceDef, {
increment: {
handler: (input, ctx) => {
expectTypeOf(input).toEqualTypeOf<number>();
ctx.self.setState((draft) => {
draft.count += input;
ctx.self.setState((state) => {
state.count += input;
});
},
},
preloadValue: {
handler: async (input, ctx) => {
expectTypeOf(input).toEqualTypeOf<{ entryId: string }>();
ctx.self.setState((draft) => {
draft.valuesById[input.entryId] = 'ready';
ctx.self.setState((state) => {
state.valuesById[input.entryId] = 'ready';
});
},
},
Expand Down
24 changes: 12 additions & 12 deletions code/core/src/shared/open-service/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ describe('server static builds', () => {
entryId: 'entry-a',
});

ctx.self.setState((draft) => {
draft.value = record?.marker ?? null;
ctx.self.setState((state) => {
state.value = record?.marker ?? null;
});

return undefined;
Expand Down Expand Up @@ -167,8 +167,8 @@ describe('server static builds', () => {
output: v.undefined(),
handler: async (_input, ctx) => {
readyEntryIds.splice(0, readyEntryIds.length, 'entry-a');
ctx.self.setState((draft) => {
draft.built = true;
ctx.self.setState((state) => {
state.built = true;
});

return undefined;
Expand Down Expand Up @@ -200,8 +200,8 @@ describe('server static builds', () => {
input: v.object({ entryId: v.string() }),
output: v.undefined(),
handler: async (input, ctx) => {
ctx.self.setState((draft) => {
draft.value = input.entryId;
ctx.self.setState((state) => {
state.value = input.entryId;
});

return undefined;
Expand Down Expand Up @@ -281,8 +281,8 @@ describe('server static builds', () => {
}),
output: v.undefined(),
handler: async (input, ctx) => {
ctx.self.setState((draft) => {
draft.value = input.value;
ctx.self.setState((state) => {
state.value = input.value;
});

return undefined;
Expand Down Expand Up @@ -370,8 +370,8 @@ describe('server static builds', () => {
input: v.undefined(),
output: v.undefined(),
handler: async (_input, ctx) => {
ctx.self.setState((draft) => {
draft.value = 'invalid';
ctx.self.setState((state) => {
state.value = 'invalid';
});

return undefined;
Expand Down Expand Up @@ -427,8 +427,8 @@ describe('server static builds', () => {
}),
output: v.undefined(),
handler: async (input, ctx) => {
ctx.self.setState((draft) => {
draft.value = input.value;
ctx.self.setState((state) => {
state.value = input.value;
});

return undefined;
Expand Down
Loading
Loading