diff --git a/.cursor/rules/sdk/docs/kv-cache-system.mdc b/.cursor/rules/sdk/docs/kv-cache-system.mdc index 5dc9ea9003..94ce4e9d60 100644 --- a/.cursor/rules/sdk/docs/kv-cache-system.mdc +++ b/.cursor/rules/sdk/docs/kv-cache-system.mdc @@ -88,53 +88,104 @@ When a new cache key is used for the first time: | File | Purpose | |------|---------| -| `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` | Main completion handler with cache logic (string-key flow, auto-generated flow) | -| `server/bare/ops/kv-cache-utils.ts` | Cache utilities: path generation, config hash, in-memory registry, file ops | +| `server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts` | **`KvCacheSession` — single owner of the three KV-cache bookkeeping layers** (on-disk `.bin`, `initializedCaches`, `cachedMessageCounts`). Exposes `beginTurn` / `commitTurn` / `rollback` / `dropStaleSavedCount` plus the module-level `deleteKvCacheState(...)` administrative API. M2 (QVAC-18182). | +| `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` | Completion handler. Calls `session.beginTurn(...)`, registers `scope.defer(() => session.rollback(turn))` once, and calls `session.commitTurn(...)` on the happy path (which suppresses the deferred rollback). No direct references to the three layers. | +| `server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts` | Pure `decideCachedHistorySlice(...)` helper used by the session — slice decision for the next addon call. No state. | +| `server/bare/ops/kv-cache-utils.ts` | Path / hash / fs utilities: `getCacheFilePath`, `generateConfigHash`, `findMatchingCache`, `getCurrentCacheInfo`, `renameCacheFile`, `deleteCache`. No in-memory state. | | `server/bare/plugins/llamacpp-completion/ops/cache-logger.ts` | Debug logging for cache operations | +| `server/rpc/handlers/delete-cache.ts` | `handleDeleteCache` RPC entry point. Delegates to `deleteKvCacheState(...)` — zero direct references to the three layers (M2 deliverable 5). | | `server/utils/cache.ts` | `getKVCacheDir()` base directory | | `client/api/delete-cache.ts` | Client-side delete cache API | ## Key Behaviors -### In-Memory Cache Registry +### `KvCacheSession` Ownership (M2) -`kv-cache-utils.ts` maintains a `Set` of initialized caches to avoid redundant filesystem checks. `customCacheExists()` checks the in-memory set first, then falls back to async filesystem check. When a cache file is found on disk (from a previous run), it also marks it as initialized in the in-memory registry. +Before M2 the completion handler coordinated three independent bookkeeping layers around every cancel/error branch: -### Cache Initialization (initSystemPromptCache) +1. An in-memory `Set` of "initialized caches" (`kv-cache-utils.ts`). +2. A `Map` of saved-message counts (`kv-cache-state.ts`). +3. The on-disk `.bin` files written by the addon. -Primes a new cache by sending system prompt + tools through `runModel` with `{ cacheKey: cachePath, saveCacheToDisk: true, prefill: true }`. The prompt is ingested into the KV cache and persisted to disk without producing any output tokens, so the call resolves as soon as priming finishes. +Three near-identical cleanup blocks in `completion-stream.ts` had to touch all three on every cancel / zero-token / rename-failed / tool-call exit. Any one of those blocks forgetting a layer produced the drift bugs the pitch documents (QVAC-17780 family). -### Message Preparation (prepareMessagesForCache) +**M2 collapses this into `KvCacheSession`**, the **single mutation point** for the three layers. The handler's loop is now: -When cache exists and history is non-empty, sends only the new messages (since last cached count). Otherwise sends full history minus the system prompt. The cache file path is passed via `runOptions.cacheKey`, not embedded in the message array. +```typescript +const session = createKvCacheSession(modelId); +const turn = await session.beginTurn({ ... }); // primes cache if missing, returns handle +scope.defer(() => session.rollback(turn)); // ONE cleanup hook for every exit path +// ... run model ... +if (shouldCommit) await session.commitTurn(turn, ...); // suppresses the deferred rollback +``` -### Config Hash Generation (generateConfigHash) +`commitTurn` flips an internal flag on the turn handle; `rollback` reads the flag and short-circuits. The happy path commits and the deferred rollback becomes a no-op. Every other exit path (cancel, zero-token, addon error, rename failure, tool-call turn) lets `scope[Symbol.asyncDispose]` run the deferred rollback, which atomically unlinks the `.bin` file, deletes the `initializedCaches` entry, and forgets the `cachedMessageCounts` entry. -Cache validity is tied to a SHA-256 hash of system prompt content + sorted tool names. Model config is NOT included (per addon team: doesn't affect cache validity). Changing tools mid-session creates a new cache with the new tools anchored. +The maps that backed `clearCacheRegistry` and `cachedMessageCounts` are **private to `kv-cache-session.ts`**. No other module reads or writes them. -### Auto-Generated Cache Flow +### Cache Initialization (primeIfMissing) and Addon Non-Transactional Save -When `kvCache: true`, the cache key is derived from a hash of the conversation history. `findMatchingCache()` checks if a cache exists for `history[0..n-1]`. After completion, the cache file is renamed to reflect the updated history hash. +`beginTurn` injects a `primeIfMissing(cachePath)` closure that the session calls when the cache doesn't exist in-memory and on-disk. The closure (constructed by the completion handler) sends system prompt + (static) tools through `runModel` with `{ cacheKey: cachePath, saveCacheToDisk: true, prefill: true }`. -## Cache Persistence +**Addon contract today (non-transactional):** the llama.cpp addon's save path is `CacheManager::writeCacheFile` → `llama_state_save_file(...)`, and the addon **discards** the bool return value. `maybeSaveCacheToDisk` is the very last call on the prefill path, so any earlier throw means no save was attempted. As a result, the prime closure's outcomes from the SDK's perspective are: -- **After each completion** — saved via `saveCacheToDisk: true` in `runOptions` passed to `model.run()` (the addon persists the cache inline during the same inference call) -- **Session switch** — when a different `cacheKey` is passed, the addon auto-saves the old session before loading the new one -- **Omitting `cacheKey`** — the addon auto-saves the active session and clears it -- **Model unload** — all active caches flushed +| Scenario | Closure result | Disk state | +|---|---|---| +| Eval interrupted (cancel mid-prefill) | resolves cleanly | no file (early return before save) | +| Eval succeeds, save succeeds | resolves cleanly | full file | +| Eval succeeds, save fails (return value swallowed) | **resolves cleanly** | empty or partial file | +| `validatePrompt` / `loadMedia` / similar throws (pre-eval) | rejects | no file | + +Critically: there is **no path** where the closure rejects AND a partial file exists on disk. Any rejected prime is by construction "no save was attempted". + +**SDK-side defenses (matched access-probes):** + +- **`verifyPrimedFile(cachePath)` at prime time** — after `primeIfMissing` resolves, `fsPromises.stat` the canonical path. If missing or zero-size, best-effort unlink the empty leftover and throw so the session does NOT mark the cache initialized. Catches the cancelled-mid-prime and addon-silent-empty-write rows above. +- **`verifySaveAndRecord(cachePath, count)` at commit time** — same `access` probe applied before recording the new saved-message count. A missing file triggers `runRollback(...)` instead of recording a phantom commit. + +**What the SDK probes do NOT catch:** a partial-but-nonzero file. Closing that gap requires either a structural integrity check the SDK can't currently compute, or — the right answer — an addon-layer change so the addon throws on save failure. + +**Pending addon-layer fix:** `CacheManager::writeCacheFile` should check `llama_state_save_file`'s return value and throw `qvac_errors::StatusError(ADDON_ID, "UnableToSaveSessionFile", ...)` on failure (and ideally write atomically inside the addon — temp + rename around the existing save call). When that lands, **both `verifyPrimedFile` and `verifySaveAndRecord` collapse**: the prime closure's rejection becomes the authoritative signal, the rollback hook handles cleanup, and both access-probes can be retired together. Tracked as `[tech-debt] llama.cpp addon: throw on llama_state_save_file failure` in the SDK Asana board (gid `1214778658064488`). + +Injecting the prime via closure keeps `kv-cache-session.ts` free of model-registry / addon dependencies — the session is unit-testable without a real model. + +### Saved-Count Slicing & Stale-Boundary Recovery (decideCachedHistorySlice) -### Deleting Caches +`TurnHandle.savedCount` carries the on-disk message-count snapshot at `beginTurn` time. `prepareMessagesForCache` calls the pure `decideCachedHistorySlice(savedCount, cacheExists, history)` to pick what the addon receives next: + +- Cache miss or empty history → send the whole history minus the system message. +- Cache hit with a valid `savedCount` → send only the unsaved tail (`history.slice(savedCount)`). +- Cache hit but the slice would be empty (stale boundary — QVAC-17780) → fall back to the full non-system history and tell the session via `session.dropStaleSavedCount(turn)` so the bad boundary doesn't propagate. The on-disk file is left intact (it's still usable); only the boundary count is wrong. + +### Config Hash Generation (generateConfigHash) + +Cache validity is tied to a SHA-256 hash of system prompt content + sorted tool names. Model config is NOT included (per addon team: doesn't affect cache validity). Changing tools mid-session creates a new cache with the new tools anchored. Dynamic-mode tools intentionally do NOT participate in the hash so the cache can survive per-turn tool sets. + +### Auto-Cache Rename Flow + +When `kvCache: true`, `beginTurn` resolves the pre-response cache path (key derived from `history.slice(0, -1)`) via `findMatchingCache(...)` (cache hit) or `getCurrentCacheInfo(...)` (cache miss). On the happy path, the handler computes the post-response key from `result.responseText`, then calls `session.commitTurn(turn, { kind: "autoRename", targetCachePath, messageCount })` which renames the file and records the new saved count at the destination. A rename failure (or any other commit failure) falls through to the deferred rollback — single cleanup path, no drift. + +### Atomic Cache Deletion (`deleteKvCacheState`) ```typescript import { deleteCache } from "@qvac/sdk"; -await deleteCache({ all: true }); // Delete all -await deleteCache({ kvCacheKey: "session-a" }); // Delete specific key -await deleteCache({ kvCacheKey: "session-a", modelId: "..." }); // Delete specific model's cache +await deleteCache({ all: true }); // wipe and recreate the kv-cache root +await deleteCache({ kvCacheKey: "session-a" }); // remove session-a across every model +await deleteCache({ kvCacheKey: "session-a", modelId: "..." }); // remove only one model's session-a cache ``` +`handleDeleteCache` delegates to `deleteKvCacheState(...)` — a module-level export from `kv-cache-session.ts` that owns the same three layers. It removes the matching on-disk directory tree, prefix-cleans `cachedMessageCounts` against the removed path, and scope-clears `initializedCaches` by `(kvCacheKey[, modelId])`. The RPC handler itself has **zero direct references** to `fsPromises.unlink`, the in-memory `initializedCaches` set, or the `cachedMessageCounts` map. + See `schemas/delete-cache.ts` for the request schema. +## Cache Persistence + +- **After each completion** — saved via `saveCacheToDisk: true` in `runOptions` passed to `model.run()` (the addon persists the cache inline during the same inference call) +- **Session switch** — when a different `cacheKey` is passed, the addon auto-saves the old session before loading the new one +- **Omitting `cacheKey`** — the addon auto-saves the active session and clears it +- **Model unload** — all active caches flushed + ## Context Overflow with KV Cache When using KV Cache with sliding window (`n_discarded`), the context doesn't overflow because: diff --git a/.cursor/rules/sdk/docs/request-lifecycle-system.mdc b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc index b1c86ed46a..467c21aaa3 100644 --- a/.cursor/rules/sdk/docs/request-lifecycle-system.mdc +++ b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc @@ -35,9 +35,9 @@ The pre-0.11.0 wire contract was `cancel({ operation: "inference", modelId })` 2. A `Map` of saved-message counts (`kv-cache-state.ts`). 3. The on-disk `.bin` files written by the addon. -Each cancel/error branch had to: `unlink` the file, `clearCacheRegistry({ cacheKey, modelId })`, `cachedMessageCounts.delete(...)`. The three branches duplicated this trio; any one of them could drift from the others on a partial failure. Search for `clearCacheRegistry` in `completion-stream.ts` to see the duplication. +Each cancel/error branch had to: `unlink` the file, `clearCacheRegistry({ cacheKey, modelId })`, `cachedMessageCounts.delete(...)`. The three branches duplicated this trio; any one of them could drift from the others on a partial failure. -`DisposableScope` collapses this into one place (commit-on-success, rollback through `Symbol.asyncDispose` on anything else). **M2** introduces `KvCacheSession` as the single owner of the three layers and replaces the duplicated cleanup blocks. +**M2 (shipped)** introduces `KvCacheSession` (`server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts`) as the single owner of all three layers. The completion handler now calls `session.beginTurn(...)`, registers `scope.defer(() => session.rollback(turn))` once, and calls `session.commitTurn(...)` on the happy path (which suppresses the deferred rollback). The three duplicated cleanup blocks are gone; the maps that backed `clearCacheRegistry` and `cachedMessageCounts` are now private to the session module. Cross-model administrative cleanups go through the module-level `deleteKvCacheState(...)`, which `handleDeleteCache` delegates to. ### 3. Cancellation tracked via side counters @@ -69,7 +69,8 @@ Concretely, Vercel's AI SDK (`streamText`) is a public-codebase example of the s | Manual cleanup in `if (signal.aborted) { ... }` | Duplicated rollback on cancel and error paths | `ctx.scope.defer(...)` or `await using` rollback | | Side counter (`cancelCounter++`) for cancel tracking | Truth drifts from `signal.aborted` | Read `signal.aborted` synchronously when needed | | Passing `AbortController` between handlers | Multiple owners, no single revoke | Handlers receive `ctx.signal` from `registry.begin(...)` | -| `throw new Error("cancelled")` | Loses partial state across RPC | Structured error (`InferenceCancelledError` in M2) | +| `throw new Error("cancelled")` | Loses partial state across RPC | `InferenceCancelledError` from `@/utils/errors-server` (re-exported from `@qvac/sdk`); carries `requestId` + `partial: { text, toolCalls, stats }` and is constructed client-side from the `stopReason: "cancelled"` event | +| Reaching into `cachedMessageCounts` / `clearCacheRegistry` / `fsPromises.unlink` together | Drift across the three KV-cache layers | `KvCacheSession` (`session.beginTurn` + `scope.defer(session.rollback)` + `session.commitTurn`) — see `docs/kv-cache-system.mdc` | `request-lifecycle-primitives.mdc` has the worked code examples. @@ -77,14 +78,14 @@ Concretely, Vercel's AI SDK (`streamText`) is a public-codebase example of the s The full milestone breakdown lives in `tasks/release-0.11.0-planning/pitch-2-tasks.md` (workspace-local; not committed to the repo). Headline: -| Milestone | Wire contract | Handlers migrated | -|-----------|-------------------------------------------------------------------------------|------------------------------------------------------------------------------------| -| **M1** | `cancel({ requestId })` lands; old `cancel({ modelId })` still works | llama.cpp completion | -| **M2** | Typed cancel outcomes; `KvCacheSession` replaces three-layer cancel bookkeeping; closes the same-tick cancel-before-begin race | (M2 introduces `InferenceCancelledError`; no new handlers) | -| **M3a** | Migrate embeddings handler | embeddings | -| **M3b** | Migrate audio/text handlers | transcribe, translate | -| **M3c** | Migrate decoder / OCR / TTS | diffusion, ocr, tts | -| **M3d** | Migrate downloadAsset / loadModel | downloadAsset, loadModel | +| Milestone | Wire contract | Handlers migrated | +|-------------|-------------------------------------------------------------------------------|------------------------------------------------------------------------------------| +| **M1** ✅ | `cancel({ requestId })` lands; old `cancel({ modelId })` still works | llama.cpp completion | +| **M2** ✅ | `stopReason: "cancelled"` on `completionDone`; `InferenceCancelledError` (52419) rejects `CompletionRun` promise-aggregates with a `partial` payload while the `events` stream ends normally; `KvCacheSession` consolidates the three KV-cache layers (and `handleDeleteCache` now delegates to `deleteKvCacheState`); Stop-button race closed via a bounded `cancelled-before-begin` map in the registry | (M2 added `InferenceCancelledError`; no new handlers) | +| **M3a** | Migrate embeddings handler | embeddings | +| **M3b** | Migrate audio/text handlers | transcribe, translate | +| **M3c** | Migrate decoder / OCR / TTS | diffusion, ocr, tts | +| **M3d** | Migrate downloadAsset / loadModel | downloadAsset, loadModel | Until a handler is registry-migrated, the broad-cancel path (`cancel({ operation: , modelId })`) falls back to `addon.cancel()` directly — see the fallback in `server/bare/ops/cancel.ts`. The wire contract for non-migrated kinds is therefore unchanged: callers continue to use `cancel({ operation: , modelId })` exactly as before, and behavior is preserved through the migration. @@ -137,30 +138,43 @@ Extend the `CancelTarget` discriminated union in `request-registry.ts`. Today su The addon (`@qvac/llm-llamacpp` and friends) exposes a synchronous `cancel(jobId?)` that signals the C++ side to stop decoding ASAP. The SDK wires `signal.addEventListener("abort", onAbort, { once: true })` so a registry cancel fires `addon.cancel()` at the binding leaf — but the SDK's source of truth for "was this cancelled" is still `signal.aborted`, not addon state. This keeps the SDK decoupled from per-addon cancel semantics; an addon without a `cancel()` method still gets best-effort cancellation through the SDK's signal-propagated termination of the streaming loop. -### Known gap: same-tick "cancel-before-begin" race +### Closed in M2: same-tick "cancel-before-begin" race (Stop-button race) -If a client cancels via `cancel({ requestId })` before the server's `registry.begin({ requestId })` has run for that id, the cancel finds zero matches and returns. The later `begin(...)` opens a fresh context that the cancel didn't reach. M1 documents this with a tripwire unit test (`registry: same-tick cancel-before-begin returns 0 and does not retroactively abort the later begin()`); M2 closes the race with a bounded "cancelled-before-begin" set the registry consults from inside `begin(...)`. See `pitch-2-tasks.md` (M2 scope). +**Pre-M2 (M1 contract, removed):** if a client cancelled via `cancel({ requestId })` before the server's `registry.begin({ requestId })` had run for that id, the cancel found zero matches, returned, and the later `begin(...)` opened a fresh context the cancel didn't reach. A user's "Stop" button click could silently no-op when it raced the round-trip of the original `completion(...)` request. + +**M2 contract (Option A, chosen):** the registry tracks `requestId`s whose `cancel(...)` arrived before a matching `begin(...)`. The next `begin(...)` for one of those ids aborts the new controller before returning, with the recorded reason forwarded to `signal.reason`. The context is observable in state `"cancelling"` from the outset — no momentary `"running"` snapshot with an already-aborted signal. + +Bound: at most `CANCEL_BEFORE_BEGIN_MAX_ENTRIES = 128` entries, evicted FIFO when full, with a `CANCEL_BEFORE_BEGIN_TTL_MS = 30_000` time-to-live so a stale id never decides a later, unrelated `begin(...)` even if the size cap isn't reached. Both constants are exported via `__requestRegistryTestHooks` for test pinning. A malicious client cannot grow the map unbounded. + +Wire surface: **unchanged.** `cancel({ requestId })` still returns 0 when there's no in-flight match (the truth on the wire — no request was *matched and aborted by this call*); the effective cancel lands when the begin arrives. The wire contract for `CancelResponse` did not gain a `matched: boolean` field — Option B was considered and rejected (see `pitch-3-decisions.md`, D6). + +Test coverage: `same-tick cancel-before-begin retroactively aborts the later begin() (M2 Stop-button race close)` and `bounded cancel-before-begin set does not grow past its cap (TTL + size eviction)` in `test/unit/runtime/request-registry.test.ts`. ## Implementation Files -| File | Purpose | -|-------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------| -| `server/bare/runtime/disposable-scope.ts` | `DisposableScope` factory + module-load `Symbol.asyncDispose` guard | -| `server/bare/runtime/request-context.ts` | `RequestContext`, `RequestKind`, `RequestState` types | -| `server/bare/runtime/request-registry.ts` | `createRequestRegistry()`, `ManagedRequestContext`, `RequestOutcome`, cancel/begin/end/list logic | -| `server/bare/runtime/request-registry-singleton.ts` | `getRequestRegistry()` worker-scoped accessor | -| `server/bare/runtime/request-id.ts` | UUID generation helper for caller-provided ids | -| `server/bare/runtime/index.ts` | Public re-exports — handlers import from `@/server/bare/runtime` | -| `server/bare/ops/cancel.ts` | Broad-cancel op: registry-routed with addon fallback for non-migrated handlers | -| `server/rpc/handlers/cancelHandler.ts` | RPC entry point: dispatches by `operation` (inference / embeddings / request / downloadAsset / rag) | -| `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts`| First handler migrated — reference implementation of the canonical shape | -| `test/unit/runtime/disposable-scope.test.ts` | Scope contract: LIFO, idempotency, error aggregation, late-defer | -| `test/unit/runtime/request-registry.test.ts` | Registry contract: begin/cancel/end, parent-signal composition, listener detach, cancel-before-begin | -| `schemas/cancel.ts` | `CancelRequest` discriminated union + `cancelByRequestIdSugarSchema` for `sdk.cancel({ requestId })` | -| `utils/errors-server.ts` | `RequestIdConflictError` (52417), `AsyncDisposeUnavailableError` (53503) | +| File | Purpose | +|----------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------| +| `server/bare/runtime/disposable-scope.ts` | `DisposableScope` factory + module-load `Symbol.asyncDispose` guard | +| `server/bare/runtime/request-context.ts` | `RequestContext`, `RequestKind`, `RequestState` types | +| `server/bare/runtime/request-registry.ts` | `createRequestRegistry()`, `ManagedRequestContext`, `RequestOutcome`, cancel/begin/end/list logic, bounded `cancelled-before-begin` race-close map (M2) | +| `server/bare/runtime/request-registry-singleton.ts` | `getRequestRegistry()` worker-scoped accessor | +| `server/bare/runtime/request-id.ts` | UUID generation helper for caller-provided ids | +| `server/bare/runtime/index.ts` | Public re-exports — handlers import from `@/server/bare/runtime` | +| `server/bare/ops/cancel.ts` | Broad-cancel op: registry-routed with addon fallback for non-migrated handlers | +| `server/rpc/handlers/cancelHandler.ts` | RPC entry point: dispatches by `operation` (inference / embeddings / request / downloadAsset / rag) | +| `server/rpc/handlers/delete-cache.ts` | Delegates to `deleteKvCacheState(...)` — zero direct references to the three KV-cache layers (M2) | +| `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` | First handler migrated — reference implementation of the canonical shape; uses `KvCacheSession` (M2) | +| `server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts` | `KvCacheSession` factory + `deleteKvCacheState`: single owner of the three KV-cache layers (M2) | +| `client/api/completion-stream.ts` | Client-side construction of `InferenceCancelledError` on `stopReason: "cancelled"` (M2) | +| `schemas/completion-event.ts` | `stopReason` enum extended with `"cancelled"` (M2) | +| `test/unit/runtime/disposable-scope.test.ts` | Scope contract: LIFO, idempotency, error aggregation, late-defer | +| `test/unit/runtime/request-registry.test.ts` | Registry contract: begin/cancel/end, parent-signal composition, listener detach, M2 Stop-button race close, bounded-set invariants | +| `test/unit/runtime/kv-cache-session.test.ts` | `KvCacheSession` contract: begin/commit/rollback semantics, double-rollback idempotency, delete-by-scope | +| `schemas/cancel.ts` | `CancelRequest` discriminated union + `cancelByRequestIdSugarSchema` for `sdk.cancel({ requestId })` | +| `utils/errors-server.ts` | `RequestIdConflictError` (52417), `InferenceCancelledError` (52419), `AsyncDisposeUnavailableError` (53503) | ## See Also - [`request-lifecycle-primitives.mdc`](../request-lifecycle-primitives.mdc) — canonical handler shape + anti-patterns (this rule fires automatically when editing server-side handlers). - [`error-handling.mdc`](../error-handling.mdc) — structured-error placement and the client/server error separation. -- [`docs/kv-cache-system.mdc`](./kv-cache-system.mdc) — the bookkeeping layers that `KvCacheSession` (M2) will consolidate. +- [`docs/kv-cache-system.mdc`](./kv-cache-system.mdc) — `KvCacheSession` ownership of the three KV-cache bookkeeping layers and its `beginTurn` / `commitTurn` / `rollback` / `deleteKvCacheState` API. diff --git a/.cursor/rules/sdk/error-handling.mdc b/.cursor/rules/sdk/error-handling.mdc index 8083560cf9..3d0bc96e83 100644 --- a/.cursor/rules/sdk/error-handling.mdc +++ b/.cursor/rules/sdk/error-handling.mdc @@ -177,6 +177,9 @@ Located in `@/utils/errors-server` - `AttachmentNotFoundError` - Attachment not found - `CancelFailedError` - Cancel failed - `TextToSpeechFailedError` - TTS failed +- `RequestIdConflictError` (52417) - `registry.begin(...)` called with a `requestId` already present +- `RequestNotFoundError` (52418) - registry lookup miss (no in-flight request for the given id) +- `InferenceCancelledError` (52419) - cancelled inference run; carries `requestId` + `partial: { text?, toolCalls?, stats? }`. Constructed client-side on `stopReason: "cancelled"` (event stream ends normally; promise-aggregates reject with this). Re-exported from `@qvac/sdk` for `instanceof` checks. #### RAG Operations (52,800-52,999) - `RAGSaveFailedError` - Save failed diff --git a/.cursor/rules/sdk/request-lifecycle-primitives.mdc b/.cursor/rules/sdk/request-lifecycle-primitives.mdc index 145eef0d7d..4a0f5a6eef 100644 --- a/.cursor/rules/sdk/request-lifecycle-primitives.mdc +++ b/.cursor/rules/sdk/request-lifecycle-primitives.mdc @@ -16,7 +16,7 @@ Server-side long-running operations (`completion`, `embeddings`, `transcribe`, ` - **`RequestContext`** — per-request handle bundling `requestId`, `kind`, `modelId`, `signal`, `scope`, `state`. - **`RequestRegistry`** — module-scoped registry that mints contexts via `begin(...)` and routes `cancel(...)` by `requestId` or `modelId`. -Migration is rolling out across milestones (M1 ships completion; M2 adds typed cancel outcomes + `KvCacheSession`; M3 migrates embeddings / transcribe / translate / loadModel / downloadAsset). The contract below applies to every newly-migrated handler. +Migration is rolling out across milestones (M1 shipped completion; M2 shipped typed cancel outcomes + `KvCacheSession` + the Stop-button race close; M3 migrates embeddings / transcribe / translate / loadModel / downloadAsset). The contract below applies to every newly-migrated handler. ## Canonical Handler Shape @@ -123,7 +123,18 @@ Controllers are owned by the registry. Handlers receive `ctx.signal` from `regis if (signal.aborted) throw new Error("cancelled"); ``` -Use a structured error from `@/utils/errors-server` — see `error-handling.mdc`. M2 will add a dedicated `InferenceCancelledError` (cancelled promise-aggregates carry partial state across the RPC boundary). Until then, the `events` stream simply ends and existing `CompletionFailedError` carries cancel-as-failure cases. +Use `InferenceCancelledError` from `@/utils/errors-server` (re-exported from `@qvac/sdk` so clients can `instanceof`-check it). The structured error carries `requestId` + a `partial: { text, toolCalls, stats }` payload so the cancelled promise-aggregates (`CompletionRun.final`, `.text`, `.toolCalls`, `.stats`) hand the caller whatever was accumulated before the cancel. The `events` stream still ends *normally* with `stopReason: "cancelled"` — stream-first consumers see cancel as a typed event, aggregate-first consumers see it as a typed rejection. See `error-handling.mdc`. + +### Reaching into three-layer KV-cache state by hand + +```typescript +// WRONG — drift-prone three-layer cleanup duplicated on every error branch +try { await fsPromises.unlink(cachePathToUse); } catch (...) {} +clearCacheRegistry({ cacheKey, modelId }); +cachedMessageCounts.delete(cachePathToUse); +``` + +The on-disk `.bin` file, the `initializedCaches` set, and the `cachedMessageCounts` map are private to `KvCacheSession` (`server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts`). Use `session.beginTurn(...)` + `scope.defer(() => session.rollback(turn))` + `session.commitTurn(...)` instead — one cleanup point that touches all three layers atomically. Cross-model administrative cleanups go through the module-level `deleteKvCacheState(...)`, which `handleDeleteCache` delegates to. See `docs/kv-cache-system.mdc`. ## Cancel API Surface @@ -213,21 +224,23 @@ Cleanups run in LIFO order on dispose. If multiple cleanups throw, an `Aggregate ### Error Codes -Two errors are owned by this stack today (both in `@/utils/errors-server`): +Three errors are owned by this stack today (all in `@/utils/errors-server`): | Code | Class | When | |-------|--------------------------------|----------------------------------------------------------------------------| | 52417 | `RequestIdConflictError` | `registry.begin(...)` called with a `requestId` already present. | +| 52419 | `InferenceCancelledError` | A cancelled inference run's `CompletionRun` promise-aggregates (`final` / `text` / `toolCalls` / `stats`) reject with this carrying the partial state. The `events` stream itself ends normally with `stopReason: "cancelled"`. | | 53503 | `AsyncDisposeUnavailableError` | Module-load guard: host runtime doesn't expose `Symbol.asyncDispose`. | -M2 adds `InferenceCancelledError` for the typed cancel-outcome contract — promise-aggregates (`final`, `text`, `toolCalls`, `stats`) reject with it carrying the partial state, while the `events` stream ends normally with `stopReason: "cancelled"`. Until M2 ships, handlers fall back on `CompletionFailedError` / existing per-op errors. +`InferenceCancelledError` carries `requestId` plus a `partial: { text?, toolCalls?, stats? }` payload. Construction is **client-side** (`packages/sdk/client/api/completion-stream.ts`): when the event stream ends with `stopReason: "cancelled"`, the client builds the error from its locally-aggregated partial state and rejects the aggregates. The class is server-defined for reusability and re-exported from `@qvac/sdk` so client code can `instanceof`-check it. See `error-handling.mdc`. ## Verification -Two unit-test files pin the contract — read them as canonical examples: +Three unit-test files pin the contract — read them as canonical examples: - `packages/sdk/test/unit/runtime/disposable-scope.test.ts` — LIFO cleanup, idempotency, error aggregation, late `defer` behavior. -- `packages/sdk/test/unit/runtime/request-registry.test.ts` — `begin`/`cancel`/`end` flow, `requestId` conflict detection, parent-signal composition + listener detach discipline, and the same-tick "cancel-before-begin" tripwire. +- `packages/sdk/test/unit/runtime/request-registry.test.ts` — `begin`/`cancel`/`end` flow, `requestId` conflict detection, parent-signal composition + listener detach discipline, the M2 Stop-button race close (`same-tick cancel-before-begin retroactively aborts the later begin()`), and the bounded `cancelled-before-begin` map invariants. +- `packages/sdk/test/unit/runtime/kv-cache-session.test.ts` — `beginTurn` / `commitTurn` / `rollback` semantics in isolation: commit suppresses the deferred rollback, rollback after commit is a no-op, double-rollback is idempotent, and rollback wipes all three layers (file / `initializedCaches` / `cachedMessageCounts`) regardless of who fails along the way. When adding new behavior to the primitives, add the test before the implementation and pair it with the corresponding doc update in this rule. @@ -265,5 +278,5 @@ Already covered by the `CancelTarget` discriminated union. If you genuinely need ## Related Rules - `error-handling.mdc` — `InferenceCancelledError` / `AsyncDisposeUnavailableError` / `RequestIdConflictError` placement and propagation across the RPC boundary. -- `docs/kv-cache-system.mdc` — KV-cache bookkeeping coupled to cancellation (`shouldRecordSavedCount`, cancel-branch rollback). `KvCacheSession` is M2 — until then handlers replicate the three-layer cleanup pattern around `signal.aborted`. -- `docs/request-lifecycle-system.mdc` — full reference (design rationale, migration roadmap, FAQ). +- `docs/kv-cache-system.mdc` — `KvCacheSession` ownership of the three KV-cache bookkeeping layers (on-disk `.bin`, `initializedCaches`, `cachedMessageCounts`), `beginTurn` / `commitTurn` / `rollback` semantics, and the cross-model `deleteKvCacheState` administrative API. +- `docs/request-lifecycle-system.mdc` — full reference (design rationale, migration roadmap, FAQ, M2 Stop-button race close rationale). diff --git a/packages/sdk/client/api/completion-stream.ts b/packages/sdk/client/api/completion-stream.ts index 0fe0ef0c1a..da2d6895d7 100644 --- a/packages/sdk/client/api/completion-stream.ts +++ b/packages/sdk/client/api/completion-stream.ts @@ -14,7 +14,10 @@ import { type ToolCallWithCall, type RPCOptions, } from "@/schemas"; -import { CompletionFailedError } from "@/utils/errors-server"; +import { + CompletionFailedError, + InferenceCancelledError, +} from "@/utils/errors-server"; import { getMcpToolsWithHandlers } from "@/utils/mcp-adapter"; import { validateTools, @@ -265,7 +268,7 @@ export function completion(params: CompletionParams): CompletionRun { notifyWaiters(); if (streamResponse.done) { - const { final, error } = buildFinalFromEvents( + const { final, error, cancelled } = buildFinalFromEvents( allEvents, allHandlers, ); @@ -274,6 +277,25 @@ export function completion(params: CompletionParams): CompletionRun { finalRejecter(err); statsRejecter(err); toolCallsRejecter(err); + } else if (cancelled) { + // The wire stream ended with `stopReason: "cancelled"` — the + // run was aborted mid-flight. Cancellation contract: `events` + // ends normally (consumers iterating `run.events` see the + // cancelled `completionDone` and exit naturally), and the + // promise-aggregates reject with `InferenceCancelledError` + // carrying whatever the aggregator accumulated up to the + // cancel point. Consumers do `instanceof + // InferenceCancelledError` and read `.partial.text` / + // `.partial.toolCalls` / `.partial.stats` if they want the + // partial output. + const err = new InferenceCancelledError(requestId, { + text: final.contentText, + toolCalls: final.toolCalls, + ...(final.stats && { stats: final.stats }), + }); + finalRejecter(err); + statsRejecter(err); + toolCallsRejecter(err); } else { finalResolver(final); statsResolver(final.stats); diff --git a/packages/sdk/examples/cancel-by-request-id.ts b/packages/sdk/examples/cancel-by-request-id.ts index b7c3deb662..29fefe13f3 100644 --- a/packages/sdk/examples/cancel-by-request-id.ts +++ b/packages/sdk/examples/cancel-by-request-id.ts @@ -10,14 +10,29 @@ * * 1. `cancel({ requestId })` — targeted cancel, the primary path * introduced in 0.11.0. The `requestId` is available synchronously - * on the `CompletionRun`, but the cancel only takes effect once the - * server has begun the request; a cancel issued in the same tick - * as `completion()` may arrive at the worker before the request is - * registered and is logged as a no-match. + * on the `CompletionRun`. Same-tick cancels (issued before the + * server has registered the request) are recorded and applied + * retroactively when `begin(...)` arrives, so they aren't silently + * dropped. * 2. `cancel({ operation: "inference", modelId })` — broad cancel * (escape hatch, kept indefinitely). Cancels every inference running * on the model. Useful for unload, app shutdown, admin sweeps when * the caller doesn't have a `requestId` to hand. + * + * --- Cancel outcomes (0.11.0+) --- + * + * A cancel surfaces on two channels: + * + * - `run.events` ends *normally* with a `completionDone` event carrying + * `stopReason: "cancelled"`. The loop exits cleanly, no thrown error. + * - `run.text` / `run.final` / `run.stats` / `run.toolCalls` reject + * with `InferenceCancelledError(requestId, partial)`, where `partial` + * holds whatever the model produced before the cancel landed + * (accumulated `text`, completed `toolCalls`, last-known `stats`). + * + * Pick the channel that matches how you consume the run: event-loop + * consumers don't need to catch anything; promise-aggregate consumers + * pattern-match on `instanceof InferenceCancelledError`. */ import { @@ -25,6 +40,7 @@ import { completion, loadModel, unloadModel, + InferenceCancelledError, QWEN3_600M_INST_Q4, } from "@qvac/sdk"; @@ -55,14 +71,45 @@ try { console.log("(cancel issued)"); }, 250); + // Channel 1: the events stream ends normally on cancel. The + // `completionDone` event's `stopReason` tells you why the loop is + // about to exit ("eos" | "length" | "cancelled" | "error" | ...). let tokenCount = 0; + let endReason: string | undefined; for await (const event of run.events) { if (event.type === "contentDelta") { tokenCount++; process.stdout.write(event.text); + } else if (event.type === "completionDone") { + endReason = event.stopReason; + } + } + console.log( + `\n\nstreamed ${tokenCount} content deltas, stopReason=${endReason}.`, + ); + + // Channel 2: promise-aggregates reject with InferenceCancelledError + // on cancel. The accumulated state up to the cancel point is preserved + // on `err.partial`. + try { + const text = await run.text; + console.log(`completed normally (${text.length} chars).`); + } catch (err) { + if (err instanceof InferenceCancelledError) { + console.log(`run.text rejected: cancelled (requestId=${err.requestId})`); + console.log(`partial text length: ${(err.partial.text ?? "").length}`); + if (err.partial.stats?.tokensPerSecond !== undefined) { + console.log( + `partial stats: ${err.partial.stats.tokensPerSecond.toFixed(1)} tok/s`, + ); + } + if (err.partial.toolCalls && err.partial.toolCalls.length > 0) { + console.log(`partial tool calls: ${err.partial.toolCalls.length}`); + } + } else { + throw err; } } - console.log(`\n\nstreamed ${tokenCount} content deltas before cancel.`); await unloadModel({ modelId }); process.exit(0); diff --git a/packages/sdk/index.ts b/packages/sdk/index.ts index 6818c1c8a8..9add15039e 100644 --- a/packages/sdk/index.ts +++ b/packages/sdk/index.ts @@ -149,6 +149,14 @@ export * from "./models/registry"; export { SUPPORTED_AUDIO_FORMATS } from "./constants/audio"; +// Error classes that clients need for `instanceof` checks on rejected +// promises. `InferenceCancelledError` rides the standard `QvacError` +// envelope, but consumers reach for it through `instanceof` on +// `await run.final` / `run.text` / `run.toolCalls` / `run.stats` +// rejections. +export { InferenceCancelledError } from "./utils/errors-server"; +export type { InferenceCancelledPartial } from "./utils/errors-server"; + // Logging exports export { getLogger, SDK_LOG_ID } from "./logging"; export type { Logger, LogTransport, LoggerOptions } from "./logging"; diff --git a/packages/sdk/schemas/completion-event.ts b/packages/sdk/schemas/completion-event.ts index 0e34d84387..2bd82c52f7 100644 --- a/packages/sdk/schemas/completion-event.ts +++ b/packages/sdk/schemas/completion-event.ts @@ -63,7 +63,17 @@ const rawOutputSchema = z.object({ fullText: z.string(), }); -const stopReasonEnum = z.enum(["eos", "length", "stopSequence"]); +// `"cancelled"` is a clean termination, not a mid-stream failure: the +// stream ended on purpose because the request was aborted, just not at +// EOS. It rides the success-done path (alongside "eos" / "length" / +// "stopSequence") so consumers iterating `events` see the stream end +// naturally. The companion error path (`errorDoneSchema` below) keeps +// `stopReason: "error"` for actual mid-stream failures where the +// partial state is unsafe to use. The promise-aggregates on the +// client-side `CompletionRun` (`final` / `text` / `toolCalls` / `stats`) +// reject with `InferenceCancelledError` carrying the partial state — see +// `client/api/completion-stream.ts` for the rejection plumbing. +const stopReasonEnum = z.enum(["eos", "length", "stopSequence", "cancelled"]); const successDoneSchema = z .object({ diff --git a/packages/sdk/schemas/sdk-errors-server.ts b/packages/sdk/schemas/sdk-errors-server.ts index bc422b51d0..6549b3f32c 100644 --- a/packages/sdk/schemas/sdk-errors-server.ts +++ b/packages/sdk/schemas/sdk-errors-server.ts @@ -40,6 +40,7 @@ export const SDK_SERVER_ERROR_CODES = { MODEL_OPERATION_NOT_SUPPORTED: 52416, REQUEST_ID_CONFLICT: 52417, REQUEST_NOT_FOUND: 52418, + INFERENCE_CANCELLED: 52419, // RAG Operations (52,800-52,999) RAG_SAVE_FAILED: 52800, @@ -303,6 +304,11 @@ const serverErrorDefinitions: ErrorCodesMap = { message: (requestId: string) => `No in-flight request with id "${requestId}"`, }, + [SDK_SERVER_ERROR_CODES.INFERENCE_CANCELLED]: { + name: "INFERENCE_CANCELLED", + message: (requestId: string) => + `Inference request "${requestId}" was cancelled before it could complete`, + }, // RAG Operations (52,800-52,999) [SDK_SERVER_ERROR_CODES.RAG_SAVE_FAILED]: { diff --git a/packages/sdk/server/bare/ops/kv-cache-utils.ts b/packages/sdk/server/bare/ops/kv-cache-utils.ts index b0ba2d4312..46d0feb73f 100644 --- a/packages/sdk/server/bare/ops/kv-cache-utils.ts +++ b/packages/sdk/server/bare/ops/kv-cache-utils.ts @@ -11,55 +11,10 @@ import { getServerLogger } from "@/logging"; const logger = getServerLogger(); -// In-memory registry tracks caches initialized this session (addon defers disk writes) -const initializedCaches = new Set(); - -function getCacheRegistryKey( - modelId: string, - configHash: string, - cacheKey: string, -): string { - return `${modelId}:${configHash}:${cacheKey}`; -} - -export function markCacheInitialized( - modelId: string, - configHash: string, - cacheKey: string, -): void { - initializedCaches.add(getCacheRegistryKey(modelId, configHash, cacheKey)); -} - -export function isCacheInitialized( - modelId: string, - configHash: string, - cacheKey: string, -): boolean { - return initializedCaches.has( - getCacheRegistryKey(modelId, configHash, cacheKey), - ); -} - -export function clearCacheRegistry(scope?: { - cacheKey?: string | undefined; - modelId?: string | undefined; -}): void { - if (!scope || (scope.cacheKey === undefined && scope.modelId === undefined)) { - initializedCaches.clear(); - return; - } - // key format: "modelId:configHash:cacheKey" - for (const key of initializedCaches) { - const firstSep = key.indexOf(":"); - const secondSep = key.indexOf(":", firstSep + 1); - if (firstSep === -1 || secondSep === -1) continue; - const modelId = key.slice(0, firstSep); - const cacheKey = key.slice(secondSep + 1); - if (scope.cacheKey !== undefined && cacheKey !== scope.cacheKey) continue; - if (scope.modelId !== undefined && modelId !== scope.modelId) continue; - initializedCaches.delete(key); - } -} +// In-memory KV-cache state lives in `KvCacheSession` (the single +// mutation point for all three KV-cache bookkeeping layers). This +// module keeps only the pure path / hash utilities that don't touch +// in-memory state. export function extractSystemPrompt(messages: CacheMessage[]): string | null { const systemMessage = messages.find((msg) => msg.role === "system"); @@ -169,26 +124,10 @@ export async function renameCacheFile( } } -export async function customCacheExists( - modelId: string, - configHash: string, - cacheKey: string, -): Promise { - // Check in-memory registry first (addon defers disk writes) - if (isCacheInitialized(modelId, configHash, cacheKey)) { - return true; - } - - // Then check file system (for caches from previous runs) - const cachePath = await getCacheFilePath(modelId, configHash, cacheKey); - try { - await fsPromises.access(cachePath); - markCacheInitialized(modelId, configHash, cacheKey); - return true; - } catch { - return false; - } -} +// Cache-existence probing (in-memory registry first, fall back to +// `fs.access`) lives in `KvCacheSession.beginTurn(...)`. Keeping the +// in-memory `initializedCaches` set private to the session module +// avoids drift between the two layers. export async function deleteCache( options: { all: true } | { kvCacheKey: string; modelId?: string }, diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts index a1a142ccda..d50c251a76 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/completion-stream.ts @@ -14,31 +14,25 @@ import { logCacheDisabled, logCacheInit, logCacheSave, - logCacheSaveError, - logCacheStatus, logMessagesToAddon, } from "@/server/bare/plugins/llamacpp-completion/ops/cache-logger"; import { - clearCacheRegistry, - customCacheExists, extractSystemPrompt, - findMatchingCache, - generateConfigHash, - getCacheFilePath, getCurrentCacheInfo, - markCacheInitialized, - renameCacheFile, } from "@/server/bare/ops/kv-cache-utils"; import { getModel, getModelConfig, type AnyModel, } from "@/server/bare/registry/model-registry"; +import { decideCachedHistorySlice } from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-state"; import { - cachedMessageCounts, - clearCachedMessageCounts as clearCachedMessageCountsFromState, - decideCachedHistorySlice, -} from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-state"; + createKvCacheSession, + generateConfigHash, + type KvCacheSession, + type TurnHandle, +} from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-session"; +import type { DisposableScope } from "@/server/bare/runtime/disposable-scope"; import { appendToolsToHistory, detectToolDialect, @@ -55,8 +49,7 @@ import { hasDefinedValues, } from "@/profiling/model-execution"; import type { LlmStats } from "@/server/bare/types/addon-responses"; -import fs, { promises as fsPromises } from "bare-fs"; -import path from "bare-path"; +import fs from "bare-fs"; const logger = getServerLogger(); @@ -108,14 +101,6 @@ type CompletionRunOptions = Pick< generationParams?: CompletionGenerationParams; }; -// Re-export so existing callers keep their import surface intact. The pure -// state module has no `bare-*` imports, so we inject the platform path -// separator here — without this, prefix-based clears would miss entries -// under directory keys on Windows. -export function clearCachedMessageCounts(prefix?: string): void { - clearCachedMessageCountsFromState(prefix, path.sep); -} - /** * Decide whether a completed turn earned the right to record its kv-cache * boundary. A `savedCount` is only safe to write when the turn ran to @@ -126,40 +111,16 @@ export function clearCachedMessageCounts(prefix?: string): void { * * Replaces the pre-0.11.0 `shouldRecordSavedCount(wasCancelled, ...)` with * a signal-driven check that reads directly from the request's - * `AbortSignal`. The local helper keeps both call sites in + * `AbortSignal`. The local helper keeps the call sites in * `completion-stream.ts` honest without importing the registry every time. */ -function shouldRecordSavedCount( +function shouldCommitTurn( signal: AbortSignal, producedTokens: boolean, ): boolean { return !signal.aborted && producedTokens; } -// Verify the addon actually persisted the cache file before recording its -// message count. The addon currently swallows write errors silently, so a -// missing file means the next turn must resend the full history rather than -// slicing against a stale `savedCount`. -// -// TODO: once the addon surfaces save failures (e.g. throws -// `UnableToSaveSessionFile` when `llama_state_save_file` returns false), -// drop the `access()` probe and wrap the `model.run()` call in a real -// try/catch that forwards the error to `logCacheSaveError`. -async function recordCacheSaveCount( - cachePath: string, - messageCount: number, -): Promise { - try { - await fsPromises.access(cachePath); - cachedMessageCounts.set(cachePath, messageCount); - return true; - } catch (err) { - cachedMessageCounts.delete(cachePath); - logCacheSaveError(cachePath, err); - return false; - } -} - function transformMessage( message: | { @@ -287,7 +248,8 @@ type HistoryMsg = { * consumer pushing both an assistant transcript and a follow-up user * message between completions) all reaches the model. * - Cache hit with a stale/missing `savedCount`: fall back to the full - * non-system history. + * non-system history. The session is told (`dropStaleSavedCount`) so + * the bad boundary doesn't propagate into the next turn. * * Dynamic mode (`tools` argument set): * - The addon anchors the tool block after the last user message and @@ -304,7 +266,8 @@ type HistoryMsg = { * * otherwise: send just the last message + tool block. */ function prepareMessagesForCache( - cachePathToUse: string, + session: KvCacheSession, + turn: TurnHandle, cacheExists: boolean, history: HistoryMsg[], tools?: Tool[], @@ -318,21 +281,23 @@ function prepareMessagesForCache( } if (!dynamic) { - // Static path — slice from the recorded `savedCount` so callers can + // Static path — slice from the turn's `savedCount` so callers can // stage multiple messages between completions. `decideCachedHistorySlice` // also guards against the QVAC-17780 stale-count regression: if the // saved boundary would slice the history down to an empty payload // (e.g. after a cancelled mid-decode), it falls back to the full // non-system history and signals the caller to drop the bad entry. - const savedCount = cachedMessageCounts.get(cachePathToUse) ?? 0; + // The session owns the entry; `dropStaleSavedCount` clears it + // without touching the on-disk file (the file is still trustworthy + // — only the boundary count is wrong). const { messages, clearStaleCount } = decideCachedHistorySlice( - savedCount, + turn.savedCount, cacheExists, history, ); if (clearStaleCount) { - cachedMessageCounts.delete(cachePathToUse); + session.dropStaleSavedCount(turn); } return transformMessages(messages); @@ -450,11 +415,11 @@ export async function* completion( toolDialect?: ToolDialect; responseFormat?: ResponseFormat; }, - opts: { signal: AbortSignal }, + opts: { signal: AbortSignal; scope: DisposableScope }, ): AsyncGenerator<{ token: string }, CompletionResult, unknown> { const { history, modelId, kvCache, tools, generationParams, responseFormat } = params; - const { signal } = opts; + const { signal, scope } = opts; const modelConfig = getModelConfig(modelId); const toolsEnabled = (modelConfig as { tools?: boolean }).tools === true; @@ -516,254 +481,180 @@ export async function* completion( // the signal is already aborted at register time — but the registry // synchronously aborts a fresh controller when `parentSignal` was // already aborted at `begin(...)`. Without this fall-through, the - // addon would keep decoding until `shouldRecordSavedCount` notices - // post-loop. Re-using `onAbort` here keeps the listener body as the - // single source of truth for "what cancel does." + // addon would keep decoding until the post-loop check notices. + // Re-using `onAbort` here keeps the listener body as the single + // source of truth for "what cancel does." if (signal.aborted) onAbort(); - // Wrap the body so the `abort` listener is detached on every exit path - // (happy completion, thrown error, generator `return()` from upstream). - // `{ once: true }` already removes the listener if the signal fires, so - // the `removeEventListener` here is the cleanup hook for the - // signal-never-fired path — mirrors the registry's own `detachParent` - // discipline and stops relying on GC for cleanup. - try { - if (kvCache) { - const systemPromptFromHistory = extractSystemPrompt(history); - // Dynamic mode lets each turn carry its own tool set, so the cache - // hash must not depend on the tool list — otherwise a tool change - // would force a fresh cache file and defeat the whole optimisation. - const configHash = generateConfigHash( - systemPromptFromHistory, - dynamicTools ? undefined : tools, - ); - - const systemPromptToUse = - systemPromptFromHistory || - (modelConfig as { system_prompt?: string }).system_prompt || - "You are a helpful assistant."; - - let cachePathToUse: string; - - if (typeof kvCache === "string") { - cachePathToUse = await getCacheFilePath(modelId, configHash, kvCache); - let cacheExists = await customCacheExists(modelId, configHash, kvCache); - logCacheStatus(kvCache, cacheExists); - - if (!cacheExists) { - await initSystemPromptCache( - model, - cachePathToUse, - systemPromptToUse, - kvCache, - // Static-mode tools are baked into the system-prompt cache so - // they're shared across the session. Dynamic-mode tools belong - // to a per-turn anchor and must not enter the system cache. - staticTools ? tools : undefined, - ); - markCacheInitialized(modelId, configHash, kvCache); - cacheExists = true; - } - - const messagesToSend = prepareMessagesForCache( - cachePathToUse, - cacheExists, - history, - dynamicTools ? tools : undefined, - ); - logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - - const result = yield* processModelResponse( - model, - messagesToSend, - tools, - mergedGenerationParams, - { cacheKey: cachePathToUse, saveCacheToDisk: true }, - dialect, - ); + // Detach the abort listener on every exit path (happy, throw, generator + // `return()` from upstream). `{ once: true }` already removes the + // listener if the signal fires, so the `removeEventListener` here is + // the cleanup hook for the signal-never-fired path. + scope.defer(() => { + signal.removeEventListener("abort", onAbort); + }); - if (shouldRecordSavedCount(signal, result.producedTokens)) { - // Turn ran to completion and produced content — record the new - // boundary so the next turn can slice its history. - await recordCacheSaveCount(cachePathToUse, history.length + 1); - } else { - // The addon writes the cache file unconditionally on - // `saveCacheToDisk` turns, including cancellations and zero-token - // exits, so what's left on disk holds partial decode state that - // does not correspond to a clean turn boundary. Mirror the - // auto-key handling: drop the file, clear the in-memory init - // flag (otherwise `customCacheExists` would still report true), - // and forget the saved count. Next turn re-primes the system - // prompt cleanly — a one-turn perf hit, but no risk of the - // addon loading the stale KV state. - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { - logger.warn( - `[kv-cache] Failed to remove cache file after cancelled or empty custom-key turn; next turn may load stale KV state. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, - ); - } - clearCacheRegistry({ cacheKey: kvCache, modelId }); - cachedMessageCounts.delete(cachePathToUse); - } - return result; - } else { - // Auto-generate cache key based on conversation history - const cacheMessages: CacheMessage[] = history.map((msg) => ({ - role: msg.role, - content: msg.content, - attachments: msg.attachments ?? undefined, - })); - - const existingCache = await findMatchingCache( - modelId, - configHash, - cacheMessages, - ); - const preResponseCacheInfo = await getCurrentCacheInfo( - modelId, - configHash, - cacheMessages, - ); + if (!kvCache) { + // KV-cache disabled — straight passthrough, no session involvement. + let historyWithTools: Array = history; + if (staticTools && tools) { + historyWithTools = prependToolsToHistory(history, tools); + } else if (dynamicTools && tools) { + historyWithTools = appendToolsToHistory(history, tools); + } - cachePathToUse = - existingCache !== null - ? existingCache.cachePath - : preResponseCacheInfo.cachePath; - - let cacheExists = existingCache !== null; - logCacheStatus("auto", cacheExists); - - if (!cacheExists) { - await initSystemPromptCache( - model, - cachePathToUse, - systemPromptToUse, - "auto", - staticTools ? tools : undefined, - ); - markCacheInitialized( - modelId, - configHash, - preResponseCacheInfo.cacheKey, - ); - cacheExists = true; - } - - const messagesToSend = prepareMessagesForCache( - cachePathToUse, - cacheExists, - history, - dynamicTools ? tools : undefined, - ); - logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - - const result = yield* processModelResponse( - model, - messagesToSend, - tools, - mergedGenerationParams, - { cacheKey: cachePathToUse, saveCacheToDisk: true }, - dialect, - ); + const transformedHistory = transformMessages(historyWithTools); + logCacheDisabled(); + logMessagesToAddon(transformedHistory, "NO_CACHE"); + return yield* processModelResponse( + model, + transformedHistory, + tools, + mergedGenerationParams, + undefined, + dialect, + ); + } - // TODO: support auto-cache for tool-call turns by keying off the - // structured assistant/tool messages callers push into history, - // not result.responseText (which is raw tool-call markup here). - // Until then, remove any cache file the addon wrote so it doesn't - // leak on disk (the next turn would compute a different key and - // never reach it). - if (result.toolCalls.length > 0) { - logger.warn( - `[kv-cache] Auto cache tool-call turn; removing orphaned cache to avoid disk leak. path=${cachePathToUse}`, - ); - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { - logger.warn( - `[kv-cache] Failed to remove orphaned tool-turn cache file; disk leak likely. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, - ); - } - cachedMessageCounts.delete(cachePathToUse); - return result; - } - - // A cancelled or zero-token turn cannot be promoted to a post-response - // cache: the post-response key is derived from `result.responseText`, - // which is empty/partial in those cases, and the on-disk cache the - // addon wrote is not aligned with the current-history hash. Treat it - // like the tool-call branch — drop the cache file and clear the count. - if (!shouldRecordSavedCount(signal, result.producedTokens)) { - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { - logger.warn( - `[kv-cache] Failed to remove cache file after cancelled or empty turn; disk leak possible. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, - ); - } - cachedMessageCounts.delete(cachePathToUse); - return result; - } - - const savedHistory = buildAutoCacheSaveHistory( - cacheMessages, - result.responseText, - ); - const postResponseCacheInfo = await getCurrentCacheInfo( - modelId, - configHash, - savedHistory, - ); + // ---- KV-cache path. The session owns all three bookkeeping layers + // (on-disk `.bin`, `initializedCaches`, `cachedMessageCounts`). The + // handler asks for a turn, registers rollback on the scope, and on + // the happy path calls `commitTurn` which short-circuits the deferred + // rollback. Cancellations / zero-token replies / rename failures all + // unwind through the same `scope.defer` hook. ---- + + const session = createKvCacheSession(modelId); + const systemPromptFromHistory = extractSystemPrompt(history); + // Dynamic mode lets each turn carry its own tool set, so the cache + // hash must not depend on the tool list — otherwise a tool change + // would force a fresh cache file and defeat the whole optimisation. + const configHash = generateConfigHash( + systemPromptFromHistory, + dynamicTools ? undefined : tools, + ); - if ( - !(await renameCacheFile( - cachePathToUse, - postResponseCacheInfo.cachePath, - )) - ) { - logger.warn( - `[kv-cache] Auto cache rename failed; removing stale cache to avoid disk leak. from=${cachePathToUse} to=${postResponseCacheInfo.cachePath}`, - ); - try { - await fsPromises.unlink(cachePathToUse); - } catch (unlinkError) { - logger.warn( - `[kv-cache] Failed to remove stale cache file; disk leak likely. path=${cachePathToUse} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, - ); - } - cachedMessageCounts.delete(cachePathToUse); - return result; - } - - cachedMessageCounts.delete(cachePathToUse); - await recordCacheSaveCount( - postResponseCacheInfo.cachePath, - savedHistory.length, - ); + const systemPromptToUse = + systemPromptFromHistory || + (modelConfig as { system_prompt?: string }).system_prompt || + "You are a helpful assistant."; + + const primeIfMissing = async (cachePath: string) => { + await initSystemPromptCache( + model, + cachePath, + systemPromptToUse, + typeof kvCache === "string" ? kvCache : "auto", + // Static-mode tools are baked into the system-prompt cache so + // they're shared across the session. Dynamic-mode tools belong + // to a per-turn anchor and must not enter the system cache. + staticTools ? tools : undefined, + ); + }; - return result; - } - } else { - let historyWithTools: Array = history; - if (staticTools && tools) { - historyWithTools = prependToolsToHistory(history, tools); - } else if (dynamicTools && tools) { - historyWithTools = appendToolsToHistory(history, tools); - } + let turn: TurnHandle; + if (typeof kvCache === "string") { + turn = await session.beginTurn({ + kind: "custom", + customKey: kvCache, + configHash, + primeIfMissing, + }); + } else { + const cacheMessages: CacheMessage[] = history.map((msg) => ({ + role: msg.role, + content: msg.content, + attachments: msg.attachments ?? undefined, + })); + turn = await session.beginTurn({ + kind: "auto", + configHash, + history: cacheMessages, + primeIfMissing, + }); + } + + // Single cleanup hook for every non-success exit path. `commitTurn` + // flips the turn's internal `committed` flag so this becomes a no-op + // on the happy path. Scope unwinding is LIFO — registered after the + // `removeEventListener` defer above so rollback runs before the + // listener detach. + scope.defer(() => session.rollback(turn)); + + // `cacheExists` is implied by `beginTurn` — the session either found + // an existing cache or just primed one. Pass `true` to the message + // selector so the slicing branches engage. + const messagesToSend = prepareMessagesForCache( + session, + turn, + /* cacheExists */ true, + history, + dynamicTools ? tools : undefined, + ); + logMessagesToAddon(messagesToSend, "PROMPT_SEND"); - const transformedHistory = transformMessages(historyWithTools); - logCacheDisabled(); - logMessagesToAddon(transformedHistory, "NO_CACHE"); - return yield* processModelResponse( - model, - transformedHistory, - tools, - mergedGenerationParams, - undefined, - dialect, - ); + const result = yield* processModelResponse( + model, + messagesToSend, + tools, + mergedGenerationParams, + { cacheKey: turn.cachePath, saveCacheToDisk: true }, + dialect, + ); + + if (typeof kvCache === "string") { + // Custom-key path: the addon wrote the new cache state inline at + // the same path. Either commit (records the boundary, suppresses + // rollback) or fall through to the deferred rollback. + if (shouldCommitTurn(signal, result.producedTokens)) { + await session.commitTurn(turn, { + kind: "static", + messageCount: history.length + 1, + }); } - } finally { - signal.removeEventListener("abort", onAbort); + return result; } + + // Auto-cache path. + // + // Tool-call turns: the auto-cache key is derived from + // `result.responseText`, which here is raw tool-call markup rather + // than a clean assistant message. There's no safe post-response key + // to rename to, so we let the deferred rollback drop the file. Once + // the SDK supports auto-cache for structured assistant/tool turns, + // this becomes a normal commit path. + if (result.toolCalls.length > 0) { + logger.warn( + `[kv-cache] Auto cache tool-call turn; rolling back to avoid disk leak. path=${turn.cachePath}`, + ); + return result; + } + + if (!shouldCommitTurn(signal, result.producedTokens)) { + // Cancelled or zero-token turn — the addon wrote the file but its + // contents don't correspond to a clean turn boundary. Let the + // deferred rollback unlink it. + return result; + } + + const savedHistory = buildAutoCacheSaveHistory( + history.map((msg) => ({ + role: msg.role, + content: msg.content, + attachments: msg.attachments ?? undefined, + })), + result.responseText, + ); + const postResponseCacheInfo = await getCurrentCacheInfo( + modelId, + configHash, + savedHistory, + ); + + await session.commitTurn(turn, { + kind: "autoRename", + targetCachePath: postResponseCacheInfo.cachePath, + messageCount: savedHistory.length, + }); + + return result; } diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts new file mode 100644 index 0000000000..802d619ed7 --- /dev/null +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts @@ -0,0 +1,652 @@ +import { promises as fsPromises } from "bare-fs"; +import path from "bare-path"; +import { + findMatchingCache, + generateConfigHash, + getCacheFilePath, + getCurrentCacheInfo, + renameCacheFile, + deleteCache as deleteCacheUtil, +} from "@/server/bare/ops/kv-cache-utils"; +import type { CacheMessage } from "@/server/utils"; +import { + logCacheSaveError, + logCacheStatus, +} from "@/server/bare/plugins/llamacpp-completion/ops/cache-logger"; +import { getServerLogger } from "@/logging"; + +const logger = getServerLogger(); + +/** + * Single owner of the three KV-cache bookkeeping layers. + * + * The llama.cpp completion handler has three independent layers it must + * keep consistent across every cancel/error branch: + * + * 1. `cachedMessageCounts: Map` — the "n messages + * currently on disk" tracker. + * 2. `initializedCaches: Set` — the "addon defers disk writes; + * we know this cache is primed" tracker. + * 3. On-disk `.bin` files written by the addon. + * + * Without a single owner, every cancel / zero-token / rename-failed / + * tool-call exit would need to touch all three; any branch that forgets + * a layer produces three-layer drift bugs. + * + * `KvCacheSession` collapses the three layers behind one object with + * three operations: + * + * - `beginTurn` — resolves the cache file path, primes the system + * prompt cache if missing (delegated to a caller-supplied closure + * so the session doesn't depend on the model addon), marks the + * cache initialized, and returns a `TurnHandle` carrying the + * resolved path + the snapshot of the on-disk saved count. + * - `commitTurn` — records the new saved count (for custom-key + * turns) or renames the addon's pre-response file to the + * post-response path and records the count there (for auto-cache + * turns). Flips the turn's internal `committed` flag so the + * deferred `rollback` becomes a no-op on the happy path. + * - `rollback` — atomically deletes the on-disk file, clears the + * in-memory init entry, and forgets the saved count. **All three + * layers, always, in one place.** Handlers call it once via + * `ctx.scope.defer(() => session.rollback(turn))`; `commitTurn` + * short-circuits it on success. + * + * The module-level `deleteKvCacheState(...)` function (below) provides + * an administrative cross-model delete API for the + * `handleDeleteCache` RPC handler. + * + * The module-scoped `cachedMessageCounts` and `initializedCaches` maps + * are *private* to this file — no other module reaches into them. + * Callers that need cache-status info do so through the session API. + */ + +// ----- module-scoped state. The session is the single mutation point +// for the in-memory KV-cache bookkeeping. ----- + +/** + * Number of chat messages the kv-cache file on disk is known to cover, + * keyed by cache path. Written by `commitTurn`, read by `getSavedCount`, + * deleted by `rollback` / `delete` / `dropStaleSavedCount`. The same + * INVARIANT that existed in `kv-cache-state.ts` still holds: an entry is + * present only when the corresponding `.bin` file is considered + * trustworthy. Cancelled or zero-token turns must remove the entry so + * the next-turn slice doesn't read a stale boundary. + */ +const cachedMessageCounts = new Map(); + +/** + * In-memory registry of caches initialized this session. The addon + * defers disk writes, so the absence of a `.bin` file on disk isn't + * proof that the cache hasn't been primed in this worker process. Keyed + * by `${modelId}:${configHash}:${cacheKey}`, so on-disk caches from + * older worker runs still hit the lazy-load path in `beginTurn`. + */ +const initializedCaches = new Set(); + +function initRegistryKey( + modelId: string, + configHash: string, + cacheKey: string, +): string { + return `${modelId}:${configHash}:${cacheKey}`; +} + +// ----- public types ----- + +export interface TurnHandle { + /** Resolved on-disk cache file path the addon will read from / write to. */ + readonly cachePath: string; + /** + * Snapshot of the on-disk saved-message count at `beginTurn` time + * (0 if the cache was just primed). Consumed by `decideCachedHistorySlice` + * to pick the message tail for the next addon call. + */ + readonly savedCount: number; +} + +export interface BeginCustomTurnInput { + kind: "custom"; + /** User-provided session key (`completion({ kvCache: "session-a" })`). */ + customKey: string; + /** Hash of system prompt + (static) tool names. */ + configHash: string; + /** + * Prime the cache by sending system prompt + (static) tools to the + * addon. Called when the cache doesn't exist in-memory OR on disk. + * Kept as an injected closure so this module has no dependency on the + * model registry / addon — the handler closes over `model` and tools. + */ + primeIfMissing: (cachePath: string) => Promise; +} + +export interface BeginAutoTurnInput { + kind: "auto"; + /** Hash of system prompt + (static) tool names. */ + configHash: string; + /** Conversation history used to compute the pre-response cache key. */ + history: CacheMessage[]; + /** See `BeginCustomTurnInput.primeIfMissing`. */ + primeIfMissing: (cachePath: string) => Promise; +} + +export type BeginTurnInput = BeginCustomTurnInput | BeginAutoTurnInput; + +export interface StaticCommitResult { + kind: "static"; + /** `history.length + 1` — recorded at the turn's current `cachePath`. */ + messageCount: number; +} + +export interface AutoRenameCommitResult { + kind: "autoRename"; + /** + * Destination path the addon's pre-response cache file should be + * renamed to (computed from `cacheMessages + responseText`). The + * stale entry at the source path is dropped from `cachedMessageCounts` + * and the new count is recorded at this target path. + */ + targetCachePath: string; + /** Number of messages the renamed cache represents (`savedHistory.length`). */ + messageCount: number; +} + +export type CommitResult = StaticCommitResult | AutoRenameCommitResult; + +export interface KvCacheSession { + /** + * Open a new turn against the cache. Resolves the cache file path, + * primes the system-prompt cache if needed (delegated to + * `input.primeIfMissing`), marks the cache initialized, and returns a + * `TurnHandle` the handler attaches to `ctx.scope.defer(...)` for the + * rollback hook. + */ + beginTurn(input: BeginTurnInput): Promise; + + /** + * Commit a successful turn — records the new saved-message count, + * preserves the cache file, and (for auto-cache turns) renames the + * addon's pre-response file to the post-response path. Flips the + * turn's internal `committed` flag so the deferred `rollback` becomes + * a no-op on the happy path. + */ + commitTurn(turn: TurnHandle, result: CommitResult): Promise; + + /** + * Roll back an in-flight turn — atomically deletes the on-disk cache + * file, clears the in-memory `initializedCaches` entry, and forgets + * the `cachedMessageCounts` entry. **All three layers, always, in + * one place.** Idempotent: a turn that has already been committed + * or rolled back is a no-op on subsequent calls. Handlers register + * this via `ctx.scope.defer(...)` so it runs regardless of how the + * handler exits (success branch removes itself via `commitTurn`). + */ + rollback(turn: TurnHandle): Promise; + + /** + * Forget the in-memory saved-message count for the turn's path + * without unlinking the file or clearing the init flag. Used when + * `decideCachedHistorySlice` detects a stale boundary + * (`clearStaleCount: true`) — the next turn re-sends the full history + * but the cache itself is still usable. + */ + dropStaleSavedCount(turn: TurnHandle): void; +} + +interface InternalTurnState { + cachePath: string; + registryKey: string; + /** Flipped by `commitTurn`; consulted at the top of `rollback`. */ + committed: boolean; + /** Flipped at the end of `rollback`; protects against double-rollback. */ + rolledBack: boolean; +} + +// ----- factory ----- + +export function createKvCacheSession(modelId: string): KvCacheSession { + // Per-session map: each `TurnHandle` carries an opaque entry here. A + // WeakMap so handles drop their state once the handler scope releases + // the reference; the module-scoped maps above survive. + const turnState = new WeakMap(); + + function makeHandle(cachePath: string, registryKey: string): TurnHandle { + const handle: TurnHandle = { + cachePath, + savedCount: cachedMessageCounts.get(cachePath) ?? 0, + }; + turnState.set(handle, { + cachePath, + registryKey, + committed: false, + rolledBack: false, + }); + return handle; + } + + async function beginCustom(input: BeginCustomTurnInput): Promise { + const cachePath = await getCacheFilePath( + modelId, + input.configHash, + input.customKey, + ); + const registryKey = initRegistryKey( + modelId, + input.configHash, + input.customKey, + ); + + // In-memory registry check first — the addon defers disk writes, so + // a freshly-primed cache may not yet exist on disk. If the + // in-memory flag isn't set, fall back to a filesystem probe so + // caches surviving across worker restarts still hit the reuse path. + let exists = initializedCaches.has(registryKey); + if (!exists) { + try { + await fsPromises.access(cachePath); + exists = true; + initializedCaches.add(registryKey); + } catch { + exists = false; + } + } + logCacheStatus(input.customKey, exists); + + if (!exists) { + await input.primeIfMissing(cachePath); + await verifyPrimedFile(cachePath); + initializedCaches.add(registryKey); + } + + return makeHandle(cachePath, registryKey); + } + + async function beginAuto(input: BeginAutoTurnInput): Promise { + // The pre-response cache key is derived from + // `history.slice(0, -1)` — `findMatchingCache` does that + // internally. The post-response key (used after a successful turn) + // is computed by the caller and passed to `commitTurn` as + // `targetCachePath`. + const existingCache = await findMatchingCache( + modelId, + input.configHash, + input.history, + ); + const preResponseCacheInfo = await getCurrentCacheInfo( + modelId, + input.configHash, + input.history, + ); + + const cachePath = + existingCache !== null + ? existingCache.cachePath + : preResponseCacheInfo.cachePath; + const cacheKeyForRegistry = + existingCache !== null + ? existingCache.cacheKey + : preResponseCacheInfo.cacheKey; + const registryKey = initRegistryKey( + modelId, + input.configHash, + cacheKeyForRegistry, + ); + + const cacheExists = existingCache !== null; + logCacheStatus("auto", cacheExists); + + if (!cacheExists) { + await input.primeIfMissing(cachePath); + await verifyPrimedFile(cachePath); + initializedCaches.add(registryKey); + } + + return makeHandle(cachePath, registryKey); + } + + async function beginTurn(input: BeginTurnInput): Promise { + if (input.kind === "custom") return beginCustom(input); + return beginAuto(input); + } + + async function commitTurn( + turn: TurnHandle, + result: CommitResult, + ): Promise { + const state = turnState.get(turn); + if (!state) { + // Handle from a different session or already torn down. Treat as + // no-op — caller shouldn't be reaching into another session's + // state, but failing loudly here punishes the rollback-after-end + // path more than it helps. + return; + } + if (state.committed || state.rolledBack) return; + + if (result.kind === "static") { + // Custom-key path: the addon wrote the new cache state inline + // at the same path. Verify the file persisted (the addon + // currently swallows save errors — see TODO in + // `verifySaveAndRecord`) and record the new boundary. + const ok = await verifySaveAndRecord( + state.cachePath, + result.messageCount, + ); + if (!ok) { + // The expected save didn't land — treat the turn as a rollback + // so the next turn re-primes cleanly. + await runRollback(state); + return; + } + state.committed = true; + return; + } + + // Auto-rename path: the pre-response file is now stale (its key + // refers to history minus the last user turn). Rename it to the + // post-response key and record the new count there. + if (!(await renameCacheFile(state.cachePath, result.targetCachePath))) { + logger.warn( + `[kv-cache] Auto cache rename failed; rolling back. from=${state.cachePath} to=${result.targetCachePath}`, + ); + await runRollback(state); + return; + } + + // The source path's entry is gone (the file moved). Drop it and + // record the new count at the rename target. + cachedMessageCounts.delete(state.cachePath); + + const ok = await verifySaveAndRecord( + result.targetCachePath, + result.messageCount, + ); + if (!ok) { + // Rename succeeded but the file isn't where we expected. Roll + // back via the target path instead of the (now-empty) source. + state.cachePath = result.targetCachePath; + await runRollback(state); + return; + } + + // Successful auto-rename. The handle's `cachePath` field still + // points at the (now-gone) source path — that's fine, the handle + // is committed and won't roll back. Future turns compute fresh + // paths. + state.committed = true; + } + + async function rollback(turn: TurnHandle): Promise { + const state = turnState.get(turn); + if (!state) return; + if (state.committed || state.rolledBack) return; + await runRollback(state); + } + + async function runRollback(state: InternalTurnState): Promise { + // Order matters only weakly: unlink first so a partial disk-state + // can't be re-loaded by a sibling turn between the file delete and + // the in-memory clear. In practice handlers serialise per model; + // the order is belt-and-suspenders. + try { + await fsPromises.unlink(state.cachePath); + } catch (unlinkError) { + logger.warn( + `[kv-cache] Failed to remove cache file during rollback; next turn may load stale KV state. path=${state.cachePath} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + ); + } + initializedCaches.delete(state.registryKey); + cachedMessageCounts.delete(state.cachePath); + state.rolledBack = true; + } + + function dropStaleSavedCount(turn: TurnHandle): void { + const state = turnState.get(turn); + if (!state) return; + cachedMessageCounts.delete(state.cachePath); + } + + return { + beginTurn, + commitTurn, + rollback, + dropStaleSavedCount, + }; +} + +// ----- module-level administrative API ----- + +/** + * Atomically delete every layer of KV-cache state for a + * `(kvCacheKey, modelId)` pair, or wipe everything. Single entry point + * — the only mutation point for cross-model state outside of + * turn-scoped `commitTurn`/`rollback`. + * + * Why this isn't a method on `KvCacheSession`: deletes are + * cross-model (`all: true` has no model; the keyed form has + * `modelId` optional on the wire). A session, by contrast, is + * created with a *fixed* `modelId` for the duration of a turn. Making + * delete a method would force callers to materialise an irrelevant + * session for cross-model administrative cleanups. + * + * Layers cleared, in order: + * 1. On-disk: `deleteCache(...)` removes the matching directory + * tree (or wipes and recreates the root for `all: true`). + * 2. `cachedMessageCounts`: prefix-cleanup by the removed directory + * so any per-cache count under the deleted tree is forgotten. + * 3. `initializedCaches`: scope clear by `(kvCacheKey[, modelId])`, + * matching the on-disk scope. + * + * Concurrency with in-flight turns: this delete is wire-async with + * respect to any turn currently holding a `TurnHandle` for the same + * cache key. Worst case the on-disk `.bin` is removed while a turn is + * mid-write; the turn's eventual `commitTurn(...)` then fails the + * `verifySaveAndRecord` probe (file gone) and rolls back idempotently. + * No coordination primitive is needed because every layer's mutation + * is idempotent (`unlink` no-ops if missing, `Map.delete` / `Set.delete` + * no-op on absent keys). + */ +export async function deleteKvCacheState( + target: { kvCacheKey: string; modelId?: string } | { all: true }, +): Promise { + if ("all" in target) { + const removed = await deleteCacheUtil({ all: true }); + cachedMessageCounts.clear(); + initializedCaches.clear(); + // `removed` is the kv-cache root dir; logging surfaces it for + // ops visibility but isn't part of the contract. + logger.debug(`[kv-cache] Cleared all caches under ${removed}`); + return; + } + + const removedPath = await deleteCacheUtil({ + kvCacheKey: target.kvCacheKey, + ...(target.modelId !== undefined && { modelId: target.modelId }), + }); + + // Prefix-cleanup the in-memory counts. The on-disk directory tree + // is `{kvCacheRoot}/{kvCacheKey}[/{modelId}]/`, so every entry in + // `cachedMessageCounts` whose key is the removed directory itself + // or sits beneath it must go. + clearCachedMessageCountsByPrefix(removedPath, path.sep); + + // The in-memory init-set keys are + // `${modelId}:${configHash}:${kvCacheKey}` — clear by the user- + // facing kvCacheKey (and optionally narrow by modelId). + clearInitializedCachesByScope({ + cacheKey: target.kvCacheKey, + ...(target.modelId !== undefined && { modelId: target.modelId }), + }); +} + +// ----- private helpers ----- + +/** + * Verify that the addon actually persisted a usable cache file after a + * prime. Mirrors the `verifySaveAndRecord` access-probe used at commit + * time, applied at prime time so the session doesn't mark a cache + * `initializedCaches.add(...)` against a path that's missing or empty + * on disk. + * + * Failure modes this catches: + * + * - The addon's `model.run({ saveSessionPath })` was interrupted + * before the save call ran (e.g. signal abort during prefill); the + * prime closure resolves cleanly because addon save errors are not + * propagated, but no file is on disk. + * - The addon's `llama_state_save_file` was called but produced an + * empty file (out-of-space / fs error swallowed by the addon). + * + * Failure modes this does **NOT** catch: + * + * - A partial-but-nonzero file written by the addon (e.g. header + + * truncated KV state). Catching this requires either an + * addon-side change (have `CacheManager::writeCacheFile` check the + * return value of `llama_state_save_file` and throw on failure) or + * a structural hash check we can't currently compute from the + * SDK. Filed as a follow-up — see `cache-api.md` in the addon + * repo / tracking ticket. + * + * On failure we best-effort `unlink` an empty leftover file (so the + * next existence probe doesn't trust it) and throw — the handler in + * `completion-stream.ts` lets the error propagate up and no + * `initializedCaches` entry is recorded. + */ +async function verifyPrimedFile(cachePath: string): Promise { + let stats: { size: number }; + try { + stats = await fsPromises.stat(cachePath); + } catch (statError) { + // ENOENT is the common case here — addon prime returned without + // calling save (most often: signal abort during prefill). + throw new Error( + `[kv-cache] prime closure resolved but no cache file was written. path=${cachePath} cause=${statError instanceof Error ? statError.message : String(statError)}`, + ); + } + if (stats.size === 0) { + // Best-effort cleanup so a future probe doesn't trust the empty + // file. Unlink failure is non-fatal — we still throw on the + // primary "prime didn't persist" condition. + try { + await fsPromises.unlink(cachePath); + } catch (unlinkError) { + logger.warn( + `[kv-cache] Failed to remove empty primed cache file. path=${cachePath} error=${unlinkError instanceof Error ? unlinkError.message : String(unlinkError)}`, + ); + } + throw new Error( + `[kv-cache] prime closure resolved but cache file is empty. path=${cachePath}`, + ); + } +} + +/** + * Verify the addon actually persisted the cache file before recording + * its message count. The addon currently swallows write errors + * silently, so a missing file means the next turn must resend the full + * history rather than slicing against a stale `savedCount`. + * + * TODO: once the addon surfaces save failures (e.g. throws + * `UnableToSaveSessionFile` when `llama_state_save_file` returns + * false), drop the `access()` probe and wrap the `model.run()` call in + * a real try/catch that forwards the error. + */ +async function verifySaveAndRecord( + cachePath: string, + messageCount: number, +): Promise { + try { + await fsPromises.access(cachePath); + cachedMessageCounts.set(cachePath, messageCount); + return true; + } catch (err) { + cachedMessageCounts.delete(cachePath); + logCacheSaveError(cachePath, err); + return false; + } +} + +function clearCachedMessageCountsByPrefix(prefix: string, sep: string): void { + if (!prefix) { + cachedMessageCounts.clear(); + return; + } + for (const key of cachedMessageCounts.keys()) { + if (key === prefix) { + cachedMessageCounts.delete(key); + continue; + } + if (!key.startsWith(prefix + sep)) continue; + cachedMessageCounts.delete(key); + } +} + +function clearInitializedCachesByScope(scope: { + cacheKey?: string | undefined; + modelId?: string | undefined; +}): void { + if (scope.cacheKey === undefined && scope.modelId === undefined) { + initializedCaches.clear(); + return; + } + for (const key of initializedCaches) { + const firstSep = key.indexOf(":"); + const secondSep = key.indexOf(":", firstSep + 1); + if (firstSep === -1 || secondSep === -1) continue; + const entryModelId = key.slice(0, firstSep); + const entryCacheKey = key.slice(secondSep + 1); + if (scope.cacheKey !== undefined && entryCacheKey !== scope.cacheKey) { + continue; + } + if (scope.modelId !== undefined && entryModelId !== scope.modelId) { + continue; + } + initializedCaches.delete(key); + } +} + +// ----- test-only escape hatches ----- + +/** + * Test-only access to the module-scoped state. Production code reaches + * for cache state exclusively through the session API; the unit suite + * for `kv-cache-session.test.ts` needs to seed and inspect raw state + * to assert the rollback / commit invariants. Not part of the public + * SDK surface. + * + * @internal + */ +export const __kvCacheSessionTestHooks = { + getSavedCount(cachePath: string): number | undefined { + return cachedMessageCounts.get(cachePath); + }, + setSavedCountForTest(cachePath: string, count: number): void { + cachedMessageCounts.set(cachePath, count); + }, + hasInitializedKey( + modelId: string, + configHash: string, + cacheKey: string, + ): boolean { + return initializedCaches.has( + initRegistryKey(modelId, configHash, cacheKey), + ); + }, + markInitializedForTest( + modelId: string, + configHash: string, + cacheKey: string, + ): void { + initializedCaches.add(initRegistryKey(modelId, configHash, cacheKey)); + }, + resetForTest(): void { + cachedMessageCounts.clear(); + initializedCaches.clear(); + }, +}; + +// Re-export `generateConfigHash` from the path utilities so callers of +// the session can compute the hash without separately importing +// `kv-cache-utils`. The function itself stays in `kv-cache-utils.ts` +// (pure, no state) — only the re-export lives here. +export { generateConfigHash }; diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts index 335423da5d..6d92d1d397 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts @@ -1,77 +1,20 @@ /** - * Pure state + decision helpers for kv-cache bookkeeping used by - * `completion-stream.ts`. + * Pure decision helper for kv-cache history slicing used by + * `completion-stream.ts` (via the slice computed from + * `TurnHandle.savedCount`). * * This module intentionally has **no** `bare-*` imports so it can be - * exercised directly from unit tests running under `bun` without pulling - * in the Bare runtime (which is not available in that environment). - * The file-system-dependent pieces (e.g. `recordCacheSaveCount`) live in - * `completion-stream.ts` and consume the state exported here. + * exercised directly from unit tests running under `bun` without + * pulling in the Bare runtime (which is not available in that + * environment). * - * History note (QVAC-18181, SDK 0.11.0): this module previously also - * carried a per-model cancel counter (`modelCancelCounters` / - * `noteCancelRequested` / `snapshotCancelCount`) used by `completion()` - * to detect mid-decode cancellation. That side channel was retired - * alongside the introduction of `RequestRegistry` — the in-flight - * `RequestContext.signal` is now the single source of truth and - * `completion-stream.ts` reads `signal.aborted` directly. + * Cancel detection flows through the per-request `AbortSignal` from + * `RequestRegistry`; the `cachedMessageCounts` map lives in + * `kv-cache-session.ts`, which owns all three KV-cache bookkeeping + * layers (saved counts, init flags, on-disk files). Only the pure + * slice-decision helper remains here. */ -/** - * Number of chat messages the kv-cache file on disk is known to cover, keyed - * by cache path. Written after a successful completion records a save, read - * by `prepareMessagesForCache` to slice the history on the next turn so a - * consumer can stage multiple messages between completions (e.g. an - * `[assistant, user]` recovery sequence) without resending the whole history. - * - * INVARIANT: an entry is only present if the corresponding kv-cache file is - * considered trustworthy. On any turn where the SDK cannot prove the saved - * count reflects the on-disk state (cancellation mid-decode, zero-token - * reply, cache file missing after a save attempt), the entry MUST be - * deleted; a stale entry causes the next turn to slice its history down to - * an empty payload and the model returns zero tokens. - * - * Mode notes: - * - Static-mode turns are the only readers of this map. `clearStaleCount` - * in `decideCachedHistorySlice` exists so a stale-but-non-zero entry - * can be detected and dropped on the next read. - * - Dynamic-mode turns do NOT consume this map — the addon trims tools - * and the chain output from the kv-cache after each round, so the - * SDK falls back to role-based dispatch in `prepareMessagesForCache`. - * Writes still happen on dynamic-mode turns, but the recorded count - * reflects the messages the SDK shipped, not the (possibly trimmed) - * on-disk cache shape. The map is internally consistent within a - * single mode; a `kvCache` key should not be reused across modes. - */ -export const cachedMessageCounts = new Map(); - -/** - * Clear bookkeeping entries. With no argument, clears the whole map. With a - * `prefix`, removes any entry whose path is equal to it OR sits beneath it - * as a directory (i.e. `key.startsWith(prefix + sep)`). - * - * Runtime callers MUST pass the platform path separator (e.g. `path.sep` - * from `bare-path`) so directory-prefix matches are correct on every - * target. The "/" default exists only for unit tests under `bun`, where - * cache paths are POSIX-shaped and importing `bare-path` would pull in - * the Bare runtime. The exported wrapper in `completion-stream.ts` - * injects the real separator for in-process use. - */ -export function clearCachedMessageCounts(prefix?: string, sep = "/"): void { - if (!prefix) { - cachedMessageCounts.clear(); - return; - } - for (const key of cachedMessageCounts.keys()) { - if (key === prefix) { - cachedMessageCounts.delete(key); - continue; - } - if (!key.startsWith(prefix + sep)) continue; - cachedMessageCounts.delete(key); - } -} - export interface HistoryMessage { role: string; content: string; @@ -83,8 +26,9 @@ export interface HistorySliceDecision { messages: HistoryMessage[]; /** * True when the decision path proves the current `savedCount` is stale - * and the caller should `cachedMessageCounts.delete(cachePath)` to avoid - * propagating the bad count to the next turn. + * and the caller should drop the cached entry (via + * `KvCacheSession.dropStaleSavedCount(turn)`) to avoid propagating the + * bad count to the next turn. */ clearStaleCount: boolean; } @@ -93,13 +37,14 @@ export interface HistorySliceDecision { * Pure slice decision for `prepareMessagesForCache`. * * Mirrors the shape of the logic in `completion-stream.ts` but without - * calling `transformMessages` (which depends on `bare-fs` for attachment - * probing). Kept here so the decision can be unit-tested in isolation. + * calling `transformMessages` (which depends on `bare-fs` for + * attachment probing). Kept here so the decision can be unit-tested in + * isolation. * - * The key regression guard: when a non-zero `savedCount` would slice the - * history down to an empty array, it is treated as stale — the caller - * falls back to sending the system-stripped full history rather than - * handing the model an empty payload. + * The key regression guard: when a non-zero `savedCount` would slice + * the history down to an empty array, it is treated as stale — the + * caller falls back to sending the system-stripped full history rather + * than handing the model an empty payload. */ export function decideCachedHistorySlice( savedCount: number, diff --git a/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts b/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts index fd483e2411..66e0bea46a 100644 --- a/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts +++ b/packages/sdk/server/bare/plugins/llamacpp-completion/plugin.ts @@ -151,7 +151,7 @@ export const llmPlugin = definePlugin({ ...(toolsActive && { toolDialect: dialect }), ...(request.responseFormat && { responseFormat: request.responseFormat }), }, - { signal: ctx.signal }, + { signal: ctx.signal, scope: ctx.scope }, ); try { @@ -173,9 +173,17 @@ export const llmPlugin = definePlugin({ } const { modelExecutionMs, stats, toolCalls } = result.value; + // `stopReason: "cancelled"` rides the success-done path: the + // events stream ends normally, the cancellation is observable + // via the last event's `stopReason`, and the client-side + // `CompletionRun` aggregates (`final` / `text` / `toolCalls` / + // `stats`) reject with `InferenceCancelledError` carrying the + // partial state. + const cancelled = ctx.signal.aborted; const terminalEvents = normalizer.finish({ ...(stats && { stats }), ...(toolCalls.length > 0 && { toolCalls }), + ...(cancelled && { stopReason: "cancelled" as const }), }); if (!request.stream) { diff --git a/packages/sdk/server/bare/runtime/request-registry.ts b/packages/sdk/server/bare/runtime/request-registry.ts index 58e374ed63..11e291c886 100644 --- a/packages/sdk/server/bare/runtime/request-registry.ts +++ b/packages/sdk/server/bare/runtime/request-registry.ts @@ -1,7 +1,4 @@ -import { - AbortController, - type AbortSignal, -} from "bare-abort-controller"; +import { AbortController, type AbortSignal } from "bare-abort-controller"; import { createDisposableScope, type DisposableScope, @@ -121,9 +118,109 @@ interface RegistryEntry { detachParent: () => void; } +/** + * Bookkeeping entry for a `cancel({ requestId })` that arrived before + * the matching `begin({ requestId })` ran. Used to close the + * Stop-button race. + */ +interface CancelBeforeBeginEntry { + /** `Date.now()` snapshot for TTL eviction. */ + at: number; + /** Forwarded to `controller.abort(reason)` once `begin(...)` arrives. */ + reason?: string; +} + +/** + * Tuning knobs for the "cancelled-before-begin" bookkeeping set. + * + * The race window is bounded by the client-to-server round-trip: a + * `cancel({ requestId })` issued by the client at the same time as the + * matching `completion(...)` either lands first (and we need to remember + * it long enough for the server's `begin(...)` to follow) or lands + * second (and we never touch this set). 30 seconds is overkill for a + * 500ms round-trip but gives slow networks / pause-the-debugger + * scenarios enough slack while still bounding worst-case retention. + * + * The size cap protects against a buggy or malicious client firing a + * stream of cancels for ids that never get a `begin(...)` follow-up — + * each `cancel({ requestId })` that doesn't match an in-flight context + * inserts one entry, so without a cap the worker would grow the map + * unbounded. At the cap, the oldest entry is evicted. + * + * Tweak with care: both bounds appear in the registry race test + * (`bounded cancel-before-begin set does not grow past its cap`). + */ +const CANCEL_BEFORE_BEGIN_MAX_ENTRIES = 128; +const CANCEL_BEFORE_BEGIN_TTL_MS = 30_000; + export function createRequestRegistry(): RequestRegistry { const entries = new Map(); + /** + * "Cancelled-before-begin" tripwire. A + * `cancel({ requestId })` whose target isn't yet in `entries` records + * the id here; the subsequent `begin({ requestId: })` then + * aborts the new controller before returning. Map order is insertion + * order — the iterator's first key is the oldest entry, which makes + * the size-cap eviction free. + * + * Invariants: + * - Every read path (`begin`, `cancel`-by-id) calls + * `pruneCancelBeforeBeginExpired()` first so a 30s+ stale entry + * never decides a fresh `begin(...)`. + * - Insertion enforces the size cap by evicting the oldest entry + * when at capacity — a malicious client cannot grow this map + * unbounded. + * - On a successful `begin(...)` match, the entry is consumed + * (removed) so a second `begin(...)` with the same id (which + * would itself throw `RequestIdConflictError`) doesn't see a + * phantom pre-cancel. + */ + const cancelledBeforeBegin = new Map(); + + function pruneCancelBeforeBeginExpired(now: number = Date.now()): void { + if (cancelledBeforeBegin.size === 0) return; + const cutoff = now - CANCEL_BEFORE_BEGIN_TTL_MS; + for (const [id, entry] of cancelledBeforeBegin) { + if (entry.at > cutoff) break; // Insertion order ⇒ rest are newer. + cancelledBeforeBegin.delete(id); + } + } + + function recordCancelBeforeBegin( + requestId: string, + reason: string | undefined, + ): void { + const now = Date.now(); + pruneCancelBeforeBeginExpired(now); + // Re-canceling an id that is already tracked refreshes its TTL but + // keeps the original reason — the first cancel "won" the race. + if (cancelledBeforeBegin.has(requestId)) { + const existing = cancelledBeforeBegin.get(requestId)!; + cancelledBeforeBegin.delete(requestId); + cancelledBeforeBegin.set(requestId, { ...existing, at: now }); + return; + } + if (cancelledBeforeBegin.size >= CANCEL_BEFORE_BEGIN_MAX_ENTRIES) { + const oldest = cancelledBeforeBegin.keys().next().value; + if (oldest !== undefined) cancelledBeforeBegin.delete(oldest); + } + cancelledBeforeBegin.set( + requestId, + reason !== undefined ? { at: now, reason } : { at: now }, + ); + } + + function consumeCancelBeforeBegin( + requestId: string, + ): CancelBeforeBeginEntry | undefined { + pruneCancelBeforeBeginExpired(); + const entry = cancelledBeforeBegin.get(requestId); + if (!entry) return undefined; + cancelledBeforeBegin.delete(requestId); + return entry; + } + function cancelEntry(entry: RegistryEntry, reason?: string): boolean { if (entry.controller.signal.aborted) return false; entry.ctx.state = "cancelling"; @@ -152,6 +249,18 @@ export function createRequestRegistry(): RequestRegistry { const controller = new AbortController(); const scope = createDisposableScope(); + // Stop-button race close. If a + // `cancel({ requestId })` already arrived for this id, abort the + // new controller before observers can subscribe to it. The + // tripwire entry is consumed so a later, separate `begin(...)` + // with the same id is unaffected (in practice ids are UUIDv4 and + // never reused; this guard just keeps the contract self- + // consistent under retries). + const preCancel = consumeCancelBeforeBegin(opts.requestId); + if (preCancel) { + controller.abort(preCancel.reason); + } + let detachParent = () => {}; if (opts.parentSignal) { const parent = opts.parentSignal; @@ -170,7 +279,13 @@ export function createRequestRegistry(): RequestRegistry { modelId: opts.modelId, signal: controller.signal, scope, - state: "running", + // Land the context in `cancelling` from the outset whenever the + // controller was already aborted by `begin(...)` itself — either + // the Stop-button race (`preCancel`) or a `parentSignal` that was + // already aborted at begin time. Both branches abort the + // controller above, so without this guard observers would see a + // momentarily-`running` context with an already-aborted signal. + state: preCancel || opts.parentSignal?.aborted ? "cancelling" : "running", }; const entry: RegistryEntry = { ctx, controller, scope, detachParent }; @@ -216,7 +331,16 @@ export function createRequestRegistry(): RequestRegistry { let cancelled = 0; if ("requestId" in target) { const entry = entries.get(target.requestId); - if (entry && cancelEntry(entry, target.reason)) cancelled++; + if (entry) { + if (cancelEntry(entry, target.reason)) cancelled++; + return cancelled; + } + // Stop-button race: the client beat its own + // `begin(...)`. Record the cancel so the next matching `begin` + // aborts immediately. The return value stays 0 — no in-flight + // request was matched, which is still the truth — but the + // *effective* cancel will land when the begin arrives. + recordCancelBeforeBegin(target.requestId, target.reason); return cancelled; } for (const entry of entries.values()) { @@ -247,9 +371,34 @@ export function createRequestRegistry(): RequestRegistry { await disposeEntry(entry, outcome); } - return { begin, get, list, cancel, cancelAll, end }; + return { + begin, + get, + list, + cancel, + cancelAll, + end, + // Test-only: lets the registry race tests assert the bound + // invariants on the internal "cancelled-before-begin" set without + // exposing it as a public surface. Kept off the `RequestRegistry` + // interface (typed via the augmented return type below) so handler + // code can't depend on it accidentally. + __cancelBeforeBeginSize: () => cancelledBeforeBegin.size, + } as RequestRegistry & { __cancelBeforeBeginSize: () => number }; } +/** + * Test-only knobs exported for `request-registry.test.ts` so the bound + * assertions can pin the documented limits without re-reading them via + * fragile string comparison. **Not part of the public SDK surface.** + * + * @internal + */ +export const __requestRegistryTestHooks = { + cancelBeforeBeginMaxEntries: CANCEL_BEFORE_BEGIN_MAX_ENTRIES, + cancelBeforeBeginTtlMs: CANCEL_BEFORE_BEGIN_TTL_MS, +}; + function derivedTerminalState(ctx: RequestContext): RequestOutcome { if (ctx.state === "failed") return "failed"; if (ctx.signal.aborted || ctx.state === "cancelled") return "cancelled"; diff --git a/packages/sdk/server/rpc/handlers/delete-cache.ts b/packages/sdk/server/rpc/handlers/delete-cache.ts index 1da9d2842c..d013c4a0b9 100644 --- a/packages/sdk/server/rpc/handlers/delete-cache.ts +++ b/packages/sdk/server/rpc/handlers/delete-cache.ts @@ -1,33 +1,28 @@ import type { DeleteCacheRequest, DeleteCacheResponse } from "@/schemas"; -import { - clearCacheRegistry, - deleteCache as deleteCacheUtil, -} from "@/server/bare/ops/kv-cache-utils"; -import { clearCachedMessageCounts } from "@/server/bare/plugins/llamacpp-completion/ops/completion-stream"; +import { deleteKvCacheState } from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-session"; import { getServerLogger } from "@/logging"; const logger = getServerLogger(); +/** + * RPC handler for `deleteCache(...)`. The three KV-cache bookkeeping + * layers (on-disk `.bin`, `initializedCaches`, `cachedMessageCounts`) + * are private to `kv-cache-session.ts`; this handler delegates to that + * module's single administrative entry point (`deleteKvCacheState`). + * The handler must have **zero** direct references to + * `fsPromises.unlink`, the `initializedCaches` set, or the + * `cachedMessageCounts` map. + */ export async function handleDeleteCache( request: DeleteCacheRequest, ): Promise { try { if ("all" in request && request.all) { - await deleteCacheUtil({ all: true }); - clearCachedMessageCounts(); - clearCacheRegistry(); + await deleteKvCacheState({ all: true }); } else if ("kvCacheKey" in request) { - const params: { kvCacheKey: string; modelId?: string } = { + await deleteKvCacheState({ kvCacheKey: request.kvCacheKey, - }; - if (request.modelId !== undefined) { - params.modelId = request.modelId; - } - const removedPath = await deleteCacheUtil(params); - clearCachedMessageCounts(removedPath); - clearCacheRegistry({ - cacheKey: request.kvCacheKey, - modelId: request.modelId, + ...(request.modelId !== undefined && { modelId: request.modelId }), }); } diff --git a/packages/sdk/test/unit/completion-event-schemas.test.ts b/packages/sdk/test/unit/completion-event-schemas.test.ts index 7ddb0ffb6a..16800eedc4 100644 --- a/packages/sdk/test/unit/completion-event-schemas.test.ts +++ b/packages/sdk/test/unit/completion-event-schemas.test.ts @@ -28,6 +28,34 @@ test("completionDone: enforces error/stopReason invariant", (t) => { bad({ type: "completionDone", seq: 0, error: { message: "orphan" } }); }); +test("completionDone: accepts the 'cancelled' stopReason on the success path", (t) => { + // A cancelled completion is a *clean* termination, not an error. The + // event must validate against the `successDoneSchema` discriminant — + // same shape as `"eos"` — so stream-first consumers see the cancel + // as a typed `stopReason` rather than a thrown error. The companion + // `errorDoneSchema`'s `stopReason: "error"` is reserved for genuine + // mid-stream failures where the partial state is unsafe to use. + const ok = (v: unknown) => t.is(doneEventSchema.safeParse(v).success, true); + const bad = (v: unknown) => t.is(doneEventSchema.safeParse(v).success, false); + + ok({ type: "completionDone", seq: 5, stopReason: "cancelled" }); + ok({ + type: "completionDone", + seq: 5, + stopReason: "cancelled", + raw: { fullText: "partial..." }, + }); + + // Cancelled is NOT a valid error-done discriminant — the error + // payload is forbidden because the stream-end carries no error. + bad({ + type: "completionDone", + seq: 5, + stopReason: "cancelled", + error: { message: "spurious" }, + }); +}); + test("completionEventSchema: routes event types and rejects unknown", (t) => { const ok = (v: unknown) => t.is(completionEventSchema.safeParse(v).success, true); @@ -37,6 +65,7 @@ test("completionEventSchema: routes event types and rejects unknown", (t) => { ok({ type: "completionStats", seq: 3, stats: { tokensPerSecond: 45 } }); ok({ type: "completionDone", seq: 4, stopReason: "error", error: { message: "timeout" } }); ok({ type: "completionDone", seq: 5 }); + ok({ type: "completionDone", seq: 6, stopReason: "cancelled" }); t.is(completionEventSchema.safeParse({ type: "unknown", seq: 0 }).success, false); t.is(completionEventSchema.safeParse({ type: "contentDelta", seq: -1, text: "x" }).success, false); diff --git a/packages/sdk/test/unit/kv-cache-cancel.test.ts b/packages/sdk/test/unit/kv-cache-cancel.test.ts index 8f221e4270..f2f07122cd 100644 --- a/packages/sdk/test/unit/kv-cache-cancel.test.ts +++ b/packages/sdk/test/unit/kv-cache-cancel.test.ts @@ -1,8 +1,6 @@ // @ts-expect-error brittle has no type declarations import test from "brittle"; import { - cachedMessageCounts, - clearCachedMessageCounts, decideCachedHistorySlice, type HistoryMessage, } from "@/server/bare/plugins/llamacpp-completion/ops/kv-cache-state"; @@ -11,14 +9,26 @@ import { // Unit-level regression coverage for `decideCachedHistorySlice` — the pure // piece of the kv-cache cancel/zero-token fix (QVAC-17780). // -// In SDK 0.11.0 the cancel-counter side channel that used to live in this -// module (`modelCancelCounters`, `noteCancelRequested`, `snapshotCancelCount`, -// `shouldRecordSavedCount`) was retired. Cancel detection now flows through -// the per-request `AbortSignal` from `RequestRegistry` (see -// `test/unit/runtime/request-registry.test.ts`) and `completion-stream.ts` -// reads `signal.aborted` directly. The slice-decision regression coverage -// below is still relevant — it guards the "stale savedCount → empty payload" -// failure mode that's independent of how cancel is plumbed. +// In SDK 0.11.0 the cancel-counter side channel that used to live in +// this module (`modelCancelCounters`, `noteCancelRequested`, +// `snapshotCancelCount`, `shouldRecordSavedCount`) was retired. Cancel +// detection now flows through the per-request `AbortSignal` from +// `RequestRegistry` (see `test/unit/runtime/request-registry.test.ts`) +// and `completion-stream.ts` reads `signal.aborted` directly. +// +// The `cachedMessageCounts` map and its `clearCachedMessageCounts` +// helper that this file used to seed via `import { cachedMessageCounts } +// from "kv-cache-state"` were moved into `kv-cache-session.ts` as the +// single owner of all three KV-cache bookkeeping layers. The pure +// slice-decision helper still lives in `kv-cache-state.ts` and takes +// `savedCount` as a plain parameter; these tests now drive it without +// seeding any module state. The session's own commit/rollback +// semantics are covered by `runtime/kv-cache-session.test.ts`. +// +// The slice-decision regression coverage below remains relevant — it +// guards the "stale savedCount → empty payload" failure mode that's +// independent of how cancel is plumbed or who owns the saved-count +// state. // ----------------------------------------------------------------------------- type T = { @@ -26,12 +36,7 @@ type T = { is: (actual: unknown, expected: unknown, msg?: string) => void; }; -function resetState() { - clearCachedMessageCounts(); -} - test("decideCachedHistorySlice: baseline slice when savedCount is valid", (t: T) => { - resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, { role: "user", content: "hi" }, @@ -51,7 +56,6 @@ test("decideCachedHistorySlice: baseline slice when savedCount is valid", (t: T) }); test("decideCachedHistorySlice: stale count (slice would be empty) falls back and flags clear", (t: T) => { - resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, { role: "user", content: "u1" }, @@ -74,7 +78,6 @@ test("decideCachedHistorySlice: stale count (slice would be empty) falls back an }); test("decideCachedHistorySlice: savedCount > history.length falls back and flags clear", (t: T) => { - resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, { role: "user", content: "u1" }, @@ -89,7 +92,6 @@ test("decideCachedHistorySlice: savedCount > history.length falls back and flags }); test("decideCachedHistorySlice: savedCount = 0, cache exists → strip system, no clear", (t: T) => { - resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, { role: "user", content: "u1" }, @@ -104,7 +106,6 @@ test("decideCachedHistorySlice: savedCount = 0, cache exists → strip system, n }); test("decideCachedHistorySlice: cache does not exist → strip system regardless of savedCount", (t: T) => { - resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, { role: "user", content: "u1" }, @@ -123,7 +124,6 @@ test("decideCachedHistorySlice: cache does not exist → strip system regardless }); test("decideCachedHistorySlice: empty history returns empty, no clear", (t: T) => { - resetState(); const { messages, clearStaleCount } = decideCachedHistorySlice(2, true, []); t.alike(messages, []); t.is(clearStaleCount, false); @@ -134,7 +134,6 @@ test("decideCachedHistorySlice: savedCount = history.length slices to [] and fla // `history.length + 1` for a 2-message history; the user's next turn // has 3 messages and a savedCount of 3 — slicing yields []. The // fallback must fire. - resetState(); const history: HistoryMessage[] = [ { role: "system", content: "sys" }, { role: "user", content: "u1" }, @@ -157,16 +156,16 @@ test("regression: an externally-seeded stale savedCount still triggers the fallb // (e.g. from a pre-upgrade SDK instance still running in memory) and // confirm that `decideCachedHistorySlice` refuses to emit an empty // payload and also flags the stale count for cleanup. - resetState(); - const cachePath = "/tmp/qvac-17780-poisoned.bin"; - cachedMessageCounts.set(cachePath, 3); - + // + // The `cachedMessageCounts` map is private to `kv-cache-session.ts`, + // so this regression is exercised by feeding the poisoned count into + // the pure helper directly — the same surface the session calls. + const savedCount = 3; const history: HistoryMessage[] = [ { role: "system", content: "sys" }, { role: "user", content: "u1" }, { role: "user", content: "u2" }, ]; - const savedCount = cachedMessageCounts.get(cachePath) ?? 0; const { messages, clearStaleCount } = decideCachedHistorySlice( savedCount, true, diff --git a/packages/sdk/test/unit/runtime/kv-cache-session.test.ts b/packages/sdk/test/unit/runtime/kv-cache-session.test.ts new file mode 100644 index 0000000000..4663fa2511 --- /dev/null +++ b/packages/sdk/test/unit/runtime/kv-cache-session.test.ts @@ -0,0 +1,644 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; + +// ----------------------------------------------------------------------------- +// `KvCacheSession` unit tests. +// +// The session is the single owner of the three KV-cache bookkeeping layers +// (on-disk `.bin`, `initializedCaches` set, `cachedMessageCounts` map). +// Without a single owner the completion handler would have to touch all +// three on every cancel / error branch and quickly drift out of sync. +// The functional-equivalence assertions below pin the contract: +// +// 1. `beginTurn` primes the cache (calls the injected closure) the +// first time and reuses the in-memory init flag on subsequent +// turns — no spurious re-prime. +// 2. `commitTurn({ kind: "static" })` records the new saved count and +// flips the turn's `committed` flag so the deferred `rollback` +// becomes a no-op on the happy path. +// 3. `rollback` clears all three layers, even when the on-disk file +// doesn't exist (the `unlink` error is logged but not propagated; +// in-memory state is still cleared). +// 4. `rollback` after `commitTurn` is a no-op (handle-internal flag +// protects the committed state from later disposal). +// 5. Double-`rollback` is idempotent. +// 6. `dropStaleSavedCount` clears the saved count without unlinking +// the file or touching the init flag (used by the slice fallback +// in `decideCachedHistorySlice`). +// 7. `deleteKvCacheState({ kvCacheKey })` clears every layer for the +// targeted key, across models. Used by `handleDeleteCache`. +// 8. `deleteKvCacheState({ all: true })` wipes everything. +// +// ---- Runtime gating ---- +// +// `kv-cache-session.ts` imports `bare-fs` and `bare-path` at module +// scope (production code path — the session resolves real on-disk +// cache files). `bare-path/lib/posix.js` references `Bare.platform` at +// import time, and `bare-os` carries N-API bindings — neither resolves +// in Bun. To keep the file importable by `bun run test:unit` without +// crashing the suite, the tests below load the session module via +// dynamic `import()` from inside a `bareTest(...)` wrapper that +// `test.skip`s when running under Bun. The full suite runs under the +// Bare runtime (when a Bare unit-test entry exists for this directory) +// and serves as documentation + readable contract under Bun. +// +// This mirrors the pattern established in `test/unit/path-security.test.ts` +// for similar bare-only coverage. +// ----------------------------------------------------------------------------- + +const isBunUnitTestRunner = + typeof (globalThis as { Bun?: unknown }).Bun !== "undefined"; +// @ts-ignore Bare global only exists in Bare runtime +const isBareRuntime = + !isBunUnitTestRunner && typeof globalThis.Bare !== "undefined"; + +type T = { + is: (actual: unknown, expected: unknown, msg?: string) => void; + alike: (actual: unknown, expected: unknown, msg?: string) => void; + ok: (value: unknown, msg?: string) => void; + pass: (msg?: string) => void; + fail: (msg?: string) => void; + exception: ( + fn: () => Promise | unknown, + matcher?: unknown, + msg?: string, + ) => Promise; +}; + +function bareTest(name: string, fn: (t: T) => Promise | void) { + if (isBareRuntime) { + test(name, fn); + } else { + test.skip(`[bare-only] ${name}`, () => {}); + } +} + +// Lazy loader for the session module. Only invoked inside test +// bodies; never runs under Bun because the `bareTest` wrapper above +// short-circuits to `test.skip`. +async function loadSession() { + const fs = await import("fs"); + const os = await import("os"); + const path = await import("path"); + + const testHome = fs.mkdtempSync(path.join(os.tmpdir(), "qvac-kvcache-")); + process.env["HOME"] = testHome; + + const mod = + await import("@/server/bare/plugins/llamacpp-completion/ops/kv-cache-session"); + + // Reset state between tests — module state is per-process, the + // tests share it. + mod.__kvCacheSessionTestHooks.resetForTest(); + + function cleanup() { + try { + fs.rmSync(testHome, { recursive: true, force: true }); + } catch { + /* best-effort */ + } + } + + return { fs, mod, cleanup }; +} + +bareTest( + "kv-cache-session: beginTurn primes the cache on first use, reuses on second", + async (t: T) => { + const { mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash( + "you are a helpful assistant.", + [], + ); + let primeCallCount = 0; + const primeIfMissing = async () => { + primeCallCount++; + }; + + const firstTurn = await session.beginTurn({ + kind: "custom", + customKey: "session-a", + configHash, + primeIfMissing, + }); + t.is(primeCallCount, 1, "first turn primes the cache"); + t.is(firstTurn.savedCount, 0, "no saved count on a freshly-primed cache"); + t.ok( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "session-a", + ), + "initializedCaches entry registered after prime", + ); + + await session.commitTurn(firstTurn, { + kind: "static", + messageCount: 3, + }); + + const secondTurn = await session.beginTurn({ + kind: "custom", + customKey: "session-a", + configHash, + primeIfMissing, + }); + t.is( + primeCallCount, + 1, + "second turn reuses the primed cache — no spurious re-prime", + ); + t.is( + secondTurn.savedCount, + 3, + "saved count from the first turn's commit is reflected on the second turn's handle", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: commitTurn records the new saved count and suppresses rollback", + async (t: T) => { + const { fs, mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const turn = await session.beginTurn({ + kind: "custom", + customKey: "session-commit", + configHash, + primeIfMissing, + }); + + // The addon silently swallows save errors, so the session + // `fs.access`-checks the cache file before recording the count. + // Simulate that the addon wrote the file. + fs.writeFileSync(turn.cachePath, "fake-cache-bytes"); + + await session.commitTurn(turn, { kind: "static", messageCount: 7 }); + + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(turn.cachePath), + 7, + "commit records the new saved message count", + ); + + // Rollback after commit must be a no-op — the committed state + // has to survive a wholesale scope teardown. + await session.rollback(turn); + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(turn.cachePath), + 7, + "rollback after commit does NOT clear the saved count", + ); + t.ok( + fs.existsSync(turn.cachePath), + "rollback after commit does NOT delete the cache file", + ); + t.ok( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "session-commit", + ), + "rollback after commit does NOT clear the in-memory init flag", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: rollback wipes all three layers atomically", + async (t: T) => { + const { fs, mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const turn = await session.beginTurn({ + kind: "custom", + customKey: "session-rollback", + configHash, + primeIfMissing, + }); + fs.writeFileSync(turn.cachePath, "stale-bytes"); + mod.__kvCacheSessionTestHooks.setSavedCountForTest(turn.cachePath, 4); + + await session.rollback(turn); + + t.is( + fs.existsSync(turn.cachePath), + false, + "rollback unlinked the on-disk cache file", + ); + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(turn.cachePath), + undefined, + "rollback forgot the cachedMessageCounts entry", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "session-rollback", + ), + false, + "rollback cleared the initializedCaches entry", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: rollback tolerates a missing on-disk file", + async (t: T) => { + const { mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const turn = await session.beginTurn({ + kind: "custom", + customKey: "session-missing-file", + configHash, + primeIfMissing, + }); + mod.__kvCacheSessionTestHooks.setSavedCountForTest(turn.cachePath, 2); + // Intentionally do NOT create the file. Cancelled mid-write + // turns hit this branch — the session must still clear the + // in-memory state cleanly when there's nothing to unlink. + + await session.rollback(turn); + + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(turn.cachePath), + undefined, + "in-memory state cleared even when the unlink fails", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "session-missing-file", + ), + false, + "init flag cleared even when the unlink fails", + ); + } finally { + cleanup(); + } + }, +); + +bareTest("kv-cache-session: double-rollback is idempotent", async (t: T) => { + const { fs, mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const turn = await session.beginTurn({ + kind: "custom", + customKey: "session-double", + configHash, + primeIfMissing, + }); + fs.writeFileSync(turn.cachePath, "bytes"); + + await session.rollback(turn); + await session.rollback(turn); + t.pass("second rollback completed without throwing"); + } finally { + cleanup(); + } +}); + +bareTest( + "kv-cache-session: dropStaleSavedCount forgets the count without touching the file or init flag", + async (t: T) => { + const { fs, mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const turn = await session.beginTurn({ + kind: "custom", + customKey: "session-stale", + configHash, + primeIfMissing, + }); + fs.writeFileSync(turn.cachePath, "good-bytes"); + mod.__kvCacheSessionTestHooks.setSavedCountForTest(turn.cachePath, 99); + + session.dropStaleSavedCount(turn); + + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(turn.cachePath), + undefined, + "stale saved count was forgotten", + ); + t.ok( + fs.existsSync(turn.cachePath), + "the on-disk cache file is preserved (still usable next turn)", + ); + t.ok( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "session-stale", + ), + "init flag is preserved (cache is still primed)", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: deleteKvCacheState({ kvCacheKey }) wipes every layer for the targeted key", + async (t: T) => { + const { fs, mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const turn = await session.beginTurn({ + kind: "custom", + customKey: "delete-me", + configHash, + primeIfMissing, + }); + fs.writeFileSync(turn.cachePath, "bytes"); + mod.__kvCacheSessionTestHooks.setSavedCountForTest(turn.cachePath, 11); + + await mod.deleteKvCacheState({ kvCacheKey: "delete-me" }); + + t.is( + fs.existsSync(turn.cachePath), + false, + "on-disk file removed by the keyed delete", + ); + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(turn.cachePath), + undefined, + "in-memory saved count cleared by the keyed delete", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "delete-me", + ), + false, + "init flag cleared by the keyed delete", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: deleteKvCacheState({ all: true }) wipes everything", + async (t: T) => { + const { fs, mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const t1 = await session.beginTurn({ + kind: "custom", + customKey: "wipe-a", + configHash, + primeIfMissing, + }); + const t2 = await session.beginTurn({ + kind: "custom", + customKey: "wipe-b", + configHash, + primeIfMissing, + }); + fs.writeFileSync(t1.cachePath, "a"); + fs.writeFileSync(t2.cachePath, "b"); + mod.__kvCacheSessionTestHooks.setSavedCountForTest(t1.cachePath, 1); + mod.__kvCacheSessionTestHooks.setSavedCountForTest(t2.cachePath, 2); + + await mod.deleteKvCacheState({ all: true }); + + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(t1.cachePath), + undefined, + "all-delete clears the first saved count", + ); + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(t2.cachePath), + undefined, + "all-delete clears the second saved count", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "wipe-a", + ), + false, + "all-delete clears the first init flag", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "wipe-b", + ), + false, + "all-delete clears the second init flag", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: beginTurn throws if prime closure resolves but no cache file is on disk", + async (t: T) => { + // Mirrors the existing `verifySaveAndRecord` access-probe at + // commit time, applied at prime time. The addon's + // `model.run({ saveSessionPath })` swallows save errors silently + // and can also be interrupted before save runs — both cases + // resolve the prime closure cleanly while leaving no file on + // disk. The session must NOT mark such a prime as initialised + // because the next existence probe would see no file and + // re-prime, but the in-memory init flag would already say + // "primed". `verifyPrimedFile` turns this into a propagated error. + const { mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + + let observedPath: string | null = null; + const primeIfMissing = async (cachePath: string) => { + observedPath = cachePath; + // Resolve cleanly without touching disk — simulates the + // addon being interrupted before its save call. + }; + + let caught: unknown = null; + try { + await session.beginTurn({ + kind: "custom", + customKey: "prime-no-file", + configHash, + primeIfMissing, + }); + } catch (err) { + caught = err; + } + + t.ok(observedPath, "primeIfMissing observed a cache path"); + t.ok( + caught instanceof Error, + "beginTurn rejected because verifyPrimedFile threw", + ); + t.ok( + caught instanceof Error && + caught.message.includes("no cache file was written"), + "error message identifies the missing-file failure mode", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "prime-no-file", + ), + false, + "init flag NOT set when verifyPrimedFile rejects", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: beginTurn throws and removes the empty file when prime resolves with a zero-byte cache", + async (t: T) => { + // The addon ignores `llama_state_save_file`'s return value, so an + // out-of-space / fs flap mid-save can leave an empty file on + // disk while the prime closure still resolves cleanly. Trusting + // that file as a primed cache would later cause the addon's + // `loadCache` to skip it (its own `isFileInitialized` checks + // size > 0) and silently fall back to re-priming inline — but the + // session's `initializedCaches` flag would mistakenly say + // "primed". `verifyPrimedFile` removes the empty file and + // surfaces the failure to the handler. + const { fs, mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + + let observedPath: string | null = null; + const primeIfMissing = async (cachePath: string) => { + observedPath = cachePath; + fs.writeFileSync(cachePath, ""); + }; + + let caught: unknown = null; + try { + await session.beginTurn({ + kind: "custom", + customKey: "prime-empty-file", + configHash, + primeIfMissing, + }); + } catch (err) { + caught = err; + } + + t.ok(observedPath, "primeIfMissing observed a cache path"); + t.ok( + caught instanceof Error && + caught.message.includes("cache file is empty"), + "error message identifies the empty-file failure mode", + ); + t.is( + fs.existsSync(observedPath as unknown as string), + false, + "empty cache file was removed so the next probe doesn't trust it", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "prime-empty-file", + ), + false, + "init flag NOT set on the empty-prime path", + ); + } finally { + cleanup(); + } + }, +); + +bareTest( + "kv-cache-session: commitTurn rolls back if the addon did not persist the file", + async (t: T) => { + // The addon currently swallows save errors silently — a missing + // file after a save-disk turn means the next turn must NOT slice + // against a stale saved count. The session's + // `verifySaveAndRecord` probe turns this into a rollback instead + // of a phantom commit. + const { mod, cleanup } = await loadSession(); + try { + const session = mod.createKvCacheSession("test-model"); + const configHash = mod.generateConfigHash("sys", []); + const primeIfMissing = async () => {}; + + const turn = await session.beginTurn({ + kind: "custom", + customKey: "missing-save", + configHash, + primeIfMissing, + }); + // Intentionally do NOT create the file — simulate a swallowed + // addon save error. + + await session.commitTurn(turn, { kind: "static", messageCount: 5 }); + + t.is( + mod.__kvCacheSessionTestHooks.getSavedCount(turn.cachePath), + undefined, + "no saved count recorded for a missing file", + ); + t.is( + mod.__kvCacheSessionTestHooks.hasInitializedKey( + "test-model", + configHash, + "missing-save", + ), + false, + "init flag rolled back when commit failed verification", + ); + } finally { + cleanup(); + } + }, +); diff --git a/packages/sdk/test/unit/runtime/request-registry.test.ts b/packages/sdk/test/unit/runtime/request-registry.test.ts index 86fa0f4385..c20c0f060e 100644 --- a/packages/sdk/test/unit/runtime/request-registry.test.ts +++ b/packages/sdk/test/unit/runtime/request-registry.test.ts @@ -1,12 +1,15 @@ // @ts-expect-error brittle has no type declarations import test from "brittle"; -import { createRequestRegistry } from "@/server/bare/runtime/request-registry"; +import { + createRequestRegistry, + __requestRegistryTestHooks, +} from "@/server/bare/runtime/request-registry"; import { RequestIdConflictError } from "@/utils/errors-server"; // ----------------------------------------------------------------------------- // RequestRegistry unit tests. // -// Covers the contract M1 hands to handler authors: +// Covers the contract the registry hands to handler authors: // - begin / get / list reflect a coherent in-flight set. // - cancel-by-requestId targets exactly one entry. // - cancel-by-modelId predicate fans out across entries with optional @@ -181,6 +184,10 @@ test("registry: parentSignal already aborted aborts the new context", async (t: parentSignal: parent.signal, }); t.is(ctx.signal.aborted, true); + // The controller is aborted at begin time, so observers must see + // `cancelling` rather than the momentarily-`running` state the + // pre-cancel branch was already guarding against. + t.is(ctx.state, "cancelling"); }); test("registry: parentSignal aborts propagate to children", async (t: T) => { @@ -278,19 +285,24 @@ test("registry: end() detaches parent listener so long-lived parents don't accum ); }); -test("registry: same-tick cancel-before-begin returns 0 and does not retroactively abort the later begin()", async (t: T) => { - // Documents the current M1 behavior of the Stop-button race the - // synchronous-`requestId` design property allows: client generates a - // `requestId` and immediately fires `cancel({ requestId })` before the - // server-side `begin(...)` lands. The registry has nothing to match, - // so the cancel is a no-op — the subsequent `begin(...)` runs to - // completion. M2's typed-cancel outcomes will close this gap (likely - // via a small bounded "cancelled-before-begin" set checked by - // `begin(...)`); this test pins the current contract so the M2 change - // surfaces here. +test("registry: same-tick cancel-before-begin retroactively aborts the later begin() (Stop-button race close)", async (t: T) => { + // Stop-button race: the client generates a `requestId` + // and the user clicks Stop before the server-side `begin(...)` for + // that id has landed. The registry has nothing to abort, so the + // immediate `cancel(...)` still returns 0 ("no in-flight match" is + // still the truth on the wire). The id is recorded in a bounded + // "cancelled-before-begin" set, and the subsequent `begin(...)` + // checks the set: if its id is present, the new controller is + // aborted before the context is returned and the entry is consumed. + // The surface contract is documented in + // `request-lifecycle-system.mdc`. const r = createRequestRegistry(); - const cancelled = r.cancel({ requestId: "r-1" }); - t.is(cancelled, 0, "no entry yet — cancel returns 0"); + const cancelled = r.cancel({ requestId: "r-1", reason: "stop-button" }); + t.is( + cancelled, + 0, + "no entry yet — cancel still returns 0 (race remembered, not retroactively counted)", + ); await using ctx = r.begin({ requestId: "r-1", @@ -299,10 +311,103 @@ test("registry: same-tick cancel-before-begin returns 0 and does not retroactive }); t.is( ctx.signal.aborted, + true, + "subsequent begin() is retroactively aborted by the pre-begin cancel", + ); + t.is( + ctx.state, + "cancelling", + "context starts in 'cancelling' so observers see a coherent state", + ); + t.is( + String((ctx.signal as { reason?: unknown }).reason), + "stop-button", + "the recorded cancel reason is forwarded to the aborted controller", + ); +}); + +test("registry: a second begin() with the same id (UUID retry) after the race is consumed runs cleanly", async (t: T) => { + // The Stop-button race close consumes its entry on the matching + // `begin(...)`. In practice ids are UUIDv4 and never reused, but a + // buggy client could retry an id whose first attempt was already + // aborted (and its scope torn down). The second begin must NOT see + // a phantom pre-cancel — entries are single-use. + const r = createRequestRegistry(); + r.cancel({ requestId: "r-1" }); + + async function firstAttempt() { + await using ctx = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + }); + t.is( + ctx.signal.aborted, + true, + "first attempt is aborted by the race close", + ); + } + await firstAttempt(); + + await using second = r.begin({ + requestId: "r-1", + kind: "completion", + modelId: "m1", + }); + t.is( + second.signal.aborted, + false, + "second attempt with the same id is unaffected — pre-cancel entry was consumed", + ); +}); + +test("registry: bounded cancel-before-begin set does not grow past its cap (TTL + size eviction)", async (t: T) => { + // The race-close map must be bounded so a malicious / buggy client + // can't fire 100k `cancel({ requestId: })` calls and grow + // the registry's memory unboundedly. The cap is documented at the + // module top (`CANCEL_BEFORE_BEGIN_MAX_ENTRIES`) and exported via + // `__requestRegistryTestHooks` for assertion stability. + const r = createRequestRegistry(); + const cap = __requestRegistryTestHooks.cancelBeforeBeginMaxEntries; + const overshoot = cap + 64; // fire well past the cap + + const sizeProbe = r as unknown as { __cancelBeforeBeginSize: () => number }; + + for (let i = 0; i < overshoot; i++) { + r.cancel({ requestId: `r-${i}` }); + } + t.is( + sizeProbe.__cancelBeforeBeginSize() <= cap, + true, + `internal map stays within the documented cap of ${cap} entries`, + ); + + // The oldest entries should have been evicted; the most recently + // inserted id should still be honoured on the matching begin(...). + const newestId = `r-${overshoot - 1}`; + await using newest = r.begin({ + requestId: newestId, + kind: "completion", + modelId: "m1", + }); + t.is( + newest.signal.aborted, + true, + "the freshest pre-cancel still wins the race (oldest entries evicted, newest preserved)", + ); + + // And one of the early (presumed-evicted) ids should NOT trigger a + // retroactive abort, because its entry was bumped out by the cap. + await using ancient = r.begin({ + requestId: "r-0", + kind: "completion", + modelId: "m1", + }); + t.is( + ancient.signal.aborted, false, - "subsequent begin() is not retroactively aborted by the pre-begin cancel", + "an evicted pre-cancel no longer affects later begin() — bound holds", ); - t.is(ctx.state, "running"); }); test("registry: derived terminal state is 'cancelled' if signal aborted, 'completed' otherwise", async (t: T) => { diff --git a/packages/sdk/tests-qvac/tests/cancellation-tests.ts b/packages/sdk/tests-qvac/tests/cancellation-tests.ts new file mode 100644 index 0000000000..34633bd23b --- /dev/null +++ b/packages/sdk/tests-qvac/tests/cancellation-tests.ts @@ -0,0 +1,80 @@ +// Cancellation-path e2e tests. +// +// Covers the three observable contracts that typed cancel outcomes + +// KvCacheSession introduce: +// +// 1. `cancel-mid-stream-completion` — mid-stream cancel: +// - `events` stream ends with `stopReason: "cancelled"` +// - `run.final` rejects with `InferenceCancelledError` +// - `error.partial.text` equals the concatenated `contentDelta` +// content that arrived before the cancel landed. +// +// 2. `cancel-before-begin-completion` — synthetic same-tick race: +// fire `cancel({ requestId })` immediately after `completion(...)` +// returns, before the worker has had a chance to call +// `registry.begin(...)`. Asserts that the cancelled-before-begin +// map applies the cancel retroactively (events end with +// `stopReason: "cancelled"`, `final` rejects with +// `InferenceCancelledError`). +// +// 3. `cancel-then-resume-kv-cache` — mid-stream cancel on a `kvCache: +// "session-..."` turn followed by a fresh turn on the same key: +// asserts that `KvCacheSession.rollback(...)` wiped the three KV +// layers atomically and the next turn re-primes cleanly. Distinct +// from `kv-cache-cancel-then-new-prompt` because this one asserts +// the typed cancel outcome on the first run *and* the clean reprime +// on the second run together. +import type { TestDefinition } from "@tetherto/qvac-test-suite"; + +export const cancelMidStreamCompletion: TestDefinition = { + testId: "cancel-mid-stream-completion", + params: { + prompt: "Tell me a long story about dragons, in many sentences.", + cancelAfterTokens: 3, + }, + expectation: { validation: "function", fn: () => true }, + suites: ["verify"], + metadata: { + category: "completion", + dependency: "llm", + estimatedDurationMs: 20000, + }, +}; + +export const cancelBeforeBeginCompletion: TestDefinition = { + testId: "cancel-before-begin-completion", + params: { + prompt: "Write a paragraph about the history of cryptography.", + }, + expectation: { validation: "function", fn: () => true }, + suites: ["verify"], + metadata: { + category: "completion", + dependency: "llm", + estimatedDurationMs: 20000, + }, +}; + +export const cancelThenResumeKvCache: TestDefinition = { + testId: "cancel-then-resume-kv-cache", + params: { + cacheKey: "cancel-then-resume-kvcache", + firstUserMessage: "Tell me a long story about wizards.", + secondUserMessage: "What is 2+2? Answer with just the number.", + expectedAnswerContains: "4", + cancelAfterTokens: 3, + }, + expectation: { validation: "function", fn: () => true }, + suites: ["verify"], + metadata: { + category: "completion", + dependency: "llm", + estimatedDurationMs: 30000, + }, +}; + +export const cancellationTests = [ + cancelMidStreamCompletion, + cancelBeforeBeginCompletion, + cancelThenResumeKvCache, +]; diff --git a/packages/sdk/tests-qvac/tests/desktop/consumer.ts b/packages/sdk/tests-qvac/tests/desktop/consumer.ts index 44dd1fd2cf..d93c89b583 100644 --- a/packages/sdk/tests-qvac/tests/desktop/consumer.ts +++ b/packages/sdk/tests-qvac/tests/desktop/consumer.ts @@ -74,6 +74,7 @@ import { LifecycleExecutor } from "../shared/executors/lifecycle-executor.js"; import { ConfigExecutor } from "../shared/executors/config-executor.js"; import { NoLingeringBareExecutor } from "./executors/no-lingering-bare-executor.js"; import { MultiGpuExecutor } from "../shared/executors/multi-gpu-executor.js"; +import { CancellationExecutor } from "../shared/executors/cancellation-executor.js"; const resources = new ResourceManager(); @@ -409,6 +410,7 @@ export const executor = createExecutor({ new ConfigExecutor(), new NoLingeringBareExecutor(), new MultiGpuExecutor(resources), + new CancellationExecutor(resources), ], profiling: { init: () => profiler.enable({ mode: "summary", includeServerBreakdown: true }), diff --git a/packages/sdk/tests-qvac/tests/mobile/consumer.ts b/packages/sdk/tests-qvac/tests/mobile/consumer.ts index fbe26fd490..3b7a8451dd 100644 --- a/packages/sdk/tests-qvac/tests/mobile/consumer.ts +++ b/packages/sdk/tests-qvac/tests/mobile/consumer.ts @@ -68,6 +68,7 @@ import { DelegatedInferenceExecutor } from "../shared/executors/delegated-infere import { MobileDiffusionExecutor } from "./executors/diffusion-executor.js"; import { LifecycleExecutor } from "../shared/executors/lifecycle-executor.js"; import { ConfigExecutor } from "../shared/executors/config-executor.js"; +import { CancellationExecutor } from "../shared/executors/cancellation-executor.js"; const resources = new ResourceManager({ // Mobile (iOS + Android) needs a tick after each unloadModel for the @@ -422,6 +423,7 @@ export const executor = createExecutor({ new MobileDiffusionExecutor(resources), new LifecycleExecutor(resources), new ConfigExecutor(), + new CancellationExecutor(resources), ], profiling: { init: () => profiler.enable({ mode: "summary", includeServerBreakdown: true }), diff --git a/packages/sdk/tests-qvac/tests/shared/executors/cancellation-executor.ts b/packages/sdk/tests-qvac/tests/shared/executors/cancellation-executor.ts new file mode 100644 index 0000000000..48c2efdf78 --- /dev/null +++ b/packages/sdk/tests-qvac/tests/shared/executors/cancellation-executor.ts @@ -0,0 +1,454 @@ +import { + cancel, + completion, + deleteCache, + InferenceCancelledError, +} from "@qvac/sdk"; +import { + type Expectation, + type TestResult, +} from "@tetherto/qvac-test-suite"; +import { AbstractModelExecutor } from "./abstract-model-executor.js"; +import { cancellationTests } from "../../cancellation-tests.js"; + +interface MidStreamParams { + prompt: string; + cancelAfterTokens?: number; +} + +interface CancelBeforeBeginParams { + prompt: string; +} + +interface CancelThenResumeKvCacheParams { + cacheKey: string; + firstUserMessage: string; + secondUserMessage: string; + expectedAnswerContains: string; + cancelAfterTokens?: number; +} + +export class CancellationExecutor extends AbstractModelExecutor< + typeof cancellationTests +> { + pattern = /^cancel-(mid-stream-completion|before-begin-completion|then-resume-kv-cache)$/; + + protected handlers = Object.fromEntries( + cancellationTests.map((test) => { + if (test.testId === "cancel-mid-stream-completion") { + return [test.testId, this.cancelMidStream.bind(this)]; + } + if (test.testId === "cancel-before-begin-completion") { + return [test.testId, this.cancelBeforeBegin.bind(this)]; + } + if (test.testId === "cancel-then-resume-kv-cache") { + return [test.testId, this.cancelThenResumeKvCache.bind(this)]; + } + return [test.testId, this.cancelMidStream.bind(this)]; + }), + ) as never; + + async cancelMidStream( + params: MidStreamParams, + _expectation: Expectation, + ): Promise { + const modelId = await this.resources.ensureLoaded("llm"); + const cancelAfterTokens = params.cancelAfterTokens ?? 3; + + const run = completion({ + modelId, + history: [ + { role: "system", content: "You are a helpful assistant." }, + { role: "user", content: params.prompt }, + ], + stream: true, + }); + + // Concurrently accumulate contentDelta text from `run.events`. This + // is the "source of truth" the SDK's aggregator builds `partial.text` + // from — equality between this and the rejected promise's + // `.partial.text` is the cancellation contract under test. + let accumulatedText = ""; + let lastEventType: string | null = null; + let lastStopReason: string | null = null; + let eventsCount = 0; + let cancelInvoked = false; + // Object container so TS keeps `Error | null` across the closure. + const cancelState: { error: Error | null } = { error: null }; + + const eventsTask = (async () => { + for await (const event of run.events) { + eventsCount++; + lastEventType = event.type; + if (event.type === "contentDelta") { + accumulatedText += event.text; + if (!cancelInvoked && eventsCount >= cancelAfterTokens) { + cancelInvoked = true; + try { + await cancel({ requestId: run.requestId }); + } catch (err) { + cancelState.error = + err instanceof Error ? err : new Error(String(err)); + } + } + } else if (event.type === "completionDone") { + lastStopReason = event.stopReason ?? null; + } + } + })(); + + let finalError: unknown = null; + let finalResolved = false; + try { + await run.final; + finalResolved = true; + } catch (err) { + finalError = err; + } + + await eventsTask; + + if (cancelState.error) { + return { + passed: false, + output: `cancel({ requestId }) rejected mid-stream: ${cancelState.error.message}`, + }; + } + + if (!cancelInvoked) { + return { + passed: false, + output: `Stream ended before ${cancelAfterTokens} contentDelta events arrived (only ${eventsCount} events seen) — cancel was never fired`, + }; + } + + // (a) The `events` stream must terminate normally with a + // `completionDone` event carrying `stopReason: "cancelled"`. The + // server still flushes a terminal event when the registry signal + // aborts so consumers iterating `run.events` exit naturally. + if (lastEventType !== "completionDone") { + return { + passed: false, + output: `Expected last event to be completionDone, got ${lastEventType} after ${eventsCount} events`, + }; + } + if (lastStopReason !== "cancelled") { + return { + passed: false, + output: `Expected completionDone.stopReason === "cancelled", got ${JSON.stringify(lastStopReason)}`, + }; + } + + // (b) `run.final` must reject with InferenceCancelledError. + if (finalResolved) { + return { + passed: false, + output: "run.final resolved instead of rejecting with InferenceCancelledError", + }; + } + if (!(finalError instanceof InferenceCancelledError)) { + const msg = finalError instanceof Error ? finalError.message : String(finalError); + const ctor = + finalError && typeof finalError === "object" + ? (finalError.constructor as { name?: string }).name ?? "unknown" + : typeof finalError; + return { + passed: false, + output: `run.final rejected with ${ctor}, expected InferenceCancelledError: ${msg}`, + }; + } + + // (c) `error.partial.text` must equal the concatenated + // `contentDelta` text we observed on the events stream. The + // SDK always populates `partial.text` for cancelled completions + // (even an empty string), but the schema marks it optional so we + // default to `""` here for the comparison. + const partialText = finalError.partial.text ?? ""; + if (partialText !== accumulatedText) { + return { + passed: false, + output: + `InferenceCancelledError.partial.text (${JSON.stringify( + partialText.slice(0, 80), + )}, len=${partialText.length}) did not match ` + + `concatenated contentDelta text (${JSON.stringify( + accumulatedText.slice(0, 80), + )}, len=${accumulatedText.length})`, + }; + } + + if (finalError.requestId !== run.requestId) { + return { + passed: false, + output: `InferenceCancelledError.requestId mismatch: error=${finalError.requestId}, run=${run.requestId}`, + }; + } + + return { + passed: true, + output: + `Mid-stream cancel OK: events=${eventsCount}, stopReason=cancelled, ` + + `partial.text length=${partialText.length} (matches concatenated contentDelta)`, + }; + } + + async cancelBeforeBegin( + params: CancelBeforeBeginParams, + _expectation: Expectation, + ): Promise { + const modelId = await this.resources.ensureLoaded("llm"); + + // Construct the run synchronously and fire the cancel on the very + // next turn, before awaiting anything from the run. The client-side + // `requestId` is generated before any RPC round-trip, so the cancel + // RPC can land while the server has not yet seen the + // `completionStream` request — or with `bare-rpc`'s request + // multiplexing, even slightly after begin() ran. The bounded + // cancelled-before-begin map handles both orderings: the eventual + // outcome is always a typed `cancelled` result on the wire. + const run = completion({ + modelId, + history: [ + { role: "system", content: "You are a helpful assistant." }, + { role: "user", content: params.prompt }, + ], + stream: true, + }); + + // Synchronous cancel-fire. We do NOT await the original `completion(...)` + // RPC settling before issuing this — that's the whole point of the + // racy code path. + // Object container so TS keeps `Error | null` across the .catch closure. + const cancelState: { error: Error | null } = { error: null }; + const cancelTask = cancel({ requestId: run.requestId }).catch((err) => { + cancelState.error = + err instanceof Error ? err : new Error(String(err)); + }); + + let lastEventType: string | null = null; + let lastStopReason: string | null = null; + let contentEvents = 0; + let totalEvents = 0; + let accumulatedText = ""; + for await (const event of run.events) { + totalEvents++; + lastEventType = event.type; + if (event.type === "contentDelta") { + contentEvents++; + accumulatedText += event.text; + } else if (event.type === "completionDone") { + lastStopReason = event.stopReason ?? null; + } + } + + await cancelTask; + + if (cancelState.error) { + return { + passed: false, + output: `cancel({ requestId }) rejected for an unknown id: ${cancelState.error.message}`, + }; + } + + if (lastEventType !== "completionDone") { + return { + passed: false, + output: `Expected last event to be completionDone, got ${lastEventType}. totalEvents=${totalEvents}, contentEvents=${contentEvents}`, + }; + } + + // Contract: the eventual outcome is a typed `cancelled` result, + // regardless of whether the cancel landed before or after begin() + // ran on the server. + if (lastStopReason !== "cancelled") { + return { + passed: false, + output: + `Expected stopReason "cancelled" (cancel-before-begin replayed retroactively), ` + + `got ${JSON.stringify(lastStopReason)}. ` + + `totalEvents=${totalEvents}, contentEvents=${contentEvents}`, + }; + } + + let finalRejected: unknown = null; + try { + await run.final; + return { + passed: false, + output: "run.final resolved instead of rejecting after cancel-before-begin", + }; + } catch (err) { + finalRejected = err; + } + + if (!(finalRejected instanceof InferenceCancelledError)) { + const msg = + finalRejected instanceof Error + ? finalRejected.message + : String(finalRejected); + return { + passed: false, + output: `Expected run.final to reject with InferenceCancelledError, got: ${msg}`, + }; + } + + if (finalRejected.requestId !== run.requestId) { + return { + passed: false, + output: `InferenceCancelledError.requestId mismatch: error=${finalRejected.requestId}, run=${run.requestId}`, + }; + } + + return { + passed: true, + output: + `Cancel-before-begin OK: totalEvents=${totalEvents}, contentEvents=${contentEvents}, ` + + `partial.text length=${accumulatedText.length}, stopReason=cancelled, ` + + `final rejected with InferenceCancelledError`, + }; + } + + async cancelThenResumeKvCache( + params: CancelThenResumeKvCacheParams, + _expectation: Expectation, + ): Promise { + const modelId = await this.resources.ensureLoaded("llm"); + const cancelAfterTokens = params.cancelAfterTokens ?? 3; + + try { + try { + await deleteCache({ kvCacheKey: params.cacheKey }); + } catch { + // ignore ENOENT — the test owns the cache key + } + + const firstRun = completion({ + modelId, + history: [ + { role: "system", content: "You are a helpful assistant." }, + { role: "user", content: params.firstUserMessage }, + ], + stream: true, + kvCache: params.cacheKey, + }); + + let receivedTokens = 0; + let firstCancelInvoked = false; + let firstCancelError: Error | null = null; + let firstStopReason: string | null = null; + + for await (const event of firstRun.events) { + if (event.type === "contentDelta") { + receivedTokens++; + if (!firstCancelInvoked && receivedTokens >= cancelAfterTokens) { + firstCancelInvoked = true; + try { + await cancel({ requestId: firstRun.requestId }); + } catch (err) { + firstCancelError = + err instanceof Error ? err : new Error(String(err)); + break; + } + } + } else if (event.type === "completionDone") { + firstStopReason = event.stopReason ?? null; + } + } + + if (firstCancelError) { + return { + passed: false, + output: `cancel({ requestId }) on first turn rejected: ${firstCancelError.message}`, + }; + } + if (!firstCancelInvoked) { + return { + passed: false, + output: `First completion ended before cancel (received ${receivedTokens} tokens, expected >=${cancelAfterTokens})`, + }; + } + + // Drain `final` so the run is fully settled before issuing the + // second turn. KvCacheSession.rollback runs inside the + // scope.defer chain, which is triggered when the response + // generator exits; we need that to have happened before turn 2. + let firstRejected: unknown = null; + try { + await firstRun.final; + } catch (err) { + firstRejected = err; + } + if (!(firstRejected instanceof InferenceCancelledError)) { + const msg = + firstRejected instanceof Error + ? firstRejected.message + : String(firstRejected); + return { + passed: false, + output: `First turn did not reject with InferenceCancelledError: ${msg}`, + }; + } + if (firstStopReason !== "cancelled") { + return { + passed: false, + output: `Expected first completionDone.stopReason === "cancelled", got ${JSON.stringify(firstStopReason)}`, + }; + } + + // KvCacheSession.rollback should have wiped the in-memory message + // counter, the initialized-cache flag, and the on-disk .bin — + // the second turn must therefore re-prime cleanly and produce a + // correct answer for an unrelated prompt. + const secondRun = completion({ + modelId, + history: [ + { role: "system", content: "You are a helpful assistant." }, + { role: "user", content: params.secondUserMessage }, + ], + stream: true, + kvCache: params.cacheKey, + }); + + let secondText = ""; + for await (const token of secondRun.tokenStream) { + secondText += token; + } + + const trimmed = secondText.trim(); + if (trimmed.length === 0) { + return { + passed: false, + output: + "Second completion on the same kvCache key returned an empty response " + + "after cancelling the previous streaming turn. Rollback likely left " + + "stale state in the KvCacheSession.", + }; + } + + const expected = params.expectedAnswerContains; + if (!trimmed.toLowerCase().includes(expected.toLowerCase())) { + return { + passed: false, + output: + `Second completion did not include expected token ${JSON.stringify(expected)} ` + + `after cancelling the previous turn. Got ${secondText.length} chars: ` + + `${JSON.stringify(secondText.slice(0, 200))}`, + }; + } + + return { + passed: true, + output: + `Cancel-then-resume KV-cache OK: cancelled after ${receivedTokens} tokens, ` + + `first turn rejected with InferenceCancelledError, second turn produced ` + + `${secondText.length} chars containing ${JSON.stringify(expected)}`, + }; + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + return { + passed: false, + output: `Cancel-then-resume KV-cache failed: ${errorMsg}`, + }; + } + } +} diff --git a/packages/sdk/tests-qvac/tests/test-definitions.ts b/packages/sdk/tests-qvac/tests/test-definitions.ts index 5def0da805..e67e64ea8a 100644 --- a/packages/sdk/tests-qvac/tests/test-definitions.ts +++ b/packages/sdk/tests-qvac/tests/test-definitions.ts @@ -34,6 +34,7 @@ import { configTests } from "./config-tests.js"; import { noLingeringBareTests } from "./no-lingering-bare-tests.js"; import { wrongModelTests } from "./wrong-model-tests.js"; import { multiGpuTests } from "./multi-gpu-tests.js"; +import { cancellationTests } from "./cancellation-tests.js"; // Model loading tests export const modelLoadLlm: TestDefinition = { @@ -290,6 +291,9 @@ export const tests = [ // Multi-GPU config smoke (verifies split-mode and main-gpu flow through stack) ...multiGpuTests, + // Typed cancel outcomes + KvCacheSession rollback e2e + ...cancellationTests, + // Additional model tests modelSwitchLlm, modelReloadAfterError, diff --git a/packages/sdk/utils/aggregate-events.ts b/packages/sdk/utils/aggregate-events.ts index 12c3ec3bc4..00a497d018 100644 --- a/packages/sdk/utils/aggregate-events.ts +++ b/packages/sdk/utils/aggregate-events.ts @@ -18,6 +18,14 @@ export type AggregatedEvents = { toolCalls: ToolCall[]; rawFullText: string | undefined; error: CompletionError | undefined; + /** + * True when the terminal `completionDone` carried + * `stopReason: "cancelled"`. The client wrapper rejects the + * promise-aggregates (`final` / `text` / `toolCalls` / `stats`) with + * `InferenceCancelledError` carrying the partial state when this is + * set; the `events` stream itself still ends normally. + */ + cancelled: boolean; }; export function aggregateEvents(events: CompletionEvent[]): AggregatedEvents { @@ -26,6 +34,7 @@ export function aggregateEvents(events: CompletionEvent[]): AggregatedEvents { let stats: CompletionStats | undefined; let rawFullText: string | undefined; let error: CompletionError | undefined; + let cancelled = false; const toolCalls: ToolCall[] = []; for (const event of events) { @@ -41,21 +50,45 @@ export function aggregateEvents(events: CompletionEvent[]): AggregatedEvents { if ("raw" in event && event.raw) { rawFullText = event.raw.fullText; } + // Error wins over cancelled if a wire event ever carries both + // signals: a mid-stream addon failure makes the partial state + // unsafe to expose, regardless of why the loop exited. if (event.stopReason === "error" && "error" in event) { error = event.error; + } else if (event.stopReason === "cancelled") { + cancelled = true; } } } - return { contentText, thinkingText, stats, toolCalls, rawFullText, error }; + return { + contentText, + thinkingText, + stats, + toolCalls, + rawFullText, + error, + cancelled, + }; } export function buildFinalFromEvents( events: CompletionEvent[], handlers: ToolHandlerMap, -): { final: CompletionFinal; error: CompletionError | undefined } { - const { contentText, thinkingText, stats, toolCalls, rawFullText, error } = - aggregateEvents(events); +): { + final: CompletionFinal; + error: CompletionError | undefined; + cancelled: boolean; +} { + const { + contentText, + thinkingText, + stats, + toolCalls, + rawFullText, + error, + cancelled, + } = aggregateEvents(events); const attachedToolCalls = attachHandlersToToolCalls(toolCalls, handlers); const fullText = rawFullText ?? contentText; @@ -75,5 +108,5 @@ export function buildFinalFromEvents( }), }; - return { final, error }; + return { final, error, cancelled }; } diff --git a/packages/sdk/utils/errors-server.ts b/packages/sdk/utils/errors-server.ts index 0384da54ee..dd98cbdb60 100644 --- a/packages/sdk/utils/errors-server.ts +++ b/packages/sdk/utils/errors-server.ts @@ -1,7 +1,25 @@ import { QvacErrorBase } from "@qvac/error"; import { SDK_SERVER_ERROR_CODES } from "@/schemas/sdk-errors-server"; +import type { CompletionStats, ToolCallWithCall } from "@/schemas"; import { createErrorOptions } from "./errors-base"; +/** + * Partial completion payload attached to `InferenceCancelledError` when a + * cancel hits mid-stream. Mirrors the named fields on `CompletionFinal` + * so callers who want the partial output can read `.partial.text`, + * `.partial.toolCalls`, `.partial.stats` directly without reaching for + * a `Partial` import. + * + * Fields are all optional: a same-tick cancel-before-begin races every + * event; a cancel after the first content chunk carries `text` but no + * `stats`; a cancel after a tool-call frame carries both. + */ +export interface InferenceCancelledPartial { + text?: string; + toolCalls?: ToolCallWithCall[]; + stats?: CompletionStats; +} + // ============== Model Registry Errors ============== export class ModelAlreadyRegisteredError extends QvacErrorBase { @@ -303,6 +321,56 @@ export class RequestNotFoundError extends QvacErrorBase { } } +/** + * Thrown when a long-running inference request was cancelled before + * completion. The `events` stream on `CompletionRun` ends normally with + * `stopReason: "cancelled"` on the last `completionDone`, but the + * promise-aggregates on the same run (`final` / `text` / `toolCalls` / + * `stats`) reject with this error so callers can't accidentally treat a + * cancelled run as a successful one. + * + * Carries: + * - `requestId` — correlates with `run.requestId` so callers know which + * in-flight request was cancelled when they fan out multiple cancels. + * - `partial` — whatever the aggregator accumulated up to the cancel + * point. Optional fields so consumers can opt into "show partial": + * + * try { await run.text } catch (err) { + * if (err instanceof InferenceCancelledError) { + * renderPartial(err.partial.text); + * } + * } + * + * The error is constructed client-side in + * `client/api/completion-stream.ts` when the wire stream ends with + * `stopReason: "cancelled"` — the partial payload comes from the + * client's own event aggregator. The class lives in `errors-server.ts` + * (and is re-exported from the package root) because the *semantic* + * origin of the cancel is server-side, and other handlers + * (embeddings, transcribe, …) will reuse the same class once their + * cancel surface lands. + */ +export class InferenceCancelledError extends QvacErrorBase { + readonly requestId: string; + readonly partial: InferenceCancelledPartial; + + constructor( + requestId: string, + partial: InferenceCancelledPartial = {}, + cause?: unknown, + ) { + super( + createErrorOptions( + SDK_SERVER_ERROR_CODES.INFERENCE_CANCELLED, + [requestId], + cause, + ), + ); + this.requestId = requestId; + this.partial = partial; + } +} + export class AsyncDisposeUnavailableError extends QvacErrorBase { constructor(cause?: unknown) { super(