diff --git a/.cursor/rules/sdk/docs/request-lifecycle-system.mdc b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc index 7f9e1cec05..b1379f6905 100644 --- a/.cursor/rules/sdk/docs/request-lifecycle-system.mdc +++ b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc @@ -1,5 +1,5 @@ --- -description: Request Lifecycle System - design rationale, migration roadmap, and FAQ for RequestRegistry / RequestContext / DisposableScope +description: Request Lifecycle System - design rationale and FAQ for RequestRegistry / RequestContext / DisposableScope alwaysApply: false --- @@ -37,7 +37,7 @@ The pre-0.11.0 wire contract was `cancel({ operation: "inference", modelId })` Each cancel/error branch had to: `unlink` the file, `clearCacheRegistry({ cacheKey, modelId })`, `cachedMessageCounts.delete(...)`. The three branches duplicated this trio; any one of them could drift from the others on a partial failure. -**M2 (shipped)** introduces `KvCacheSession` (`server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts`) as the single owner of all three layers. The completion handler now calls `session.beginTurn(...)`, registers `scope.defer(() => session.rollback(turn))` once, and calls `session.commitTurn(...)` on the happy path (which suppresses the deferred rollback). The three duplicated cleanup blocks are gone; the maps that backed `clearCacheRegistry` and `cachedMessageCounts` are now private to the session module. Cross-model administrative cleanups go through the module-level `deleteKvCacheState(...)`, which `handleDeleteCache` delegates to. +`KvCacheSession` (`server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts`) is the single owner of all three layers. The completion handler calls `session.beginTurn(...)`, registers `scope.defer(() => session.rollback(turn))` once, and calls `session.commitTurn(...)` on the happy path (which suppresses the deferred rollback). The maps that back `clearCacheRegistry` and `cachedMessageCounts` are private to the session module. Cross-model administrative cleanups go through the module-level `deleteKvCacheState(...)`, which `handleDeleteCache` delegates to. ### 3. Cancellation tracked via side counters @@ -72,22 +72,9 @@ Concretely, Vercel's AI SDK (`streamText`) is a public-codebase example of the s | `throw new Error("cancelled")` | Loses partial state across RPC | `InferenceCancelledError` from `@/utils/errors-server` (re-exported from `@qvac/sdk`); carries `requestId` + `partial: { text, toolCalls, stats }` and is constructed client-side from the `stopReason: "cancelled"` event | | Reaching into `cachedMessageCounts` / `clearCacheRegistry` / `fsPromises.unlink` together | Drift across the three KV-cache layers | `KvCacheSession` (`session.beginTurn` + `scope.defer(session.rollback)` + `session.commitTurn`) — see `docs/kv-cache-system.mdc` | -`request-lifecycle-primitives.mdc` has the worked code examples. +`request-lifecycle-primitives.mdc` has the worked code examples and the dispatch-level truth table for which `RequestKind`s currently route through the registry. -## Migration Roadmap - -The full milestone breakdown lives in `tasks/release-0.11.0-planning/pitch-2-tasks.md` (workspace-local; not committed to the repo). Headline: - -| Milestone | Wire contract | Scope | -|-------------|-------------------------------------------------------------------------------|------------------------------------------------------------------------------------| -| **M1** ✅ | `cancel({ requestId })` lands; old `cancel({ modelId })` still works | llama.cpp completion handler | -| **M2** ✅ | `stopReason: "cancelled"` on `completionDone`; `InferenceCancelledError` (52419) rejects `CompletionRun` promise-aggregates with a `partial` payload while the `events` stream ends normally; `KvCacheSession` consolidates the three KV-cache layers (and `handleDeleteCache` now delegates to `deleteKvCacheState`); Stop-button race closed via a bounded `cancelled-before-begin` map in the registry | (M2 added `InferenceCancelledError`; no new handlers) | -| **M3a** ✅ | Framework primitives: per-handler `cancel: { scope, hard? }` declaration on `PluginHandlerDefinition`; `RequestRegistry.policy({ kind, oneAtATimePerModel })` admission control + `RequestRejectedByPolicyError` (52420); structured `[request-lifecycle] begin/cancel/end` logging + `withRequestContext` helper threaded through `KvCacheSession` | All built-in plugin manifests carry `cancel` declarations; `completion` kind gains `oneAtATimePerModel` policy | -| **M3b** | Migrate inference handlers onto the registry | llamacpp-embedding, whispercpp-transcription, parakeet-transcription, nmtcpp-translation | -| **M3c** | Migrate non-inference / addon handlers onto the registry | onnx-tts, onnx-ocr, sdcpp-generation, downloadAsset, loadModel | -| **M3d** | CLI / wire integration | CLI `cancel` surface, RPC handler routing for `cancel({ requestId })` parity | - -Until a handler is registry-migrated, the broad-cancel path (`cancel({ operation: , modelId })`) falls back to `addon.cancel()` directly — see the fallback in `server/bare/ops/cancel.ts`. The wire contract for non-migrated kinds is therefore unchanged: callers continue to use `cancel({ operation: , modelId })` exactly as before, and behavior is preserved through the migration. +For kinds that haven't been migrated onto the registry yet, the broad-cancel path (`cancel({ operation: , modelId })`) falls back to `addon.cancel()` directly — see the fallback in `server/bare/ops/cancel.ts`. The wire contract for non-migrated kinds is unchanged: callers continue to use `cancel({ operation: , modelId })` exactly as before. ## FAQ @@ -138,9 +125,9 @@ Extend the `CancelTarget` discriminated union in `request-registry.ts`. Today su The addon (`@qvac/llm-llamacpp` and friends) exposes a synchronous `cancel(jobId?)` that signals the C++ side to stop decoding ASAP. The SDK wires `signal.addEventListener("abort", onAbort, { once: true })` so a registry cancel fires `addon.cancel()` at the binding leaf — but the SDK's source of truth for "was this cancelled" is still `signal.aborted`, not addon state. This keeps the SDK decoupled from per-addon cancel semantics; an addon without a `cancel()` method still gets best-effort cancellation through the SDK's signal-propagated termination of the streaming loop. -### M3a: Why declare cancel capability on the plugin manifest? +### Why declare cancel capability on the plugin manifest? -Before M3a, "what cancel does for this handler" was implicit — readers grepped the handler body, the addon source, and the broad-cancel fallback to figure out whether `cancel({ requestId })` would interrupt compute, stop yielding, or do nothing. M3a moves the declaration onto `PluginHandlerDefinition.cancel`, so the framework knows the addon surface at registration time and reviewers can read a single field instead of a code path. +`PluginHandlerDefinition.cancel` makes "what cancel does for this handler" explicit on the manifest, so the framework knows the addon surface at registration time and reviewers can read a single field instead of inferring from the handler body, the addon source, and the broad-cancel fallback. Three scope values express the addon's truth today: @@ -152,11 +139,11 @@ Three scope values express the addon's truth today: The truth-table cell for each built-in plugin is pinned by `test/unit/plugin-cancel-capability.test.ts`. -### M3a: Why a concurrency policy on the registry? +### Why a concurrency policy on the registry? The llama.cpp addon owns one KV-cache + one decode loop per model. Two concurrent `completionStream` requests against the same model interleave their token streams on the same logical session — observably "the wrong tokens for my prompt" from the client's point of view, with the cache layer also drifting because two `KvCacheSession.beginTurn(...)` calls would race for the same `cachePath`. -Pre-M3a, the SDK didn't have an admission-control hook, so callers had to externally enforce "one completion per model." The M3a registry adds: +The registry exposes an admission-control hook so callers don't have to externally enforce "one completion per model": ```typescript registry.policy({ kind: "completion", oneAtATimePerModel: true }); @@ -164,13 +151,13 @@ registry.policy({ kind: "completion", oneAtATimePerModel: true }); `oneAtATimePerModel` rejects the second `begin({ kind: "completion", modelId: "m1" })` with `RequestRejectedByPolicyError` (52420) — a typed error that propagates through the RPC boundary and gives the client a clean signal to either retry after the first one ends or surface "model busy" to the user. The error is structured (carries `requestId`, `kind`, `modelId`, `reason`) rather than a generic 500, so clients can `instanceof` narrow. -Other kinds (`embed`, `transcribe`, …) are intentionally not policied in M3a — the brief defers that decision to per-kind migrations (M3b–M3c), since admission rules for non-completion handlers depend on their addon's own concurrency model. +Other kinds (`embed`, `transcribe`, …) are intentionally not policied — admission rules for non-completion handlers depend on their addon's own concurrency model and are decided per-kind as those handlers are migrated. The policy runs **before** controller / scope allocation in `begin(...)`, so a rejected `begin` leaves no registry entry behind. Disposing the holder of the in-flight slot releases admission for the next request — `await using` makes this automatic. -### M3a: Why structured `[request-lifecycle]` logs? +### Why structured `[request-lifecycle]` logs? -Logs are the only cross-handler observability surface the worker has — there's no APM agent inside Bare. Before M3a, "what happened on requestId X" required tailing free-form lines and inferring causality. M3a emits three lines per request with a fixed shape: +Logs are the only cross-handler observability surface the worker has — there's no APM agent inside Bare. The registry emits three lines per request with a fixed shape so "what happened on requestId X" is a single grep instead of free-form line tailing: ``` [request-lifecycle] begin requestId= kind= modelId= state=running @@ -182,17 +169,17 @@ Logs are the only cross-handler observability surface the worker has — there's Levels are `info` everywhere except the failure case: an `end` with `state=failed` fires at `warn` so ops alerting can predicate on `level>=warn` for the prefix without parsing the message body — the prefix stays consistent, only the level lifts to signal "this needs attention." A `cancelEntry` guard suppresses duplicate cancel emits when the same `cancel({ requestId })` fires twice (already-aborted entries skip the abort path) — one line per logical event. -### Closed in M2: same-tick "cancel-before-begin" race (Stop-button race) +### Same-tick "cancel-before-begin" race (Stop-button race) -**Pre-M2 (M1 contract, removed):** if a client cancelled via `cancel({ requestId })` before the server's `registry.begin({ requestId })` had run for that id, the cancel found zero matches, returned, and the later `begin(...)` opened a fresh context the cancel didn't reach. A user's "Stop" button click could silently no-op when it raced the round-trip of the original `completion(...)` request. +Without special handling, if a client cancelled via `cancel({ requestId })` before the server's `registry.begin({ requestId })` had run for that id, the cancel would find zero matches, return, and the later `begin(...)` would open a fresh context the cancel didn't reach. A user's "Stop" button click could silently no-op when it raced the round-trip of the original `completion(...)` request. -**M2 contract (Option A, chosen):** the registry tracks `requestId`s whose `cancel(...)` arrived before a matching `begin(...)`. The next `begin(...)` for one of those ids aborts the new controller before returning, with the recorded reason forwarded to `signal.reason`. The context is observable in state `"cancelling"` from the outset — no momentary `"running"` snapshot with an already-aborted signal. +The registry tracks `requestId`s whose `cancel(...)` arrived before a matching `begin(...)`. The next `begin(...)` for one of those ids aborts the new controller before returning, with the recorded reason forwarded to `signal.reason`. The context is observable in state `"cancelling"` from the outset — no momentary `"running"` snapshot with an already-aborted signal. Bound: at most `CANCEL_BEFORE_BEGIN_MAX_ENTRIES = 128` entries, evicted FIFO when full, with a `CANCEL_BEFORE_BEGIN_TTL_MS = 30_000` time-to-live so a stale id never decides a later, unrelated `begin(...)` even if the size cap isn't reached. Both constants are exported via `__requestRegistryTestHooks` for test pinning. A malicious client cannot grow the map unbounded. -Wire surface: **unchanged.** `cancel({ requestId })` still returns 0 when there's no in-flight match (the truth on the wire — no request was *matched and aborted by this call*); the effective cancel lands when the begin arrives. The wire contract for `CancelResponse` did not gain a `matched: boolean` field — Option B was considered and rejected (see `pitch-3-decisions.md`, D6). +Wire surface: **unchanged.** `cancel({ requestId })` still returns 0 when there's no in-flight match (the truth on the wire — no request was *matched and aborted by this call*); the effective cancel lands when the begin arrives. The wire contract for `CancelResponse` does not carry a `matched: boolean` field. -Test coverage: `same-tick cancel-before-begin retroactively aborts the later begin() (M2 Stop-button race close)` and `bounded cancel-before-begin set does not grow past its cap (TTL + size eviction)` in `test/unit/runtime/request-registry.test.ts`. +Test coverage: `same-tick cancel-before-begin retroactively aborts the later begin() (Stop-button race close)` and `bounded cancel-before-begin set does not grow past its cap (TTL + size eviction)` in `test/unit/runtime/request-registry.test.ts`. ## Implementation Files @@ -200,32 +187,33 @@ Test coverage: `same-tick cancel-before-begin retroactively aborts the later beg |----------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------| | `server/bare/runtime/disposable-scope.ts` | `DisposableScope` factory + module-load `Symbol.asyncDispose` guard | | `server/bare/runtime/request-context.ts` | `RequestContext`, `RequestKind`, `RequestState` types | -| `server/bare/runtime/request-registry.ts` | `createRequestRegistry()`, `ManagedRequestContext`, `RequestOutcome`, cancel/begin/end/list logic, bounded `cancelled-before-begin` race-close map (M2), `ConcurrencyPolicy` + `[request-lifecycle]` structured logging (M3a) | -| `server/bare/runtime/request-registry-singleton.ts` | `getRequestRegistry()` worker-scoped accessor; installs the `completion` → `oneAtATimePerModel` policy on first use (M3a) | -| `server/bare/runtime/with-request-context.ts` | `withRequestContext(logger, ctx)` — per-request logger wrapper prefixing every emit with the lifecycle correlation tuple (M3a) | +| `server/bare/runtime/request-registry.ts` | `createRequestRegistry()`, `ManagedRequestContext`, `RequestOutcome`, cancel/begin/end/list logic, bounded `cancelled-before-begin` race-close map, `ConcurrencyPolicy` + `[request-lifecycle]` structured logging | +| `server/bare/runtime/request-registry-singleton.ts` | `getRequestRegistry()` worker-scoped accessor; installs the `completion` → `oneAtATimePerModel` policy on first use | +| `server/bare/runtime/with-request-context.ts` | `withRequestContext(logger, ctx)` — per-request logger wrapper prefixing every emit with the lifecycle correlation tuple | | `server/bare/runtime/request-id.ts` | UUID generation helper for caller-provided ids | | `server/bare/runtime/index.ts` | Public re-exports — handlers import from `@/server/bare/runtime` | | `server/bare/ops/cancel.ts` | Broad-cancel op: registry-routed with addon fallback for non-migrated handlers | | `server/rpc/handlers/cancelHandler.ts` | RPC entry point: dispatches by `operation` (inference / embeddings / request / downloadAsset / rag) | -| `server/rpc/handlers/delete-cache.ts` | Delegates to `deleteKvCacheState(...)` — zero direct references to the three KV-cache layers (M2) | -| `server/bare/plugins/llamacpp-completion/plugin.ts` | First handler migrated; declares `cancel: { scope: "model", hard: true }`; builds `withRequestContext(...)` once per request and threads it into `completion(...)` (M3a) | -| `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` | Reference implementation of the canonical handler shape; uses `KvCacheSession` (M2); accepts a request-scoped `logger` (M3a) | -| `server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts` | `KvCacheSession` factory + `deleteKvCacheState`: single owner of the three KV-cache layers (M2); accepts a per-instance logger (M3a) | -| `server/bare/plugins/{llamacpp-embedding,whispercpp-transcription,parakeet-transcription,nmtcpp-translation,onnx-tts,onnx-ocr,sdcpp-generation}/plugin.ts` | Built-in plugin manifests — each handler declares its `cancel: { scope, hard? }` truth-table row (M3a) | -| `schemas/plugin.ts` | `PluginHandlerCancel` / `PluginHandlerCancelScope` types + runtime schema validation on `pluginHandlerDefinitionRuntimeSchema` (M3a) | -| `client/api/completion-stream.ts` | Client-side construction of `InferenceCancelledError` on `stopReason: "cancelled"` (M2) | -| `schemas/completion-event.ts` | `stopReason` enum extended with `"cancelled"` (M2) | +| `server/rpc/handlers/delete-cache.ts` | Delegates to `deleteKvCacheState(...)` — zero direct references to the three KV-cache layers | +| `server/bare/plugins/llamacpp-completion/plugin.ts` | Reference plugin manifest; declares `cancel: { scope: "model", hard: true }`; builds `withRequestContext(...)` once per request and threads it into `completion(...)` | +| `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` | Reference implementation of the canonical handler shape; uses `KvCacheSession`; accepts a request-scoped `logger` | +| `server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts` | `KvCacheSession` factory + `deleteKvCacheState`: single owner of the three KV-cache layers; accepts a per-instance logger | +| `server/bare/plugins/{llamacpp-embedding,whispercpp-transcription,parakeet-transcription,nmtcpp-translation,onnx-tts,onnx-ocr,sdcpp-generation}/plugin.ts` | Built-in plugin manifests — each handler declares its `cancel: { scope, hard? }` truth-table row | +| `schemas/plugin.ts` | `PluginHandlerCancel` / `PluginHandlerCancelScope` types + runtime schema validation on `pluginHandlerDefinitionRuntimeSchema` | +| `client/api/completion-stream.ts` | Client-side construction of `InferenceCancelledError` on `stopReason: "cancelled"` | +| `schemas/completion-event.ts` | `stopReason` enum carries `"cancelled"` | | `test/unit/runtime/disposable-scope.test.ts` | Scope contract: LIFO, idempotency, error aggregation, late-defer | -| `test/unit/runtime/request-registry.test.ts` | Registry contract: begin/cancel/end, parent-signal composition, listener detach, M2 Stop-button race close, bounded-set invariants, M3a `oneAtATimePerModel` admission rule | +| `test/unit/runtime/request-registry.test.ts` | Registry contract: begin/cancel/end, parent-signal composition, listener detach, Stop-button race close, bounded-set invariants, `oneAtATimePerModel` admission rule | | `test/unit/runtime/kv-cache-session.test.ts` | `KvCacheSession` contract: begin/commit/rollback semantics, double-rollback idempotency, delete-by-scope | -| `test/unit/runtime/with-request-context.test.ts` | `withRequestContext` prefix shape and underlying-logger pass-through (M3a) | -| `test/unit/runtime/request-lifecycle-logging.test.ts` | `[request-lifecycle] begin/cancel/end` line shape, single-emit guarantee, `cancelAll` fan-out (M3a) | -| `test/unit/plugin-cancel-capability.test.ts` | Runtime-schema + truth-table pinning for every built-in plugin's `cancel` declaration (M3a) | +| `test/unit/runtime/with-request-context.test.ts` | `withRequestContext` prefix shape and underlying-logger pass-through | +| `test/unit/runtime/request-lifecycle-logging.test.ts` | `[request-lifecycle] begin/cancel/end` line shape, single-emit guarantee, `cancelAll` fan-out | +| `test/unit/plugin-cancel-capability.test.ts` | Runtime-schema + truth-table pinning for every built-in plugin's `cancel` declaration | | `schemas/cancel.ts` | `CancelRequest` discriminated union + `cancelByRequestIdSugarSchema` for `sdk.cancel({ requestId })` | -| `utils/errors-server.ts` | `RequestIdConflictError` (52417), `InferenceCancelledError` (52419), `RequestRejectedByPolicyError` (52420, M3a), `AsyncDisposeUnavailableError` (53503) | +| `utils/errors-server.ts` | `RequestIdConflictError` (52417), `InferenceCancelledError` (52419), `RequestRejectedByPolicyError` (52420), `AsyncDisposeUnavailableError` (53503) | ## See Also - [`request-lifecycle-primitives.mdc`](../request-lifecycle-primitives.mdc) — canonical handler shape + anti-patterns (this rule fires automatically when editing server-side handlers). - [`error-handling.mdc`](../error-handling.mdc) — structured-error placement and the client/server error separation. - [`docs/kv-cache-system.mdc`](./kv-cache-system.mdc) — `KvCacheSession` ownership of the three KV-cache bookkeeping layers and its `beginTurn` / `commitTurn` / `rollback` / `deleteKvCacheState` API. + diff --git a/.cursor/rules/sdk/request-lifecycle-primitives.mdc b/.cursor/rules/sdk/request-lifecycle-primitives.mdc index cb7e8ebb4d..6f02632734 100644 --- a/.cursor/rules/sdk/request-lifecycle-primitives.mdc +++ b/.cursor/rules/sdk/request-lifecycle-primitives.mdc @@ -16,7 +16,7 @@ Server-side long-running operations (`completion`, `embeddings`, `transcribe`, ` - **`RequestContext`** — per-request handle bundling `requestId`, `kind`, `modelId`, `signal`, `scope`, `state`. - **`RequestRegistry`** — module-scoped registry that mints contexts via `begin(...)` and routes `cancel(...)` by `requestId` or `modelId`. -Migration is rolling out across milestones (M1 shipped completion; M2 shipped typed cancel outcomes + `KvCacheSession` + the Stop-button race close; **M3a** added per-handler cancel-capability declaration, registry concurrency policy, and structured `[request-lifecycle]` logging; M3b–M3d migrate embeddings / transcribe / translate / loadModel / downloadAsset onto the same primitives). The contract below applies to every newly-migrated handler. +The contract below applies to every cancellable server-side handler. The dispatch-level truth table below ("What's on the registry today") tracks which `RequestKind`s are currently routed through the registry; kinds not in that table use the broad-cancel fallback in `server/bare/ops/cancel.ts`. ## Canonical Handler Shape @@ -163,7 +163,7 @@ defineHandler({ The framework reads `cancel.scope` to decide how to wire `signal.onabort`; the handler stays oblivious to whether the addon supports per-request, per-model, or no cancel surface. See the truth table in the next section. -## Cancel Capability Declaration (new in M3a) +## Cancel Capability Declaration Every plugin handler declares its addon's cancel surface on the manifest so the framework knows what `cancel({ requestId })` can actually do today, and so future code can decide whether to wire `signal.onabort → addon.cancel()` directly or fall back to soft-cancel. @@ -191,7 +191,7 @@ completionStream: defineHandler({ Omitting the field entirely is equivalent to `{ scope: "none" }`. The runtime schema in `packages/sdk/schemas/plugin.ts` validates it on plugin registration. -### Truth table for built-in plugins (as of M3a) +### Truth table for built-in plugins | Plugin | Handler(s) | `cancel` | |------------------------------|---------------------------------------|-----------------------------------| @@ -208,7 +208,20 @@ Omitting the field entirely is equivalent to `{ scope: "none" }`. The runtime sc The truth table is pinned by `packages/sdk/test/unit/plugin-cancel-capability.test.ts`. When a handler's addon gains a per-request cancel surface, flip its declaration to `{ scope: "request", hard: true }` and update the test row in the same PR. -## Concurrency Policy (new in M3a) +### What's on the registry today + +The truth table above describes the addon-level capability for plugin handlers. The table below tracks the **dispatch-level** state: which request kinds run through `registry.begin(...)` (and therefore expose `cancel({ requestId })`) on this branch. + +| `RequestKind` | Dispatched from | Cancel reach | Notes | +|-------------------|------------------------------------------------------------------|--------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `completion` | `llamacpp-completion/plugin.ts` → `completion-stream.ts` | Hard (`addon.cancel()` mid-decode) | Reference handler. `oneAtATimePerModel` policy. | +| `loadModel` | `server/rpc/handlers/load-model/handler.ts` | Mixed: download phase hard (signal threaded to `resolveModelPath`); load phase soft (addon `createModel`/`load(false)` has no signal) | The handler checks `ctx.signal.aborted` at the download/load phase boundary and again after the load to surface `InferenceCancelledError`; the orphan-model edge case during load is a documented follow-up. | +| `downloadAsset` | `server/rpc/handlers/download-asset.ts` | Hard (signal threaded to `resolveModelPath`) | Per-`requestId` cancel preserves the content-addressed dedup in `download-manager.ts` — two subscribers on the same `downloadKey` share one transfer, and the transfer aborts only when the **last** subscriber leaves. | +| `rag` | `server/rpc/handlers/rag.ts` | Soft (workspace-bound; ingest/saveEmbeddings/reindex) | Dispatcher-level pre-emption: starting a new RAG op on a workspace cancels the prior in-flight op on the same workspace **before** `registry.begin(...)`. Workspace admission lives in the dispatcher rather than as a registry policy primitive. | + +Kinds **not** in this table (e.g. `embeddings`, `transcribe`, `translate`, `finetune`, `textToSpeech`, `ocr`, `diffusion`, `upscale`) still use the broad-cancel fallback in `server/bare/ops/cancel.ts`. + +## Concurrency Policy The registry exposes an opt-in admission-control surface on `policy(...)`: @@ -221,11 +234,11 @@ interface RequestRegistry { `getRequestRegistry()` (the worker-process singleton in `request-registry-singleton.ts`) registers `{ kind: "completion", oneAtATimePerModel: true }` on first use. The llama.cpp addon owns one KV-cache + one decode loop per model — two concurrent `completionStream` requests on the same model would interleave on the same logical session, so the second `begin(...)` is rejected explicitly with `RequestRejectedByPolicyError` (code 52420) instead of producing a confusing interleave. -Kinds without a registered policy are unconstrained (pre-M3a admission semantics — `embed`, `transcribe`, `translate`, etc. are unaffected until M3b–M3d wire their own policies). +Kinds without a registered policy are unconstrained — `embed`, `transcribe`, `translate`, etc. accept arbitrary concurrency until per-kind admission rules are wired. Policy enforcement runs in `registry.begin(...)` **before** any controller / scope allocation, so a rejected `begin` leaves no entry behind. Tests pin both the rejection and the "no slot leak" property in `packages/sdk/test/unit/runtime/request-registry.test.ts`. -## Structured Logging (new in M3a) +## Structured Logging The registry emits a single line per lifecycle event so log shippers can correlate per-request activity by grepping `requestId=`: @@ -301,15 +314,20 @@ There are two cancel paths exposed to clients: ### Targeted (preferred, new in 0.11.0) -Cancel by `requestId`. Pair with the `requestId` field exposed on `CompletionRun` (and equivalent long-running result objects): +Cancel by `requestId`. Pair with the `requestId` field exposed on `CompletionRun` and the decorated `loadModel(...)` / `downloadAsset(...)` promises: ```typescript -// Client side +// Streaming long-running call const run = sdk.completion({ ... }); console.log(run.requestId); // available synchronously +// Long-running Promise-shaped call (decorated promise) +const op = sdk.loadModel({ ... }); +console.log(op.requestId); // also synchronously available + // Later, from anywhere with access to the SDK client: await sdk.cancel({ requestId: run.requestId }); +await sdk.cancel({ requestId: op.requestId }); ``` ### Broad (escape hatch) @@ -321,7 +339,58 @@ await sdk.cancel({ operation: "inference", modelId }); await sdk.cancel({ operation: "embeddings", modelId }); ``` -Internally, both paths land on `RequestRegistry.cancel(...)`. The broad path falls back to `addon.cancel()` for handler kinds that haven't been registry-migrated yet (everything except llama.cpp completion in 0.11.0). +Internally, both paths land on `RequestRegistry.cancel(...)`. The broad path falls back to `addon.cancel()` for handler kinds that haven't been registry-migrated yet — see the "What's on the registry today" table above for the current set of registry-routed kinds. + +## Decorated-Promise Pattern + +`completion(...)` exposes `op.requestId` synchronously by virtue of returning a `CompletionRun` object (not a `Promise`). The same contract applies to `loadModel(...)` and `downloadAsset(...)` — both of which **stay** `Promise` on the unwrap path (so `await loadModel(...)` keeps returning the model id) but carry a synchronously-readable `requestId` field on the returned object. + +The helper: + +```typescript +// packages/sdk/utils/decorate-promise.ts +export function decoratePromise>( + promise: Promise, + metadata: M, +): Promise & M { + return Object.assign(promise, metadata); +} +``` + +Client-side call sites mint the `requestId` once, thread it onto the wire envelope, and decorate the returned promise: + +```typescript +// packages/sdk/client/api/load-model.ts +export function loadModel( + options: LoadModelOptions, + rpcOptions?: RPCOptions, +): Promise & { requestId: string } { + const requestId = generateClientRequestId(); + const inner = runLoadModel(options, requestId, rpcOptions); + return decoratePromise(inner, { requestId }); +} +``` + +Callers can grab `op.requestId` before the network round-trip resolves and fire a targeted cancel: + +```typescript +const op = sdk.loadModel({ modelSrc: "https://example.com/big.gguf" }); +// op.requestId is reachable RIGHT NOW — before the download even starts +stopButton.onclick = () => sdk.cancel({ requestId: op.requestId }); +const modelId = await op; // legacy `Promise` unwrap still works +``` + +### Invariants the helper exists to preserve + +1. **`await loadModel(...)` still returns `string`.** The decoration mutates the inner promise in place via `Object.assign`; the prototype chain is untouched, so `.then` / `.catch` / `.finally` / async-await unwrapping behave exactly as for a bare `Promise`. Pin this in your call site's tests — if it goes red, the decoration broke the unwrap chain. +2. **`op.requestId` and the server's registry-entry `requestId` are the same value.** The client generates the id once, the request envelope carries it (`request.requestId`), and the server's `registry.begin({ requestId, kind })` uses it as the registry key. `cancel({ requestId: op.requestId })` is a precise abort against this exact call. +3. **No prototype-extension shenanigans.** Do **not** add properties via `Promise.prototype` or `Object.setPrototypeOf`. The plain-object approach is intentional — extending `Promise.prototype` would silently affect every promise in the worker. If a future refactor wants to make the wrapper a class, push back. +4. **Metadata keys must not collide with `Promise` members.** Don't decorate with `{ then: ... }` / `{ catch: ... }` / `{ finally: ... }` — those would shadow the Promise methods and break `await`. `requestId` is safe; new metadata fields go through code review. + +### Test surface + +- `packages/sdk/test/unit/utils/decorate-promise.test.ts` pins the helper contract: synchronous metadata access, `await` unwrap to `T`, rejection propagation, in-place identity, `.then`/`.finally` chain integrity. +- `packages/sdk/test/unit/request-id-wire.test.ts` pins the wire envelope: `loadModelOptionsToRequestSchema` / `downloadAssetOptionsToRequestSchema` / `ragRequestSchema` accept and preserve `requestId` (server keys the registry on it) and treat it as optional (legacy clients omit it; server falls back to a server-generated id). ## Primitives Reference @@ -338,7 +407,7 @@ import { // scope factory (rarely used directly — `registry.begin(...)` carries one) createDisposableScope, - // per-request logger wrapper (M3a) + // per-request logger wrapper withRequestContext, } from "@/server/bare/runtime"; @@ -353,9 +422,9 @@ import type { CancelTarget, CancelByRequestId, CancelByModelId, - ConcurrencyPolicy, // M3a + ConcurrencyPolicy, DisposableScope, - RequestLogContext, // M3a — narrow shape `withRequestContext` reads + RequestLogContext, // narrow shape `withRequestContext` reads } from "@/server/bare/runtime"; ``` @@ -397,7 +466,7 @@ Four errors are owned by this stack today (all in `@/utils/errors-server`): |-------|----------------------------------|----------------------------------------------------------------------------| | 52417 | `RequestIdConflictError` | `registry.begin(...)` called with a `requestId` already present. | | 52419 | `InferenceCancelledError` | A cancelled inference run's `CompletionRun` promise-aggregates (`final` / `text` / `toolCalls` / `stats`) reject with this carrying the partial state. The `events` stream itself ends normally with `stopReason: "cancelled"`. | -| 52420 | `RequestRejectedByPolicyError` | `registry.begin(...)` rejected by a registered concurrency policy (e.g. `oneAtATimePerModel` for `completion`). Carries `requestId`, `kind`, `modelId`, and a human-readable `reason`. | +| 52420 | `RequestRejectedByPolicyError` | `registry.begin(...)` rejected by a registered concurrency policy (e.g. `oneAtATimePerModel` for `completion`). Carries `requestId`, `kind`, `modelId`, and a human-readable `reason`. | | 53503 | `AsyncDisposeUnavailableError` | Module-load guard: host runtime doesn't expose `Symbol.asyncDispose`. | `InferenceCancelledError` carries `requestId` plus a `partial: { text?, toolCalls?, stats? }` payload. Construction is **client-side** (`packages/sdk/client/api/completion-stream.ts`): when the event stream ends with `stopReason: "cancelled"`, the client builds the error from its locally-aggregated partial state and rejects the aggregates. The class is server-defined for reusability and re-exported from `@qvac/sdk` so client code can `instanceof`-check it. See `error-handling.mdc`. @@ -409,11 +478,13 @@ Four errors are owned by this stack today (all in `@/utils/errors-server`): Five unit-test files pin the contract — read them as canonical examples: - `packages/sdk/test/unit/runtime/disposable-scope.test.ts` — LIFO cleanup, idempotency, error aggregation, late `defer` behavior. -- `packages/sdk/test/unit/runtime/request-registry.test.ts` — `begin`/`cancel`/`end` flow, `requestId` conflict detection, parent-signal composition + listener detach discipline, the M2 Stop-button race close (`same-tick cancel-before-begin retroactively aborts the later begin()`), the bounded `cancelled-before-begin` map invariants, **and the M3a concurrency policy** (`oneAtATimePerModel` admission rule + `RequestRejectedByPolicyError` propagation + rejected-begin leaves no slot behind). +- `packages/sdk/test/unit/runtime/request-registry.test.ts` — `begin`/`cancel`/`end` flow, `requestId` conflict detection, parent-signal composition + listener detach discipline, Stop-button race close (`same-tick cancel-before-begin retroactively aborts the later begin()`), the bounded `cancelled-before-begin` map invariants, **and concurrency policy** (`oneAtATimePerModel` admission rule + `RequestRejectedByPolicyError` propagation + rejected-begin leaves no slot behind). - `packages/sdk/test/unit/runtime/kv-cache-session.test.ts` — `beginTurn` / `commitTurn` / `rollback` semantics in isolation: commit suppresses the deferred rollback, rollback after commit is a no-op, double-rollback is idempotent, and rollback wipes all three layers (file / `initializedCaches` / `cachedMessageCounts`) regardless of who fails along the way. - `packages/sdk/test/unit/runtime/request-lifecycle-logging.test.ts` — registry's `[request-lifecycle] begin/cancel/end` line shape, single-emit guarantee on redundant cancels, `cancelAll` fans out one line per active entry, `state=cancelling` on a `parentSignal`-pre-aborted begin, and the `modelId=-` placeholder for model-less requests. - `packages/sdk/test/unit/runtime/with-request-context.test.ts` — `withRequestContext` prefix shape across every level, `modelId`-segment dropping when absent, multi-argument preservation, and pass-through of `setLevel` / `getLevel` / `addTransport` / `setConsoleOutput`. + + The cancel-capability truth table is pinned by `packages/sdk/test/unit/plugin-cancel-capability.test.ts` — when a built-in plugin's `cancel` declaration changes, update the truth-table row in the same PR. When adding new behavior to the primitives, add the test before the implementation and pair it with the corresponding doc update in this rule. @@ -453,4 +524,4 @@ Already covered by the `CancelTarget` discriminated union. If you genuinely need - `error-handling.mdc` — `InferenceCancelledError` / `AsyncDisposeUnavailableError` / `RequestIdConflictError` placement and propagation across the RPC boundary. - `docs/kv-cache-system.mdc` — `KvCacheSession` ownership of the three KV-cache bookkeeping layers (on-disk `.bin`, `initializedCaches`, `cachedMessageCounts`), `beginTurn` / `commitTurn` / `rollback` semantics, and the cross-model `deleteKvCacheState` administrative API. -- `docs/request-lifecycle-system.mdc` — full reference (design rationale, migration roadmap, FAQ, M2 Stop-button race close rationale). +- `docs/request-lifecycle-system.mdc` — full reference (design rationale, FAQ, Stop-button race close rationale). diff --git a/packages/sdk/client/api/client-request-id.ts b/packages/sdk/client/api/client-request-id.ts new file mode 100644 index 0000000000..eac0fcb365 --- /dev/null +++ b/packages/sdk/client/api/client-request-id.ts @@ -0,0 +1,30 @@ +/** + * UUIDv4 generator for client-side request ids. The Web Crypto API + * ships `crypto.randomUUID` everywhere we run today (Bun, modern Node, + * modern browsers, React Native via the polyfill that the workbench- + * desktop / RN runtime config injects). The fallback exists so the SDK + * never crashes in an exotic JS environment without + * `crypto.randomUUID` — `requestId` semantics still hold (uniqueness, + * opaque to the caller), just without the UUIDv4 wire shape. + * + * Shared with the long-running call sites that decorate their promises + * with `requestId`: `completion(...)`, `loadModel(...)`, + * `downloadAsset(...)`. The decorated promise's `op.requestId` and the + * server's registry entry's `requestId` must match — generating once + * here keeps that invariant. + */ +export function generateClientRequestId(): string { + const c = ( + globalThis as { + crypto?: { randomUUID?: () => string }; + } + ).crypto; + if (c?.randomUUID) return c.randomUUID(); + // Fallback: 128 random bits encoded as a hex string. Distinct enough + // for in-flight cancel targeting; not a wire-spec UUID. + const bytes = new Uint8Array(16); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = Math.floor(Math.random() * 256); + } + return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join(""); +} diff --git a/packages/sdk/client/api/completion-stream.ts b/packages/sdk/client/api/completion-stream.ts index da2d6895d7..d225973392 100644 --- a/packages/sdk/client/api/completion-stream.ts +++ b/packages/sdk/client/api/completion-stream.ts @@ -25,6 +25,7 @@ import { type ToolInput, } from "@/utils/tool-helpers"; import { buildFinalFromEvents } from "@/utils/aggregate-events"; +import { generateClientRequestId } from "@/client/api/client-request-id"; const logger = getClientLogger(); @@ -409,27 +410,3 @@ export function completion(params: CompletionParams): CompletionRun { } } -/** - * UUIDv4 generator for client-side request ids. The Web Crypto API ships - * `crypto.randomUUID` everywhere we run today (Bun, modern Node, modern - * browsers, React Native via the polyfill that the workbench-desktop / - * RN runtime config injects). The fallback exists so the SDK never - * crashes in an exotic JS environment without `crypto.randomUUID` — - * `requestId` semantics still hold (uniqueness, opaque to the caller), - * just without the UUIDv4 wire shape. - */ -function generateClientRequestId(): string { - const c = ( - globalThis as { - crypto?: { randomUUID?: () => string }; - } - ).crypto; - if (c?.randomUUID) return c.randomUUID(); - // Fallback: 128 random bits encoded as a hex string. Distinct enough - // for in-flight cancel targeting; not a wire-spec UUID. - const bytes = new Uint8Array(16); - for (let i = 0; i < bytes.length; i++) { - bytes[i] = Math.floor(Math.random() * 256); - } - return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join(""); -} diff --git a/packages/sdk/client/api/download-asset.ts b/packages/sdk/client/api/download-asset.ts index 36b53238da..8554d64d93 100644 --- a/packages/sdk/client/api/download-asset.ts +++ b/packages/sdk/client/api/download-asset.ts @@ -9,6 +9,8 @@ import { StreamEndedError, InvalidResponseError, } from "@/utils/errors-client"; +import { decoratePromise } from "@/utils/decorate-promise"; +import { generateClientRequestId } from "@/client/api/client-request-id"; export type DownloadAssetOptions = BaseDownloadAssetOptions; @@ -25,7 +27,9 @@ export type DownloadAssetOptions = BaseDownloadAssetOptions; * - onProgress: Optional callback for download progress * @param rpcOptions - Optional RPC options including per-call profiling configuration * - * @returns Promise that resolves to the asset ID (either the provided assetSrc or a generated ID) + * @returns Promise that resolves to the asset ID (either the provided assetSrc or a generated ID), + * decorated with a synchronous `requestId` field for use with + * `cancel({ requestId: op.requestId })` before the promise resolves. * * @throws {QvacErrorBase} When asset download fails, with details in the error message * @throws {QvacErrorBase} When streaming ends unexpectedly (only when using onProgress) @@ -46,13 +50,32 @@ export type DownloadAssetOptions = BaseDownloadAssetOptions; * console.log(`Downloaded: ${progress.percentage}%`); * } * }); + * + * // Targeted cancel by requestId — grab the id synchronously, then + * // cancel before the download resolves. + * const op = downloadAsset({ assetSrc: "https://example.com/large.gguf" }); + * setTimeout(() => cancel({ requestId: op.requestId }), 1000); + * await op; // rejects with `InferenceCancelledError` * ``` */ -export async function downloadAsset( +export function downloadAsset( + options: DownloadAssetOptions, + rpcOptions?: RPCOptions, +): Promise & { requestId: string } { + const requestId = generateClientRequestId(); + const inner = runDownloadAsset(options, requestId, rpcOptions); + return decoratePromise(inner, { requestId }); +} + +async function runDownloadAsset( options: DownloadAssetOptions, + requestId: string, rpcOptions?: RPCOptions, ): Promise { - const request = downloadAssetOptionsToRequestSchema.parse(options); + const request = downloadAssetOptionsToRequestSchema.parse({ + ...options, + requestId, + }); if (options.onProgress) { // Use streaming for progress updates diff --git a/packages/sdk/client/api/load-model.ts b/packages/sdk/client/api/load-model.ts index 7e800d37ff..886f4f9743 100644 --- a/packages/sdk/client/api/load-model.ts +++ b/packages/sdk/client/api/load-model.ts @@ -22,6 +22,8 @@ import { } from "@/utils/errors-client"; import { assertModelSrcMatchesModelType } from "@/utils/load-model-validation"; import { getClientLogger } from "@/logging"; +import { decoratePromise } from "@/utils/decorate-promise"; +import { generateClientRequestId } from "@/client/api/client-request-id"; const logger = getClientLogger(); @@ -46,7 +48,7 @@ const logger = getClientLogger(); export function loadModel( options: LoadModelDescriptorParam, rpcOptions?: RPCOptions, -): Promise; +): Promise & { requestId: string }; /** * Loads a machine learning model from a local path, remote URL, or Hyperdrive key. @@ -143,7 +145,7 @@ export function loadModel( export function loadModel( options: LoadModelOptions, rpcOptions?: RPCOptions, -): Promise; +): Promise & { requestId: string }; /** * Loads a custom plugin model (any non-built-in `modelType` string). @@ -159,7 +161,7 @@ export function loadModel( export function loadModel( options: LoadCustomPluginModelOptions, rpcOptions?: RPCOptions, -): Promise; +): Promise & { requestId: string }; /** * Hot-reloads configuration on an already loaded model. @@ -196,14 +198,39 @@ export function loadModel( export function loadModel( options: ReloadConfigOptions, rpcOptions?: RPCOptions, -): Promise; +): Promise & { requestId: string }; -export async function loadModel( +export function loadModel( + options: + | LoadModelOptions + | LoadCustomPluginModelOptions + | LoadModelDescriptorOnlyOptions + | ReloadConfigOptions, + rpcOptions?: RPCOptions, +): Promise & { requestId: string } { + // Generate a stable `requestId` once, synchronously, before kicking + // off any async work. The same id is: + // - threaded onto the request envelope (`request.requestId`) so the + // server's `registry.begin(...)` records it on the registry + // entry; and + // - attached to the returned promise via `decoratePromise` so the + // caller can target this exact call with `cancel({ requestId })` + // before `await` resolves. Generating client-side and surfacing + // it synchronously is what closes the "stop-button race" gap for + // `loadModel` / `downloadAsset` callers — same shape as the + // `CompletionRun.requestId` contract. + const requestId = generateClientRequestId(); + const inner = runLoadModel(options, requestId, rpcOptions); + return decoratePromise(inner, { requestId }); +} + +async function runLoadModel( options: | LoadModelOptions | LoadCustomPluginModelOptions | LoadModelDescriptorOnlyOptions | ReloadConfigOptions, + requestId: string, rpcOptions?: RPCOptions, ): Promise { const isReloadConfig = "modelId" in options && !("modelSrc" in options); @@ -232,6 +259,13 @@ export async function loadModel( } } + // Splice the client-generated `requestId` onto the resolved options so + // the wire envelope carries it. The server uses the same value as the + // registry-entry key — that match is what makes + // `cancel({ requestId: op.requestId })` a no-op when no match exists + // and a precise abort when it does. + resolvedOptions = { ...resolvedOptions, requestId }; + const request = isReloadConfig ? reloadConfigOptionsToRequestSchema.parse(resolvedOptions) : loadModelOptionsToRequestSchema.parse(resolvedOptions); diff --git a/packages/sdk/schemas/download-asset.ts b/packages/sdk/schemas/download-asset.ts index f57694719a..67d4472a0c 100644 --- a/packages/sdk/schemas/download-asset.ts +++ b/packages/sdk/schemas/download-asset.ts @@ -18,13 +18,24 @@ export const downloadAssetOptionsToRequestSchema = .extend({ onProgress: z.unknown().optional(), withProgress: z.boolean().optional(), + requestId: z.string().min(1).optional(), }) - .transform((data) => ({ - type: "downloadAsset" as const, - assetSrc: modelInputToSrcSchema.parse(data.assetSrc), - withProgress: data.withProgress ?? !!data.onProgress, - seed: data.seed ?? false, - })); + .transform((data) => { + const out: { + type: "downloadAsset"; + assetSrc: string; + withProgress: boolean; + seed: boolean; + requestId?: string; + } = { + type: "downloadAsset" as const, + assetSrc: modelInputToSrcSchema.parse(data.assetSrc), + withProgress: data.withProgress ?? !!data.onProgress, + seed: data.seed ?? false, + }; + if (data.requestId !== undefined) out.requestId = data.requestId; + return out; + }); export const downloadAssetRequestSchema = z .object({ @@ -32,6 +43,13 @@ export const downloadAssetRequestSchema = z assetSrc: z.string(), withProgress: z.boolean().optional(), seed: z.boolean().optional(), + requestId: z + .string() + .min(1) + .optional() + .describe( + "Stable identifier for this in-flight download, generated by the client at call time. Optional on the wire so legacy clients keep working — the server falls back to a server-generated id when the field is missing. Exposed on the client-side decorated promise so callers can target this download with `cancel({ requestId })`.", + ), }) .transform((data) => ({ ...data, diff --git a/packages/sdk/schemas/load-model.ts b/packages/sdk/schemas/load-model.ts index cda81b701a..c060202087 100644 --- a/packages/sdk/schemas/load-model.ts +++ b/packages/sdk/schemas/load-model.ts @@ -54,6 +54,7 @@ const loadModelRequestCommonFields = { onProgress: z.unknown().optional(), logger: z.unknown().optional(), withProgress: z.boolean().optional(), + requestId: z.string().min(1).optional(), }; export const loadBuiltinModelOptionsBaseSchema = z.union([ @@ -153,6 +154,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -170,6 +172,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -187,6 +190,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -204,6 +208,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -232,6 +237,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -249,6 +255,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -266,6 +273,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -283,6 +291,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), z .object({ @@ -301,6 +310,7 @@ const loadModelOptionsToRequestBaseSchema = z.union([ seed: data.seed ?? false, withProgress: data.withProgress ?? !!data.onProgress, delegate: data.delegate, + ...(data.requestId !== undefined && { requestId: data.requestId }), })), ]); @@ -314,6 +324,13 @@ const commonModelConfigSchema = z.object({ withProgress: z.boolean().optional(), seed: z.boolean().optional(), delegate: delegateSchema, + requestId: z + .string() + .min(1) + .optional() + .describe( + "Stable identifier for this in-flight load, generated by the client at call time. Optional on the wire so legacy clients keep working — the server falls back to a server-generated id when the field is missing. Exposed on the client-side decorated promise so callers can target this load with `cancel({ requestId })`.", + ), }); // Request schemas for each model type (use canonical types since transforms normalize) diff --git a/packages/sdk/schemas/rag.ts b/packages/sdk/schemas/rag.ts index 2058acde4f..52ffd6be38 100644 --- a/packages/sdk/schemas/rag.ts +++ b/packages/sdk/schemas/rag.ts @@ -183,16 +183,25 @@ const ragDeleteWorkspaceOperationSchema = ragDeleteWorkspaceParamsSchema.extend( // ============== Unified Request Schema ============== +// `requestId` is threaded onto every RAG operation as an optional field so +// the request registry can correlate the in-flight context with the +// client-side decorated-promise (`op.requestId`). Optional on the wire +// so legacy clients keep working — the server falls back to a +// server-generated id when the field is missing. +const ragRequestIdField = { + requestId: z.string().min(1).optional(), +}; + export const ragRequestSchema = z.discriminatedUnion("operation", [ - ragChunkOperationSchema, - ragIngestOperationSchema, - ragSaveEmbeddingsOperationSchema, - ragSearchOperationSchema, - ragDeleteEmbeddingsOperationSchema, - ragReindexOperationSchema, - ragListWorkspacesOperationSchema, - ragCloseWorkspaceOperationSchema, - ragDeleteWorkspaceOperationSchema, + ragChunkOperationSchema.extend(ragRequestIdField), + ragIngestOperationSchema.extend(ragRequestIdField), + ragSaveEmbeddingsOperationSchema.extend(ragRequestIdField), + ragSearchOperationSchema.extend(ragRequestIdField), + ragDeleteEmbeddingsOperationSchema.extend(ragRequestIdField), + ragReindexOperationSchema.extend(ragRequestIdField), + ragListWorkspacesOperationSchema.extend(ragRequestIdField), + ragCloseWorkspaceOperationSchema.extend(ragRequestIdField), + ragDeleteWorkspaceOperationSchema.extend(ragRequestIdField), ]); // ============== Response Schemas ============== diff --git a/packages/sdk/server/bare/rag-hyperdb/index.ts b/packages/sdk/server/bare/rag-hyperdb/index.ts index e2317035c9..fae7696469 100644 --- a/packages/sdk/server/bare/rag-hyperdb/index.ts +++ b/packages/sdk/server/bare/rag-hyperdb/index.ts @@ -13,7 +13,8 @@ export { type RagWorkspaceInfo, } from "@/server/bare/rag-hyperdb/rag-workspace-manager"; export { - registerRagOperation, - unregisterRagOperation, - cancelRagOperation, + getActiveRagRequest, + setActiveRagRequest, + clearActiveRagRequest, + getWorkspaceKey, } from "@/server/bare/rag-hyperdb/rag-operation-manager"; diff --git a/packages/sdk/server/bare/rag-hyperdb/rag-operation-manager.ts b/packages/sdk/server/bare/rag-hyperdb/rag-operation-manager.ts index 29edb2f31e..92696588df 100644 --- a/packages/sdk/server/bare/rag-hyperdb/rag-operation-manager.ts +++ b/packages/sdk/server/bare/rag-hyperdb/rag-operation-manager.ts @@ -1,59 +1,89 @@ -import { AbortController, type AbortSignal } from "bare-abort-controller"; -import { DEFAULT_WORKSPACE } from "@/server/bare/rag-hyperdb/rag-workspace-manager"; +import { getRequestRegistry } from "@/server/bare/runtime"; -interface RagOperationEntry { - abortController: AbortController; - operation: string; - startTime: number; -} +// Sentinel for "no workspace specified". Mirrors the value re-exported +// from `rag-workspace-manager.ts`; redeclared locally so this module is +// safely importable in environments that don't have the Bare runtime +// (unit tests, doc generators) — `rag-workspace-manager` pulls in +// `bare-fs` / `bare-path` at module load. +const DEFAULT_WORKSPACE = "default"; + +/** + * RAG operation tracking — workspace-level admission bookkeeping. + * + * In 0.11.0 the cancel surface for in-flight RAG operations consolidates + * onto the request registry (`getRequestRegistry().cancel({ requestId })`). + * The historical workspace-level pre-emption rule — "starting a new + * `ingest` / `reindex` / `saveEmbeddings` on a workspace cancels any prior + * op on the same workspace" — is preserved by the dispatcher in + * `server/rpc/handlers/rag.ts`: the dispatcher cancels the workspace's + * prior `requestId` (if any) and then begins a new registry context. + * Workspace-level admission lives in the dispatcher rather than as a + * registry policy primitive — it's a dispatch concern, not part of the + * registry's per-`kind` admission rules. + * + * This module owns the small workspace → requestId map that makes that + * pre-emption decision routable from the dispatcher. The map is module- + * scoped (one per Bare worker) so the dispatcher, the shutdown sweep + * (`cancelAllRagOperations`), and the workspace-close sweep (which + * delegates to it via `rag-workspace-manager.ts`) all see the same state. + */ -// Map of workspace -> active operation -const activeOperations = new Map(); +// workspace key → requestId of the in-flight RAG operation on that workspace. +const activeRagRequestByWorkspace = new Map(); -export function getWorkspaceKey(workspace?: string) { +export function getWorkspaceKey(workspace?: string): string { return workspace ?? DEFAULT_WORKSPACE; } -export function registerRagOperation( - workspace: string | undefined, - operation: string, -): AbortSignal { - const key = getWorkspaceKey(workspace); - - // Cancel any existing operation on this workspace - cancelRagOperation(workspace); - - const abortController = new AbortController(); - activeOperations.set(key, { - abortController, - operation, - startTime: Date.now(), - }); - - return abortController.signal; +/** + * Returns the `requestId` of any in-flight RAG operation on the workspace + * (or `undefined` if none). The dispatcher uses this to decide whether to + * pre-empt before calling `registry.begin(...)`. + */ +export function getActiveRagRequest(workspace?: string): string | undefined { + return activeRagRequestByWorkspace.get(getWorkspaceKey(workspace)); } -export function unregisterRagOperation(workspace?: string): void { - const key = getWorkspaceKey(workspace); - activeOperations.delete(key); +/** + * Records the `requestId` of a freshly-begun RAG operation. Called by the + * dispatcher in `rag.ts` after `registry.begin(...)` succeeds; paired with + * `clearActiveRagRequest` via the request scope's deferred cleanup so the + * map never outlives the request. + */ +export function setActiveRagRequest( + workspace: string | undefined, + requestId: string, +): void { + activeRagRequestByWorkspace.set(getWorkspaceKey(workspace), requestId); } -export function cancelRagOperation(workspace?: string): boolean { +/** + * Clears the workspace's mapping iff it still belongs to `requestId`. The + * conditional guard handles the natural race between two ingest calls on + * the same workspace: the older context's scope unwind must not stomp the + * newer context's mapping installed by the pre-emption sequence. + */ +export function clearActiveRagRequest( + workspace: string | undefined, + requestId: string, +): void { const key = getWorkspaceKey(workspace); - const entry = activeOperations.get(key); - - if (!entry) { - return false; + if (activeRagRequestByWorkspace.get(key) === requestId) { + activeRagRequestByWorkspace.delete(key); } - - entry.abortController.abort(); - activeOperations.delete(key); - return true; } +/** + * Shutdown / workspace-close sweep. Cancels every tracked RAG request via + * the registry and clears the workspace map. Idempotent — callers that + * re-invoke during a teardown (workspace-close fired twice, shutdown + * racing with `close-all`) get a no-op on the second pass. + */ export function cancelAllRagOperations(): void { - for (const [key, entry] of activeOperations.entries()) { - entry.abortController.abort(); - activeOperations.delete(key); + if (activeRagRequestByWorkspace.size === 0) return; + const registry = getRequestRegistry(); + for (const [key, requestId] of activeRagRequestByWorkspace) { + registry.cancel({ requestId, reason: "rag-shutdown" }); + activeRagRequestByWorkspace.delete(key); } } diff --git a/packages/sdk/server/rpc/handlers/cancelHandler.ts b/packages/sdk/server/rpc/handlers/cancelHandler.ts index 2d889a352b..cdba7e5081 100644 --- a/packages/sdk/server/rpc/handlers/cancelHandler.ts +++ b/packages/sdk/server/rpc/handlers/cancelHandler.ts @@ -2,7 +2,7 @@ import type { CancelRequest, CancelResponse } from "@/schemas/cancel"; import { cancel } from "@/server/bare/ops/cancel"; import { cancelTransfer } from "@/server/rpc/handlers/load-model/download-manager"; import { - cancelRagOperation, + getActiveRagRequest, DEFAULT_WORKSPACE, } from "@/server/bare/rag-hyperdb"; import { getRequestRegistry } from "@/server/bare/runtime"; @@ -18,10 +18,10 @@ export async function cancelHandler( case "inference": // Awaited so the RPC response resolves after the addon has // acknowledged the cancel for non-registry-migrated handlers - // (embeddings / transcription / translation / decoder / OCR / TTS - // until M3b/M3c). The registry-routed path inside `cancel()` is - // already synchronous w.r.t. the abort, so the await is a no-op - // for completion-stream's signal-driven cancel. + // (decoder / OCR / TTS). The registry-routed path inside + // `cancel()` is already synchronous w.r.t. the abort, so the + // await is a no-op for completion-stream's signal-driven + // cancel. await cancel({ modelId: request.modelId }, { kind: "completion" }); break; case "embeddings": @@ -32,21 +32,51 @@ export async function cancelHandler( requestId: request.requestId, }); if (cancelled === 0) { - logger.debug( + // info-level (not debug) because the decorated-promise pattern + // makes "no in-flight match" a common and user-visible case: + // a Stop button fired after the request settled but before the + // UI cleared lands here. Users debugging "my Stop button isn't + // working" need this visible without lowering the log level. + logger.info( `[cancel] no in-flight request matched requestId=${request.requestId}`, ); } break; } case "downloadAsset": + // Deprecated cancel arm. `downloadAsset` is registry-migrated + // and the primary cancel path is now + // `cancel({ operation: "request", requestId })`. This case + // stays for wire-compat with older clients; `cancelTransfer(...)` + // in download-manager.ts routes each subscriber through + // `registry.cancel({ requestId })` so the behaviour is + // equivalent to a broad per-`downloadKey` cancel. + logger.warn( + "[cancel] `cancel({ operation: \"downloadAsset\", downloadKey })` is deprecated — use `cancel({ requestId })` against the value exposed on the `loadModel(...)` / `downloadAsset(...)` promise instead. This compat path is scheduled for removal in a future release.", + ); cancelTransfer(request.downloadKey, request.clearCache); break; case "rag": { - const cancelled = cancelRagOperation(request.workspace); - if (!cancelled) { + // Deprecated cancel arm. RAG is registry-migrated with + // workspace-level admission in the dispatcher (`rag.ts`). + // Primary cancel path is + // `cancel({ operation: "request", requestId })`. This arm + // stays for wire-compat — it walks the workspace→requestId map + // installed by the dispatcher and routes via the registry. + logger.warn( + "[cancel] `cancel({ operation: \"rag\", workspace })` is deprecated — use `cancel({ requestId })` instead. This compat path is scheduled for removal in a future release.", + ); + const workspace = request.workspace ?? DEFAULT_WORKSPACE; + const requestId = getActiveRagRequest(workspace); + if (requestId === undefined) { logger.warn( - `No active RAG operation to cancel for workspace: ${request.workspace ?? DEFAULT_WORKSPACE}`, + `No active RAG operation to cancel for workspace: ${workspace}`, ); + } else { + getRequestRegistry().cancel({ + requestId, + reason: "rag-workspace-cancel", + }); } break; } diff --git a/packages/sdk/server/rpc/handlers/download-asset.ts b/packages/sdk/server/rpc/handlers/download-asset.ts index 892185f1c2..834591952a 100644 --- a/packages/sdk/server/rpc/handlers/download-asset.ts +++ b/packages/sdk/server/rpc/handlers/download-asset.ts @@ -12,9 +12,19 @@ import { resolveModelPath, resolveModelPathWithStats, } from "@/server/rpc/handlers/load-model/resolve"; -import { buildDownloadProfilingFields, type DownloadStats } from "@/server/rpc/handlers/load-model/types"; +import { + buildDownloadProfilingFields, + type DownloadStats, + type DownloadHooks, +} from "@/server/rpc/handlers/load-model/types"; import { nowMs, generateProfileId } from "@/profiling/clock"; import { getServerLogger } from "@/logging"; +import { + getRequestRegistry, + withRequestContext, +} from "@/server/bare/runtime"; +import { generateServerRequestId } from "@/server/bare/runtime/request-id"; +import { InferenceCancelledError } from "@/utils/errors-server"; const logger = getServerLogger(); @@ -29,6 +39,26 @@ export async function handleDownloadAsset( | undefined; const profilingEnabled = profilingMeta?.enabled !== false && !!profilingMeta; + const requestId = request.requestId ?? generateServerRequestId(); + // `downloadAsset` is artifact-shaped, not model-shaped — there is no + // `modelId` to register on the registry entry. Cancel by `requestId` + // is the primary path; `cancel({ modelId })` is intentionally a + // non-match for this kind. + await using ctx = getRequestRegistry().begin({ + requestId, + kind: "downloadAsset", + }); + const log = withRequestContext(getServerLogger(), ctx); + log.debug(`downloadAsset start assetSrc=${assetSrc}`); + + const hooks: DownloadHooks = { + requestBinding: { + signal: ctx.signal, + scope: ctx.scope, + requestId, + }, + }; + try { const totalDownloadStart = profilingEnabled ? nowMs() : 0; @@ -40,11 +70,19 @@ export async function handleDownloadAsset( assetSrc, progressCallback, seed, + ctx.signal, + hooks, ); sourceType = result.sourceType; downloadStats = result.downloadStats; } else { - await resolveModelPath(assetSrc, progressCallback, seed); + await resolveModelPath( + assetSrc, + progressCallback, + seed, + ctx.signal, + hooks, + ); } const response: DownloadAssetResponse = { @@ -74,6 +112,14 @@ export async function handleDownloadAsset( return response; } catch (error: unknown) { + // Mirror the load-model handler's cancel contract: a typed cancel + // bubbles up as the rejection so client-side callers can branch on + // `instanceof InferenceCancelledError`. Every other error keeps + // the legacy `success: false` envelope. + if (error instanceof InferenceCancelledError) { + log.info(`downloadAsset cancelled requestId=${requestId}`); + throw error; + } logger.error("Error downloading asset:", error); return { type: "downloadAsset", diff --git a/packages/sdk/server/rpc/handlers/load-model/download-manager.ts b/packages/sdk/server/rpc/handlers/load-model/download-manager.ts index c0b6f7935d..79771d2011 100644 --- a/packages/sdk/server/rpc/handlers/load-model/download-manager.ts +++ b/packages/sdk/server/rpc/handlers/load-model/download-manager.ts @@ -1,11 +1,39 @@ import type { ModelProgressUpdate } from "@/schemas"; -import { AbortController } from "bare-abort-controller"; -import { DownloadCancelledError } from "@/utils/errors-server"; +import { AbortController, type AbortSignal } from "bare-abort-controller"; +import { + DownloadCancelledError, + InferenceCancelledError, +} from "@/utils/errors-server"; import { getServerLogger } from "@/logging"; +import { getRequestRegistry } from "@/server/bare/runtime"; +import type { DisposableScope } from "@/server/bare/runtime/disposable-scope"; import type { DownloadHooks } from "@/server/rpc/handlers/load-model/types"; const logger = getServerLogger(); +/** + * Per-subscriber binding to a registry-tracked request. + * `startOrJoinDownload` is request-aware: each caller (the + * `await using ctx = registry.begin(...)` inside `handleLoadModel` / + * `handleDownloadAsset`) registers a subscriber bound to its + * `requestId`. A `cancel({ requestId })` against the registry aborts + * the subscriber's `ctx.signal`, which: + * - rejects this subscriber's promise so the awaiting handler unwinds; + * - removes this subscriber from `transfer.subscribers`; + * - tears down the transfer iff this was the last subscriber. + * + * The shared-transfer dedup logic in `startOrJoinDownload` is preserved + * — two callers requesting the same `downloadKey` still share one + * underlying download — but cancel is per-`requestId`-honest: + * cancelling one subscriber does not affect siblings on the same + * `downloadKey`. + */ +export interface SubscriberRequestBinding { + signal: AbortSignal; + scope: DisposableScope; + requestId: string; +} + export interface Subscriber { id: string; onProgress?: ((progress: ModelProgressUpdate) => void) | undefined; @@ -13,6 +41,8 @@ export interface Subscriber { resolve: (path: string) => void; reject: (error: unknown) => void; promise: Promise; + /** Identity of the registry request this subscriber belongs to, if any. */ + requestId?: string | undefined; } export interface Transfer { @@ -43,6 +73,7 @@ let nextSubscriberId = 0; function createSubscriber( onProgress?: (progress: ModelProgressUpdate) => void, + requestId?: string, ): Subscriber { let resolve!: (path: string) => void; let reject!: (error: unknown) => void; @@ -58,6 +89,7 @@ function createSubscriber( resolve, reject, promise, + requestId, }; } @@ -93,7 +125,7 @@ function deliverProgress( }); settleSubscriber(subscriber, error); - transfer.subscribers.delete(subscriber.id); + removeSubscriber(transfer, subscriber.id); } } @@ -108,16 +140,96 @@ function broadcastTransferProgress( } } +/** + * Idempotent: remove the subscriber from the transfer's roster and tear + * the transfer down iff no subscribers remain. Wired from two places per + * subscriber: + * + * - the `request.signal.addEventListener("abort", ...)` listener that + * fires when `registry.cancel({ requestId })` aborts the request's + * context — this is the "user clicked Stop" cancel path; + * - `request.scope.defer(...)` which runs at request scope unwind — + * the safety net catching every other unwind path (handler returns, + * handler throws for a non-cancel reason, awaited promise settled + * and the `await using` falls out of scope on the success path). + * + * Both ends call this helper so an already-cleaned-up subscriber is a + * no-op the second time around. + */ +function removeSubscriber(transfer: Transfer, subscriberId: string): void { + if (!transfer.subscribers.has(subscriberId)) return; + transfer.subscribers.delete(subscriberId); + maybeCancelTransfer(transfer); +} + +/** + * Last-subscriber rule: when every caller has detached (cancel or + * progress-callback throw), abort the shared transfer so the underlying + * HTTP / hyperdrive download tears down. Until then the transfer keeps + * running for the remaining subscribers — the content-addressed dedup + * semantics callers rely on. + */ +function maybeCancelTransfer(transfer: Transfer): void { + if (transfer.subscribers.size > 0) return; + if (transfer.abortController.signal.aborted) return; + logger.debug( + `[download-manager] last subscriber left, aborting transfer ${transfer.downloadKey}`, + ); + transfer.abortController.abort(); +} + +function attachRequestBinding( + transfer: Transfer, + subscriber: Subscriber, + request: SubscriberRequestBinding, +): void { + const onAbort = () => { + if (!subscriber.settled) { + settleSubscriber( + subscriber, + new InferenceCancelledError(request.requestId), + ); + } + removeSubscriber(transfer, subscriber.id); + }; + + if (request.signal.aborted) { + onAbort(); + return; + } + + request.signal.addEventListener("abort", onAbort, { once: true }); + + // Safety net: scope unwind on any handler exit path triggers the same + // cleanup. If the abort listener already ran it's a no-op. Cleaning + // up the abort listener here keeps the parent signal from carrying a + // dangling reference into the next request. + request.scope.defer(() => { + request.signal.removeEventListener("abort", onAbort); + if (!subscriber.settled) { + settleSubscriber( + subscriber, + new InferenceCancelledError(request.requestId), + ); + } + removeSubscriber(transfer, subscriber.id); + }); +} + export function startOrJoinDownload( downloadKey: string, startDownload: (ctx: DownloadContext) => Promise, onProgress?: (progress: ModelProgressUpdate) => void, + request?: SubscriberRequestBinding, ): StartOrJoinResult { const existing = activeTransfers.get(downloadKey); if (existing && !existing.abortController.signal.aborted) { logger.info(`📥 Reusing existing download for: ${downloadKey}`); - const subscriber = createSubscriber(onProgress); + const subscriber = createSubscriber(onProgress, request?.requestId); existing.subscribers.set(subscriber.id, subscriber); + if (request) { + attachRequestBinding(existing, subscriber, request); + } if (existing.lastProgress) { deliverProgress(existing, subscriber, existing.lastProgress); @@ -138,9 +250,12 @@ export function startOrJoinDownload( clearCache: false, }; - const initialSubscriber = createSubscriber(onProgress); + const initialSubscriber = createSubscriber(onProgress, request?.requestId); transfer.subscribers.set(initialSubscriber.id, initialSubscriber); activeTransfers.set(downloadKey, transfer); + if (request) { + attachRequestBinding(transfer, initialSubscriber, request); + } const downloadPromise = startDownload({ broadcastProgress: (progress) => { @@ -180,6 +295,18 @@ export function startOrJoinDownload( }; } +/** + * Legacy cancel entry point. Callers (`cancelHandler`'s deprecated + * `case "downloadAsset"` arm, the shutdown sweep, intra-resolve + * cleanup) call this with a `downloadKey`. The single source of cancel + * routing is the request registry, so this function resolves each + * subscriber's request via `registry.cancel({ requestId })`. + * + * Subscribers without a `requestId` (legacy callers that didn't pass a + * registry binding) are settled directly with `DownloadCancelledError` + * so we don't leak the transfer if a legacy code path holds the only + * reference. + */ export function cancelTransfer( downloadKey: string, clearCache = false, @@ -188,10 +315,32 @@ export function cancelTransfer( if (!transfer) return; transfer.clearCache = clearCache; - transfer.abortController.abort(); - for (const sub of transfer.subscribers.values()) { + const registry = getRequestRegistry(); + const orphanSubs: Subscriber[] = []; + for (const sub of Array.from(transfer.subscribers.values())) { + if (sub.requestId !== undefined) { + registry.cancel({ + requestId: sub.requestId, + reason: "download-transfer-cancel", + }); + } else { + orphanSubs.push(sub); + } + } + + if (orphanSubs.length === 0) { + return; + } + + // Legacy subscribers (no registry binding): settle each with + // `DownloadCancelledError` and route removal through the + // `removeSubscriber` helper so the last-subscriber teardown rule is + // enforced in one place. Registry-bound subscribers are handled by + // their `attachRequestBinding` listener triggered above. + for (const sub of orphanSubs) { settleSubscriber(sub, new DownloadCancelledError()); + removeSubscriber(transfer, sub.id); } } diff --git a/packages/sdk/server/rpc/handlers/load-model/handler.ts b/packages/sdk/server/rpc/handlers/load-model/handler.ts index 84b00e5196..4ecf880719 100644 --- a/packages/sdk/server/rpc/handlers/load-model/handler.ts +++ b/packages/sdk/server/rpc/handlers/load-model/handler.ts @@ -25,6 +25,7 @@ import { import { buildDownloadProfilingFields } from "@/server/rpc/handlers/load-model/types"; import { ConfigReloadNotSupportedError, + InferenceCancelledError, ModelTypeMismatchError, ModelIsDelegatedError, ModelNotFoundError, @@ -34,6 +35,11 @@ import { } from "@/utils/errors-server"; import { getServerLogger } from "@/logging"; import { getPlugin } from "@/server/plugins"; +import { + getRequestRegistry, + withRequestContext, +} from "@/server/bare/runtime"; +import { generateServerRequestId } from "@/server/bare/runtime/request-id"; const logger = getServerLogger(); @@ -57,6 +63,19 @@ export async function handleLoadModel( | undefined; const profilingEnabled = profilingMeta?.enabled !== false && !!profilingMeta; + const requestId = request.requestId ?? generateServerRequestId(); + // The handler `modelId` is derived from the config hash below — it + // isn't known until after `resolveConfig` runs. The registry context + // is opened with `modelId: undefined` so a cancel-by-modelId fired + // before the load completes is a clean no-op (rather than matching a + // half-built entry). Cancel-by-`requestId` works from `begin(...)` on. + await using ctx = getRequestRegistry().begin({ + requestId, + kind: "loadModel", + }); + const log = withRequestContext(getServerLogger(), ctx); + log.debug(`loadModel start modelSrc=${String(modelSrc ?? "")}`); + try { const plugin = getPlugin(canonicalModelType); if (!plugin) { @@ -86,6 +105,12 @@ export async function handleLoadModel( progressCallback, seed, profilingEnabled, + signal: ctx.signal, + requestBinding: { + signal: ctx.signal, + scope: ctx.scope, + requestId, + }, }); const primaryResolve = session.resolvePrimaryModelPath(modelSrc); @@ -111,6 +136,16 @@ export async function handleLoadModel( throw error; } + // Phase boundary between download and load. The bare load path + // (`plugin.createModel(...)` / `model.load(false)`) does not accept + // an abort signal today, so the only honest cancel hook here is the + // pre-load gate: if cancel arrived during download / resolveConfig, + // surface `InferenceCancelledError` before sinking time into the + // addon work. + if (ctx.signal.aborted) { + throw new InferenceCancelledError(requestId); + } + const configStr = canonicalConfigString( request.modelConfig, ); @@ -153,6 +188,19 @@ export async function handleLoadModel( profilingEnabled ? { collectTiming: true } : undefined, ); + // Load phase soft-cancel limitation: `plugin.createModel(...)` and + // `model.load(false)` ran without a signal. A cancel that arrived + // during the load phase has just landed in the registry but the + // model is now loaded and registered. Surface + // `InferenceCancelledError` so the caller's promise rejects + // consistently — even though the model state is the "loaded" + // post-condition. The orphan-model edge case is a known + // limitation that requires a per-load cancel surface on the addon + // to fix end-to-end. + if (ctx.signal.aborted) { + throw new InferenceCancelledError(requestId); + } + const response: LoadModelResponse = { type: "loadModel", success: true, @@ -191,6 +239,16 @@ export async function handleLoadModel( return response; } catch (error) { + // `InferenceCancelledError` rides the rejection path verbatim: the + // client side wants a typed cancel error on the promise it `await`s, + // not a `{ success: false, error }` envelope that surfaces as a + // generic `ModelLoadFailedError`. Every other error keeps the + // legacy `success: false` shape for backward compat with consumers + // that switch on the envelope. + if (error instanceof InferenceCancelledError) { + log.info(`loadModel cancelled requestId=${requestId}`); + throw error; + } logger.error("Error loading model:", error); return { type: "loadModel", diff --git a/packages/sdk/server/rpc/handlers/load-model/http.ts b/packages/sdk/server/rpc/handlers/load-model/http.ts index b875de0a80..8bc1572102 100644 --- a/packages/sdk/server/rpc/handlers/load-model/http.ts +++ b/packages/sdk/server/rpc/handlers/load-model/http.ts @@ -539,6 +539,7 @@ export async function downloadModelFromHttp( } }, progressCallback, + hooks?.requestBinding, ); return applyJoinedDownloadStats(result, hooks); @@ -685,6 +686,7 @@ async function downloadShardedModelFromHttp( } }, progressCallback, + hooks?.requestBinding, ); return applyJoinedDownloadStats(result, hooks); @@ -840,6 +842,7 @@ async function downloadShardedModelFromArchive( } }, progressCallback, + hooks?.requestBinding, ); return applyJoinedDownloadStats(result, hooks); diff --git a/packages/sdk/server/rpc/handlers/load-model/hyperdrive.ts b/packages/sdk/server/rpc/handlers/load-model/hyperdrive.ts index 33bd34a952..6bfb50e5ff 100644 --- a/packages/sdk/server/rpc/handlers/load-model/hyperdrive.ts +++ b/packages/sdk/server/rpc/handlers/load-model/hyperdrive.ts @@ -864,6 +864,7 @@ export async function downloadModelFromHyperdrive( return modelPath; }, progressCallback, + hooks?.requestBinding, ); return applyJoinedDownloadStats(result, hooks); diff --git a/packages/sdk/server/rpc/handlers/load-model/registry.ts b/packages/sdk/server/rpc/handlers/load-model/registry.ts index b866159051..a399e9cd3c 100644 --- a/packages/sdk/server/rpc/handlers/load-model/registry.ts +++ b/packages/sdk/server/rpc/handlers/load-model/registry.ts @@ -464,6 +464,7 @@ export async function downloadModelFromRegistry( return modelPath; }, progressCallback, + hooks?.requestBinding, ); return applyJoinedDownloadStats(result, hooks); diff --git a/packages/sdk/server/rpc/handlers/load-model/resolve-session.ts b/packages/sdk/server/rpc/handlers/load-model/resolve-session.ts index 43da876c2b..1a524e4b47 100644 --- a/packages/sdk/server/rpc/handlers/load-model/resolve-session.ts +++ b/packages/sdk/server/rpc/handlers/load-model/resolve-session.ts @@ -1,3 +1,4 @@ +import type { AbortSignal } from "bare-abort-controller"; import type { ModelProgressUpdate, ResolveContext } from "@/schemas"; import { resolveModelPath, @@ -5,6 +6,7 @@ import { } from "@/server/rpc/handlers/load-model/resolve"; import { cancelTransfer } from "@/server/rpc/handlers/load-model/download-manager"; import type { + DownloadRequestBinding, ResolveResult, DownloadHooks, } from "@/server/rpc/handlers/load-model/types"; @@ -14,6 +16,24 @@ export interface ResolveSessionOptions { progressCallback?: ((update: ModelProgressUpdate) => void) | undefined; seed?: boolean | undefined; profilingEnabled: boolean; + /** + * Optional cancel signal — typically `ctx.signal` from the surrounding + * `await using ctx = registry.begin(...)` block. When provided, + * `resolveModelPath` short-circuits with `InferenceCancelledError` if the + * signal is already aborted on entry; the same signal also propagates + * to in-progress transfers via the request binding below so cancel + * tears them down end-to-end. + */ + signal?: AbortSignal | undefined; + /** + * Optional per-request binding threaded into every `startOrJoinDownload` + * call. The download manager wires a per-subscriber abort listener + * against `binding.signal`, registers a scope-defer cleanup, and stamps + * the subscriber with `binding.requestId` so the legacy + * `cancelTransfer(downloadKey)` path can route through + * `registry.cancel({ requestId })`. + */ + requestBinding?: DownloadRequestBinding | undefined; } export interface ResolveSession { @@ -28,7 +48,8 @@ export interface ResolveSession { } export function createResolveSession(options: ResolveSessionOptions): ResolveSession { - const { progressCallback, seed, profilingEnabled } = options; + const { progressCallback, seed, profilingEnabled, signal, requestBinding } = + options; let primaryResult: ResolveResult | undefined; const resolveResults: ResolveResult[] = []; const activeDownloadKeys = new Set(); @@ -37,6 +58,7 @@ export function createResolveSession(options: ResolveSessionOptions): ResolveSes onDownloadKey(key: string) { activeDownloadKeys.add(key); }, + ...(requestBinding !== undefined && { requestBinding }), }; async function resolvePrimaryModelPath(modelSrc: unknown) { @@ -45,13 +67,20 @@ export function createResolveSession(options: ResolveSessionOptions): ResolveSes modelSrc, progressCallback, seed, + signal, downloadHooks, ); primaryResult = result; resolveResults.push(result); return result.path; } - return resolveModelPath(modelSrc, progressCallback, seed, downloadHooks); + return resolveModelPath( + modelSrc, + progressCallback, + seed, + signal, + downloadHooks, + ); } async function resolveForPlugin(src: unknown) { @@ -60,12 +89,19 @@ export function createResolveSession(options: ResolveSessionOptions): ResolveSes src, progressCallback, seed, + signal, downloadHooks, ); resolveResults.push(result); return result.path; } - return resolveModelPath(src, progressCallback, seed, downloadHooks); + return resolveModelPath( + src, + progressCallback, + seed, + signal, + downloadHooks, + ); } function createResolveContext( diff --git a/packages/sdk/server/rpc/handlers/load-model/resolve.ts b/packages/sdk/server/rpc/handlers/load-model/resolve.ts index ec75cb5192..216de90490 100644 --- a/packages/sdk/server/rpc/handlers/load-model/resolve.ts +++ b/packages/sdk/server/rpc/handlers/load-model/resolve.ts @@ -22,7 +22,9 @@ import { downloadModelFromRegistryWithStats, } from "./download-stats"; import type { ResolveResult, DownloadResult, DownloadHooks } from "./types"; +import type { AbortSignal } from "bare-abort-controller"; import { + InferenceCancelledError, ModelLoadFailedError, ModelNotFoundError, SeedingNotSupportedError, @@ -123,8 +125,14 @@ async function resolveModelPathCore( progressCallback: ((progress: ModelProgressUpdate) => void) | undefined, seed: boolean | undefined, mode: ResolveMode, + signal: AbortSignal | undefined, hooks?: DownloadHooks, ): Promise { + if (signal?.aborted) { + throw new InferenceCancelledError( + hooks?.requestBinding?.requestId ?? "unknown", + ); + } const srcString = modelInputToSrcSchema.parse(modelSrc); // Parse hyperdrive URLs if present @@ -215,9 +223,17 @@ export async function resolveModelPath( modelSrc: unknown, progressCallback?: (progress: ModelProgressUpdate) => void, seed?: boolean, + signal?: AbortSignal, hooks?: DownloadHooks, ): Promise { - const result = await resolveModelPathCore(modelSrc, progressCallback, seed, "base", hooks); + const result = await resolveModelPathCore( + modelSrc, + progressCallback, + seed, + "base", + signal, + hooks, + ); return result.path; } @@ -225,7 +241,15 @@ export async function resolveModelPathWithStats( modelSrc: unknown, progressCallback?: (progress: ModelProgressUpdate) => void, seed?: boolean, + signal?: AbortSignal, hooks?: DownloadHooks, ): Promise { - return resolveModelPathCore(modelSrc, progressCallback, seed, "stats", hooks); + return resolveModelPathCore( + modelSrc, + progressCallback, + seed, + "stats", + signal, + hooks, + ); } diff --git a/packages/sdk/server/rpc/handlers/load-model/types.ts b/packages/sdk/server/rpc/handlers/load-model/types.ts index 3f97a95a67..e1367ea735 100644 --- a/packages/sdk/server/rpc/handlers/load-model/types.ts +++ b/packages/sdk/server/rpc/handlers/load-model/types.ts @@ -1,4 +1,6 @@ import type { SourceType } from "@/schemas"; +import type { AbortSignal } from "bare-abort-controller"; +import type { DisposableScope } from "@/server/bare/runtime/disposable-scope"; export interface DownloadStats { downloadTimeMs?: number; @@ -20,12 +22,35 @@ export interface DownloadResult { stats?: DownloadStats; } +/** + * Internal request binding threaded through `DownloadHooks` so the + * download manager's `startOrJoinDownload` can register a per-subscriber + * cancel hook against the caller's `RequestContext`. The binding's + * fields mirror the registry context's surface area: `signal` is wired + * to a subscriber-removing listener, `scope` registers the idempotent + * cleanup, and `requestId` lets `cancelTransfer` (legacy entry point) + * route cancels through `registry.cancel(...)` rather than reaching + * into the transfer directly. + */ +export interface DownloadRequestBinding { + signal: AbortSignal; + scope: DisposableScope; + requestId: string; +} + export interface DownloadHooks { onDownloadKey?: (key: string) => void; markCacheHit?: () => void; markCacheMiss?: () => void; markSharedTransfer?: () => void; addChecksumValidationTimeMs?: (durationMs: number) => void; + /** + * When set, `startOrJoinDownload` attaches a per-subscriber cancel + * listener bound to this request. `registry.cancel({ requestId })` + * aborts only this subscriber; the transfer keeps running for siblings + * joined on the same `downloadKey` until the last subscriber leaves. + */ + requestBinding?: DownloadRequestBinding; } export interface LoadModelProfilingMeta { diff --git a/packages/sdk/server/rpc/handlers/rag.ts b/packages/sdk/server/rpc/handlers/rag.ts index 5a3fa33e0d..5507295a39 100644 --- a/packages/sdk/server/rpc/handlers/rag.ts +++ b/packages/sdk/server/rpc/handlers/rag.ts @@ -1,3 +1,4 @@ +import type { AbortSignal } from "bare-abort-controller"; import type { RagRequest, RagResponse, RagProgressUpdate } from "@/schemas"; import { chunk, @@ -10,9 +11,17 @@ import { closeWorkspace, deleteWorkspace, DEFAULT_WORKSPACE, - registerRagOperation, - unregisterRagOperation, + getActiveRagRequest, + setActiveRagRequest, + clearActiveRagRequest, } from "@/server/bare/rag-hyperdb"; +import { + getRequestRegistry, + withRequestContext, + type ManagedRequestContext, +} from "@/server/bare/runtime"; +import { generateServerRequestId } from "@/server/bare/runtime/request-id"; +import { getServerLogger } from "@/logging"; import { profileReplyHandler, registerOperationMetrics, @@ -48,10 +57,9 @@ registerOperationMetrics< function createHandlerOptions( operation: ProgressOperation, workspace: string, + signal: AbortSignal, onProgress?: (update: RagProgressUpdate) => void, ): HandlerOptions { - const signal = registerRagOperation(workspace, operation); - const options: HandlerOptions = { signal }; if (onProgress) { @@ -79,6 +87,41 @@ function omitOnProgress>( return rest; } +/** + * Begin a registry-tracked RAG context with workspace-level pre-emption. + * + * Workspace-level admission lives in the dispatcher rather than as a + * registry policy primitive (it's a dispatch concern, not a registry + * `kind` admission rule). The sequence is **cancel-prior → begin-new**: + * if another RAG operation is already running on the same workspace, + * cancel it first, then begin the new context. Reversing the order + * would cancel the just-installed context. + * + * The workspace → requestId map is updated after `begin(...)` succeeds + * and cleared on scope unwind via `scope.defer(...)`, with a + * "still mine?" guard so an older op's deferred cleanup cannot stomp + * a newer op's mapping. + */ +function beginRagContext( + workspace: string, + requestId: string, +): ManagedRequestContext { + const registry = getRequestRegistry(); + const prev = getActiveRagRequest(workspace); + if (prev !== undefined && prev !== requestId) { + registry.cancel({ requestId: prev, reason: "rag-workspace-preempt" }); + } + const ctx = registry.begin({ + requestId, + kind: "rag", + }); + setActiveRagRequest(workspace, requestId); + ctx.scope.defer(() => { + clearActiveRagRequest(workspace, requestId); + }); + return ctx; +} + export async function handleRag( request: RagRequest, onProgress?: (update: RagProgressUpdate) => void, @@ -105,45 +148,47 @@ async function handleRagInternal( case "ingest": { const workspace = request.workspace ?? DEFAULT_WORKSPACE; + const requestId = request.requestId ?? generateServerRequestId(); + await using ctx = beginRagContext(workspace, requestId); + const log = withRequestContext(getServerLogger(), ctx); + log.debug("ingest start"); const handlerOptions = createHandlerOptions( "ingest", workspace, + ctx.signal, onProgress, ); const params = omitOnProgress(request); - try { - const result = await ingest(params, handlerOptions); - return { - type: "rag", - operation: request.operation, - success: true, - processed: result.processed, - droppedIndices: result.droppedIndices, - }; - } finally { - unregisterRagOperation(workspace); - } + const result = await ingest(params, handlerOptions); + return { + type: "rag", + operation: request.operation, + success: true, + processed: result.processed, + droppedIndices: result.droppedIndices, + }; } case "saveEmbeddings": { const workspace = request.workspace ?? DEFAULT_WORKSPACE; + const requestId = request.requestId ?? generateServerRequestId(); + await using ctx = beginRagContext(workspace, requestId); + const log = withRequestContext(getServerLogger(), ctx); + log.debug("saveEmbeddings start"); const handlerOptions = createHandlerOptions( "saveEmbeddings", workspace, + ctx.signal, onProgress, ); const params = omitOnProgress(request); - try { - const processed = await saveEmbeddings(params, handlerOptions); - return { - type: "rag", - operation: request.operation, - success: true, - processed, - }; - } finally { - unregisterRagOperation(workspace); - } + const processed = await saveEmbeddings(params, handlerOptions); + return { + type: "rag", + operation: request.operation, + success: true, + processed, + }; } case "search": { @@ -167,23 +212,24 @@ async function handleRagInternal( case "reindex": { const workspace = request.workspace ?? DEFAULT_WORKSPACE; + const requestId = request.requestId ?? generateServerRequestId(); + await using ctx = beginRagContext(workspace, requestId); + const log = withRequestContext(getServerLogger(), ctx); + log.debug("reindex start"); const handlerOptions = createHandlerOptions( "reindex", workspace, + ctx.signal, onProgress, ); const params = omitOnProgress(request); - try { - const result = await reindex(params, handlerOptions); - return { - type: "rag", - operation: request.operation, - success: true, - result, - }; - } finally { - unregisterRagOperation(workspace); - } + const result = await reindex(params, handlerOptions); + return { + type: "rag", + operation: request.operation, + success: true, + result, + }; } case "listWorkspaces": { diff --git a/packages/sdk/test/unit/download-manager-subscriber.test.ts b/packages/sdk/test/unit/download-manager-subscriber.test.ts new file mode 100644 index 0000000000..2affead971 --- /dev/null +++ b/packages/sdk/test/unit/download-manager-subscriber.test.ts @@ -0,0 +1,298 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { AbortController } from "bare-abort-controller"; +import { + startOrJoinDownload, + type DownloadContext, +} from "@/server/rpc/handlers/load-model/download-manager"; +import { createDisposableScope } from "@/server/bare/runtime/disposable-scope"; +import { InferenceCancelledError } from "@/utils/errors-server"; + +// ----------------------------------------------------------------------------- +// download-manager: per-`requestId` cancel + content-addressed dedup. +// +// These tests pin the contract for `startOrJoinDownload`: +// +// 1. Two callers with the same `downloadKey` share one underlying +// transfer (dedup preserved). +// 2. Cancelling one subscriber's `request.signal` rejects only that +// subscriber's promise — the joined subscriber keeps running. +// 3. When the last subscriber leaves (cancel or scope unwind), the +// transfer's `AbortController` fires so the network call tears +// down. Until then the transfer keeps running. +// 4. `scope.defer(...)` is the safety net: if a handler exits without +// cancelling, the deferred cleanup runs the same subscriber +// removal path. Idempotent w.r.t. the abort listener. +// +// The download function is a manually-controlled promise so each test +// can choreograph "subscriber attaches → first cancel → second cancel +// → underlying transfer aborts" without timing dependencies. +// ----------------------------------------------------------------------------- + +type T = { + is: (actual: unknown, expected: unknown, msg?: string) => void; + ok: (value: unknown, msg?: string) => void; + exception: ( + fn: () => Promise | unknown, + matcher?: unknown, + msg?: string, + ) => Promise; +}; + +interface ControllableDownload { + start: (ctx: DownloadContext) => Promise; + ctx: DownloadContext | null; + settle: (path: string) => void; + reject: (err: unknown) => void; + attached: Promise; +} + +function makeControllableDownload(): ControllableDownload { + let resolveAttached!: (ctx: DownloadContext) => void; + const attached = new Promise((res) => { + resolveAttached = res; + }); + + let resolveDownload!: (path: string) => void; + let rejectDownload!: (err: unknown) => void; + const downloadPromise = new Promise((res, rej) => { + resolveDownload = res; + rejectDownload = rej; + }); + + const obj: ControllableDownload = { + ctx: null, + attached, + settle: (path) => resolveDownload(path), + reject: (err) => rejectDownload(err), + start: (ctx: DownloadContext) => { + obj.ctx = ctx; + resolveAttached(ctx); + return downloadPromise; + }, + }; + return obj; +} + +function bindingFromAbortController( + ac: AbortController, + requestId: string, +): { + signal: AbortController["signal"]; + scope: ReturnType; + requestId: string; +} { + return { + signal: ac.signal, + scope: createDisposableScope(), + requestId, + }; +} + +test("download-manager: two callers with same key share one transfer", async (t: T) => { + const dl = makeControllableDownload(); + const downloadKey = `dedup:${Math.random()}`; + + const acA = new AbortController(); + const acB = new AbortController(); + + const first = startOrJoinDownload( + downloadKey, + dl.start, + undefined, + bindingFromAbortController(acA, "req-A"), + ); + const second = startOrJoinDownload( + downloadKey, + () => { + throw new Error( + "second caller must reuse the existing transfer, not start a new one", + ); + }, + undefined, + bindingFromAbortController(acB, "req-B"), + ); + + t.is(first.joined, false, "first call starts the transfer"); + t.is(second.joined, true, "second call joins the existing transfer"); + + await dl.attached; + dl.settle("/cached/model.gguf"); + + const [a, b] = await Promise.all([first.promise, second.promise]); + t.is(a, "/cached/model.gguf"); + t.is(b, "/cached/model.gguf"); +}); + +test("download-manager: cancelling one subscriber does not affect the other", async (t: T) => { + const dl = makeControllableDownload(); + const downloadKey = `cancel-one:${Math.random()}`; + + const acA = new AbortController(); + const acB = new AbortController(); + + const first = startOrJoinDownload( + downloadKey, + dl.start, + undefined, + bindingFromAbortController(acA, "req-A"), + ); + const second = startOrJoinDownload( + downloadKey, + dl.start, + undefined, + bindingFromAbortController(acB, "req-B"), + ); + + await dl.attached; + t.is( + dl.ctx?.signal.aborted, + false, + "underlying transfer is not aborted while subscribers remain", + ); + + // Cancel only A. B is still attached so the underlying transfer + // must keep running. + acA.abort(); + + await t.exception( + async () => { + await first.promise; + }, + /cancel/i, + "subscriber A's promise rejects with a cancel error", + ); + + // The underlying transfer must still be live for B. + t.is( + dl.ctx?.signal.aborted, + false, + "transfer keeps running while B is still subscribed", + ); + + dl.settle("/path/b.gguf"); + t.is(await second.promise, "/path/b.gguf"); +}); + +test("download-manager: last subscriber leaving aborts the transfer", async (t: T) => { + const dl = makeControllableDownload(); + const downloadKey = `last-sub:${Math.random()}`; + + const acA = new AbortController(); + const acB = new AbortController(); + + const first = startOrJoinDownload( + downloadKey, + dl.start, + undefined, + bindingFromAbortController(acA, "req-A"), + ); + const second = startOrJoinDownload( + downloadKey, + dl.start, + undefined, + bindingFromAbortController(acB, "req-B"), + ); + + await dl.attached; + + acA.abort(); + await t.exception(async () => { + await first.promise; + }); + t.is( + dl.ctx?.signal.aborted, + false, + "transfer still alive after first cancel", + ); + + acB.abort(); + await t.exception(async () => { + await second.promise; + }); + t.is( + dl.ctx?.signal.aborted, + true, + "transfer aborts when the last subscriber leaves", + ); + + // Reject the underlying download to flush the promise — required so + // the test process exits cleanly. The settlement happens after both + // subscribers have already settled with InferenceCancelledError, so + // it has no observable effect. + dl.reject(new Error("download aborted")); +}); + +test("download-manager: scope.defer is the safety net for handler-exit paths", async (t: T) => { + const dl = makeControllableDownload(); + const downloadKey = `defer-unwind:${Math.random()}`; + + const ac = new AbortController(); + const binding = bindingFromAbortController(ac, "req-defer"); + + const result = startOrJoinDownload( + downloadKey, + dl.start, + undefined, + binding, + ); + + await dl.attached; + + // Simulate a handler that returned without explicitly cancelling + // (e.g. the request scope unwinds because the awaiting code path + // threw for a non-cancel reason). The scope.defer-registered cleanup + // must still remove the subscriber from the transfer. + await binding.scope[Symbol.asyncDispose](); + + await t.exception( + async () => { + await result.promise; + }, + /cancel/i, + "scope unwind settles the subscriber via the deferred cleanup", + ); + + t.is( + dl.ctx?.signal.aborted, + true, + "scope unwind on the sole subscriber aborts the underlying transfer", + ); + + dl.reject(new Error("download aborted")); +}); + +test("download-manager: cancel error carries the requestId", async (t: T) => { + const dl = makeControllableDownload(); + const downloadKey = `cancel-id:${Math.random()}`; + + const ac = new AbortController(); + const result = startOrJoinDownload( + downloadKey, + dl.start, + undefined, + bindingFromAbortController(ac, "req-with-id"), + ); + + await dl.attached; + ac.abort(); + + try { + await result.promise; + t.ok(false, "promise should have rejected"); + } catch (err) { + t.ok( + err instanceof InferenceCancelledError, + "rejection is an InferenceCancelledError", + ); + if (err instanceof InferenceCancelledError) { + t.is( + err.requestId, + "req-with-id", + "the error preserves the requestId for downstream observability", + ); + } + } + + dl.reject(new Error("download aborted")); +}); diff --git a/packages/sdk/test/unit/rag-workspace-preempt.test.ts b/packages/sdk/test/unit/rag-workspace-preempt.test.ts new file mode 100644 index 0000000000..cdec6c19e9 --- /dev/null +++ b/packages/sdk/test/unit/rag-workspace-preempt.test.ts @@ -0,0 +1,83 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { + getActiveRagRequest, + setActiveRagRequest, + clearActiveRagRequest, +} from "@/server/bare/rag-hyperdb/rag-operation-manager"; + +// ----------------------------------------------------------------------------- +// RAG workspace → requestId map. +// +// The dispatcher-level pre-emption rule in `server/rpc/handlers/rag.ts` +// (cancel any existing RAG request on the same workspace before +// beginning a new one) is built on three small helpers in +// `rag-operation-manager.ts`: +// +// - `getActiveRagRequest(workspace)` — returns the currently-tracked +// `requestId` (or `undefined`) +// - `setActiveRagRequest(workspace, requestId)` — replaces it +// - `clearActiveRagRequest(workspace, requestId)` — clears iff the +// map still holds the same id (the "still mine?" guard) +// +// The "still mine?" guard is what makes pre-emption safe: an older +// op's scope-deferred cleanup must not wipe out a newer op's mapping. +// These tests pin that contract in isolation from the registry so a +// refactor that loses the guard will fail here before the integration +// surface breaks. +// ----------------------------------------------------------------------------- + +type T = { + is: (actual: unknown, expected: unknown, msg?: string) => void; + ok: (value: unknown, msg?: string) => void; +}; + +const WS = "ws-test"; +const WS_OTHER = "ws-other"; + +test("rag workspace map: get returns undefined before set", (t: T) => { + clearActiveRagRequest(WS, "unused"); + t.is(getActiveRagRequest(WS), undefined); +}); + +test("rag workspace map: set / get round-trip", (t: T) => { + setActiveRagRequest(WS, "r-1"); + t.is(getActiveRagRequest(WS), "r-1"); + clearActiveRagRequest(WS, "r-1"); + t.is(getActiveRagRequest(WS), undefined); +}); + +test("rag workspace map: set replaces (pre-emption write path)", (t: T) => { + setActiveRagRequest(WS, "r-1"); + setActiveRagRequest(WS, "r-2"); + t.is(getActiveRagRequest(WS), "r-2", "newer requestId replaces the older"); + clearActiveRagRequest(WS, "r-2"); +}); + +test("rag workspace map: clear is a no-op for stale requestId (still-mine guard)", (t: T) => { + setActiveRagRequest(WS, "r-current"); + // Older op's scope unwind fires after a newer op has already + // installed its mapping. The stale clear must not stomp. + clearActiveRagRequest(WS, "r-stale"); + t.is( + getActiveRagRequest(WS), + "r-current", + "stale clear leaves the current mapping intact", + ); + clearActiveRagRequest(WS, "r-current"); +}); + +test("rag workspace map: workspaces are isolated", (t: T) => { + setActiveRagRequest(WS, "r-a"); + setActiveRagRequest(WS_OTHER, "r-b"); + t.is(getActiveRagRequest(WS), "r-a"); + t.is(getActiveRagRequest(WS_OTHER), "r-b"); + clearActiveRagRequest(WS, "r-a"); + t.is(getActiveRagRequest(WS), undefined); + t.is( + getActiveRagRequest(WS_OTHER), + "r-b", + "clearing one workspace must not touch the other", + ); + clearActiveRagRequest(WS_OTHER, "r-b"); +}); diff --git a/packages/sdk/test/unit/request-id-wire.test.ts b/packages/sdk/test/unit/request-id-wire.test.ts new file mode 100644 index 0000000000..acfaf22f61 --- /dev/null +++ b/packages/sdk/test/unit/request-id-wire.test.ts @@ -0,0 +1,109 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { + loadModelOptionsToRequestSchema, + downloadAssetOptionsToRequestSchema, +} from "@/schemas"; +import { ragRequestSchema } from "@/schemas/rag"; +import { ModelType } from "@/schemas/model-types"; + +// ----------------------------------------------------------------------------- +// requestId wire-shape round-trip — schema half. +// +// The decorated-promise contract says: the client generates a +// `requestId` once, the request envelope carries it on the wire, and +// the server uses that same value as the registry-entry key. These +// tests pin the **envelope half** — that the request schemas accept +// and preserve `requestId` for `loadModel`, `downloadAsset`, and `rag`. +// +// The handler-side half (server keys the registry on the client's +// `requestId`) is exercised by the per-handler tests in +// `request-registry.test.ts` + the dispatcher-level cancel arm in +// `cancelHandler.ts`. +// ----------------------------------------------------------------------------- + +type T = { + is: (actual: unknown, expected: unknown, msg?: string) => void; + ok: (value: unknown, msg?: string) => void; +}; + +test("loadModelOptionsToRequestSchema: forwards requestId onto the wire envelope", (t: T) => { + const parsed = loadModelOptionsToRequestSchema.parse({ + modelType: ModelType.llamacppCompletion, + modelSrc: "/tmp/model.gguf", + requestId: "client-uuid-load", + }); + t.is( + (parsed as { requestId?: string }).requestId, + "client-uuid-load", + "loadModel envelope must carry the client-generated requestId", + ); +}); + +test("loadModelOptionsToRequestSchema: requestId is optional (legacy clients)", (t: T) => { + const parsed = loadModelOptionsToRequestSchema.parse({ + modelType: ModelType.llamacppCompletion, + modelSrc: "/tmp/model.gguf", + }); + t.is( + (parsed as { requestId?: string }).requestId, + undefined, + "missing requestId stays undefined on the envelope — server falls back to server-generated id", + ); +}); + +test("downloadAssetOptionsToRequestSchema: forwards requestId onto the wire envelope", (t: T) => { + const parsed = downloadAssetOptionsToRequestSchema.parse({ + assetSrc: "/tmp/asset.bin", + requestId: "client-uuid-dl", + }); + t.is( + (parsed as { requestId?: string }).requestId, + "client-uuid-dl", + "downloadAsset envelope must carry the client-generated requestId", + ); +}); + +test("downloadAssetOptionsToRequestSchema: requestId is optional", (t: T) => { + const parsed = downloadAssetOptionsToRequestSchema.parse({ + assetSrc: "/tmp/asset.bin", + }); + t.is((parsed as { requestId?: string }).requestId, undefined); +}); + +test("ragRequestSchema: forwards requestId for ingest", (t: T) => { + const parsed = ragRequestSchema.parse({ + type: "rag", + operation: "ingest", + workspace: "ws-a", + modelId: "model-a", + documents: "hello", + requestId: "client-uuid-rag", + }); + t.is( + (parsed as { requestId?: string }).requestId, + "client-uuid-rag", + "rag ingest envelope must carry the client-generated requestId", + ); +}); + +test("ragRequestSchema: requestId is optional for ingest", (t: T) => { + const parsed = ragRequestSchema.parse({ + type: "rag", + operation: "ingest", + workspace: "ws-a", + modelId: "model-a", + documents: "hello", + }); + t.is((parsed as { requestId?: string }).requestId, undefined); +}); + +test("ragRequestSchema: forwards requestId for reindex (storage-only op)", (t: T) => { + const parsed = ragRequestSchema.parse({ + type: "rag", + operation: "reindex", + workspace: "ws-a", + requestId: "client-uuid-reindex", + }); + t.is((parsed as { requestId?: string }).requestId, "client-uuid-reindex"); +}); diff --git a/packages/sdk/test/unit/utils/decorate-promise.test.ts b/packages/sdk/test/unit/utils/decorate-promise.test.ts new file mode 100644 index 0000000000..b195a18ff8 --- /dev/null +++ b/packages/sdk/test/unit/utils/decorate-promise.test.ts @@ -0,0 +1,126 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { decoratePromise } from "@/utils/decorate-promise"; + +// ----------------------------------------------------------------------------- +// decoratePromise unit tests. +// +// Locks in the contract the long-running client-API helpers depend on: +// - awaiting a decorated promise still unwraps to T (backward-compat +// pin — if this goes red the `loadModel(...)`/`downloadAsset(...)` +// consumers will silently start getting the wrapper object instead +// of the model id / asset id). +// - the metadata is reachable synchronously before the inner promise +// settles — that's the "grab `op.requestId` then cancel before the +// network call resolves" use case. +// - rejection still propagates through `await` so callers see the +// original error (no swallowing). +// - the helper returns the same object identity — `decorate` mutates +// in place via `Object.assign`. We rely on this so the inner +// promise's `.then` chain keeps working without re-wrapping. +// ----------------------------------------------------------------------------- + +type T = { + is: (actual: unknown, expected: unknown, msg?: string) => void; + alike: (actual: unknown, expected: unknown, msg?: string) => void; + ok: (value: unknown, msg?: string) => void; + exception: ( + fn: () => Promise | unknown, + matcher?: unknown, + msg?: string, + ) => Promise; +}; + +test("decoratePromise: await still unwraps to T (backward-compat)", async (t: T) => { + const inner = Promise.resolve("model-id-123"); + const op = decoratePromise(inner, { requestId: "abc" }); + + const result = await op; + t.is( + result, + "model-id-123", + "await on decorated promise must return the inner T, not the wrapper", + ); + t.is(typeof result, "string", "result type is preserved"); +}); + +test("decoratePromise: metadata reachable synchronously before settle", async (t: T) => { + // Promise that won't settle for a tick — we want to read `requestId` + // synchronously before it does. + let resolveInner!: (value: string) => void; + const inner = new Promise((resolve) => { + resolveInner = resolve; + }); + const op = decoratePromise(inner, { requestId: "sync-id" }); + + t.is( + op.requestId, + "sync-id", + "requestId must be readable synchronously after decorate", + ); + + resolveInner("done"); + const result = await op; + t.is(result, "done"); + t.is(op.requestId, "sync-id", "metadata still present after settle"); +}); + +test("decoratePromise: rejection propagates through await", async (t: T) => { + const inner = Promise.reject(new Error("inner failure")); + const op = decoratePromise(inner, { requestId: "rej-id" }); + + // Read the metadata first to confirm it's there even on a rejecting + // promise; this is the path consumers use in a `try { await op } catch`. + t.is(op.requestId, "rej-id"); + + await t.exception(async () => { + await op; + }, /inner failure/); +}); + +test("decoratePromise: returns the same object identity (in-place assign)", (t: T) => { + const inner = Promise.resolve(42); + const op = decoratePromise(inner, { requestId: "id-1" }); + + t.is( + op, + // @ts-expect-error -- this is the identity check that motivates the + // helper's return-same-instance contract. + inner, + "decoratePromise must mutate in place; the returned promise is the input promise", + ); +}); + +test("decoratePromise: .then / .catch / .finally chain intact", async (t: T) => { + const inner = Promise.resolve("piped"); + const op = decoratePromise(inner, { requestId: "chain-id" }); + + let finallyRan = false; + const piped = await op + .then((v) => `${v}-then`) + .finally(() => { + finallyRan = true; + }); + + t.is(piped, "piped-then", ".then must keep flowing through the decoration"); + t.ok(finallyRan, ".finally must fire"); + t.is( + op.requestId, + "chain-id", + "metadata stays on the original op even after chaining", + ); +}); + +test("decoratePromise: multiple metadata fields are all attached", async (t: T) => { + const inner = Promise.resolve("v"); + const op = decoratePromise(inner, { + requestId: "multi-id", + kind: "loadModel", + seq: 7, + }); + + t.is(op.requestId, "multi-id"); + t.is(op.kind, "loadModel"); + t.is(op.seq, 7); + t.is(await op, "v"); +}); diff --git a/packages/sdk/utils/decorate-promise.ts b/packages/sdk/utils/decorate-promise.ts new file mode 100644 index 0000000000..16a8464181 --- /dev/null +++ b/packages/sdk/utils/decorate-promise.ts @@ -0,0 +1,38 @@ +/** + * Attach metadata to a Promise so the metadata is reachable + * synchronously while the promise's normal async surface keeps working. + * + * The decorated-promise pattern lets a long-running client-API call + * (`loadModel(...)`, `downloadAsset(...)`) return a value that is both: + * - a `Promise` you can `await` for the resolved result; and + * - a record carrying call metadata (`requestId`) you can read + * synchronously to target the in-flight call with + * `cancel({ requestId })` before it resolves. + * + * Implementation notes: + * - `Object.assign` attaches enumerable own properties without + * touching the prototype chain, so `await`, `.then`, `.catch`, + * `.finally`, and async-await unwrapping continue to work exactly + * as for a plain `Promise`. + * - The intersection type `Promise & M` is sound because the + * resolved-value type `T` does not intersect with `M`'s key set in + * practice (callers pick metadata keys that don't collide with + * `Promise` members like `then`/`catch`/`finally`). Picking such a + * key would shadow the Promise method and break the unwrap chain; + * don't. + * - We deliberately avoid extending `Promise.prototype` or calling + * `Object.setPrototypeOf`. Either would silently affect every + * promise in the worker; the plain-object approach is intentional + * and the helper exists to keep call sites consistent. + * + * @example + * const op = decoratePromise(innerPromise, { requestId: "abc" }); + * op.requestId; // "abc" (synchronous) + * await op; // resolves to T (legacy contract preserved) + */ +export function decoratePromise>( + promise: Promise, + metadata: M, +): Promise & M { + return Object.assign(promise, metadata); +}