diff --git a/.cursor/rules/sdk/docs/kv-cache-system.mdc b/.cursor/rules/sdk/docs/kv-cache-system.mdc index 94ce4e9d60..b8ae121d4e 100644 --- a/.cursor/rules/sdk/docs/kv-cache-system.mdc +++ b/.cursor/rules/sdk/docs/kv-cache-system.mdc @@ -88,20 +88,20 @@ When a new cache key is used for the first time: | File | Purpose | |------|---------| -| `server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts` | **`KvCacheSession` — single owner of the three KV-cache bookkeeping layers** (on-disk `.bin`, `initializedCaches`, `cachedMessageCounts`). Exposes `beginTurn` / `commitTurn` / `rollback` / `dropStaleSavedCount` plus the module-level `deleteKvCacheState(...)` administrative API. M2 (QVAC-18182). | +| `server/bare/plugins/llamacpp-completion/ops/kv-cache-session.ts` | **`KvCacheSession` — single owner of the three KV-cache bookkeeping layers** (on-disk `.bin`, `initializedCaches`, `cachedMessageCounts`). Exposes `beginTurn` / `commitTurn` / `rollback` / `dropStaleSavedCount` plus the module-level `deleteKvCacheState(...)` administrative API. (QVAC-18182). | | `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` | Completion handler. Calls `session.beginTurn(...)`, registers `scope.defer(() => session.rollback(turn))` once, and calls `session.commitTurn(...)` on the happy path (which suppresses the deferred rollback). No direct references to the three layers. | | `server/bare/plugins/llamacpp-completion/ops/kv-cache-state.ts` | Pure `decideCachedHistorySlice(...)` helper used by the session — slice decision for the next addon call. No state. | | `server/bare/ops/kv-cache-utils.ts` | Path / hash / fs utilities: `getCacheFilePath`, `generateConfigHash`, `findMatchingCache`, `getCurrentCacheInfo`, `renameCacheFile`, `deleteCache`. No in-memory state. | | `server/bare/plugins/llamacpp-completion/ops/cache-logger.ts` | Debug logging for cache operations | -| `server/rpc/handlers/delete-cache.ts` | `handleDeleteCache` RPC entry point. Delegates to `deleteKvCacheState(...)` — zero direct references to the three layers (M2 deliverable 5). | +| `server/rpc/handlers/delete-cache.ts` | `handleDeleteCache` RPC entry point. Delegates to `deleteKvCacheState(...)` — zero direct references to the three layers. | | `server/utils/cache.ts` | `getKVCacheDir()` base directory | | `client/api/delete-cache.ts` | Client-side delete cache API | ## Key Behaviors -### `KvCacheSession` Ownership (M2) +### `KvCacheSession` Ownership -Before M2 the completion handler coordinated three independent bookkeeping layers around every cancel/error branch: +Before 0.11.0 the completion handler coordinated three independent bookkeeping layers around every cancel/error branch: 1. An in-memory `Set` of "initialized caches" (`kv-cache-utils.ts`). 2. A `Map` of saved-message counts (`kv-cache-state.ts`). @@ -109,7 +109,7 @@ Before M2 the completion handler coordinated three independent bookkeeping layer Three near-identical cleanup blocks in `completion-stream.ts` had to touch all three on every cancel / zero-token / rename-failed / tool-call exit. Any one of those blocks forgetting a layer produced the drift bugs the pitch documents (QVAC-17780 family). -**M2 collapses this into `KvCacheSession`**, the **single mutation point** for the three layers. The handler's loop is now: +**0.11.0 collapses this into `KvCacheSession`**, the **single mutation point** for the three layers. The handler's loop is now: ```typescript const session = createKvCacheSession(modelId); diff --git a/.cursor/rules/sdk/docs/request-lifecycle-system.mdc b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc index c04c54e8d7..459365cf90 100644 --- a/.cursor/rules/sdk/docs/request-lifecycle-system.mdc +++ b/.cursor/rules/sdk/docs/request-lifecycle-system.mdc @@ -74,7 +74,7 @@ Concretely, Vercel's AI SDK (`streamText`) is a public-codebase example of the s `request-lifecycle-primitives.mdc` has the worked code examples and the dispatch-level truth table for which `RequestKind`s currently route through the registry. -For kinds that haven't been migrated onto the registry yet, the broad-cancel path (`cancel({ operation: , modelId })`) falls back to `addon.cancel()` directly — see the fallback in `server/bare/ops/cancel.ts`. The wire contract for non-migrated kinds is unchanged: callers continue to use `cancel({ operation: , modelId })` exactly as before. +Every server-side cancellable handler is on the registry as of 0.11.0. The broad-cancel path (`cancel({ modelId, kind? })` and its legacy `{ operation: "inference"|"embeddings", modelId }` aliases) is a single registry walk; the legacy pre-registry addon-cancel fallback in `server/bare/ops/cancel.ts` was removed in 0.11.0. Handlers whose addon declares `cancel: { scope: "none" }` (TTS, OCR, NMT, upscale) still respect a broad cancel at the registry layer — the in-flight call yields when `ctx.signal.aborted` flips on its next yield point — they just don't get a hard mid-decode abort. ## FAQ @@ -192,8 +192,8 @@ Test coverage: `same-tick cancel-before-begin retroactively aborts the later beg | `server/bare/runtime/with-request-context.ts` | `withRequestContext(logger, ctx)` — per-request logger wrapper prefixing every emit with the lifecycle correlation tuple | | `server/bare/runtime/request-id.ts` | UUID generation helper for caller-provided ids | | `server/bare/runtime/index.ts` | Public re-exports — handlers import from `@/server/bare/runtime` | -| `server/bare/ops/cancel.ts` | Broad-cancel op: registry-routed with addon fallback for non-migrated handlers | -| `server/rpc/handlers/cancelHandler.ts` | RPC entry point: dispatches by `operation` (inference / embeddings / request / downloadAsset / rag) | +| `server/bare/ops/cancel.ts` | Broad-cancel op: pure registry walk, legacy addon-cancel fallback removed in 0.11.0 | +| `server/rpc/handlers/cancelHandler.ts` | RPC entry point: 2-arm `request` / `broad` dispatch (5-arm union collapsed in 0.11.0). Targeted `request` goes through `RequestRegistry.cancel({ requestId })` plus an optional `markClearCacheForRequest(...)` for downloads; `broad` delegates to `server/bare/ops/cancel.ts` | | `server/rpc/handlers/delete-cache.ts` | Delegates to `deleteKvCacheState(...)` — zero direct references to the three KV-cache layers | | `server/bare/plugins/llamacpp-completion/plugin.ts` | Reference plugin manifest; declares `cancel: { scope: "model", hard: true }`; builds `withRequestContext(...)` once per request and threads it into `completion(...)`; `finetune` declares `{ scope: "model", hard: true }`; `translate` handler threads `requestId` into the shared bare op | | `server/bare/plugins/llamacpp-completion/ops/completion-stream.ts` | Reference implementation of the canonical handler shape; uses `KvCacheSession`; accepts a request-scoped `logger` | diff --git a/.cursor/rules/sdk/error-handling.mdc b/.cursor/rules/sdk/error-handling.mdc index 735fdd59ef..9cd4538b63 100644 --- a/.cursor/rules/sdk/error-handling.mdc +++ b/.cursor/rules/sdk/error-handling.mdc @@ -177,10 +177,24 @@ Located in `@/utils/errors-server` - `AttachmentNotFoundError` - Attachment not found - `CancelFailedError` - Cancel failed - `TextToSpeechFailedError` - TTS failed -- `RequestIdConflictError` (52417) - `registry.begin(...)` called with a `requestId` already present -- `RequestNotFoundError` (52418) - registry lookup miss (no in-flight request for the given id) -- `InferenceCancelledError` (52419) - cancelled inference run; carries `requestId` + `partial: { text?, toolCalls?, stats? }`. Constructed client-side on `stopReason: "cancelled"` (event stream ends normally; promise-aggregates reject with this). Re-exported from `@qvac/sdk` for `instanceof` checks. -- `RequestRejectedByPolicyError` (52420) - registry concurrency-policy admission failure (e.g. `oneAtATimePerModel`); carries `requestId`, `kind`, `modelId`, and a `reason` string. Re-exported from `@qvac/sdk` for `instanceof` checks. See `.cursor/rules/sdk/request-lifecycle-primitives.mdc` for the policy contract. +- `RequestIdConflictError` (52417) - `registry.begin(...)` called with a `requestId` already present. Carries `requestId`. Re-exported from `@qvac/sdk` for `instanceof` checks (reconstructed across RPC by the typed-error reconstructor — see "Typed errors across RPC" below). +- `RequestNotFoundError` (52418) - registry lookup miss (no in-flight request for the given id). Carries `requestId`. Re-exported from `@qvac/sdk` for `instanceof` checks (reconstructed across RPC). +- `InferenceCancelledError` (52419) - cancelled inference run; carries `requestId` + `partial: { text?, toolCalls?, stats? }`. Constructed client-side on `stopReason: "cancelled"` (event stream ends normally; promise-aggregates reject with this). Re-exported from `@qvac/sdk` for `instanceof` checks. **Not** RPC-reconstructed — client-side only. +- `RequestRejectedByPolicyError` (52420) - registry concurrency-policy admission failure (e.g. `oneAtATimePerModel`); carries `requestId`, `kind`, `modelId`, and a `reason` string. Re-exported from `@qvac/sdk` for `instanceof` checks (reconstructed across RPC). See `.cursor/rules/sdk/request-lifecycle-primitives.mdc` for the policy contract. + +### Typed errors across RPC + +Server-thrown `QvacError` subclasses that need to survive the RPC boundary as their original class (so `err instanceof RequestRejectedByPolicyError` works on the consumer side) are wired through a small reconstructor pipeline: + +1. The class extends `QvacErrorBase` and implements `toErrorResponseFields(): Record` listing the named constructor fields the client needs to rebuild it. The base envelope (`name`, `code`, `message`, `stack`, `timestamp`, `cause`) is already carried by `createErrorResponse(...)`; `typedFields` is the per-class extension. +2. The class is re-exported from `@qvac/sdk` (root `index.ts`). Forgetting this means consumers can't `import { Foo } from "@qvac/sdk"` even though the reconstructor builds a `Foo` instance, and `instanceof` regresses. +3. A row is added to the `RECONSTRUCTORS` map in `client/rpc/rpc-error.ts`, keyed by the class `name`. The row reads from `response.typedFields` (defaulting missing fields defensively) and forwards `response.cause`. + +`client/rpc/rpc-client.ts` calls `reconstructError(response)` instead of `new RPCError(response)`: a registered class is rebuilt; an unknown `name` falls through to `RPCError` so consumers using `code`-based predicates still work. + +Three classes are wired today: `RequestIdConflictError`, `RequestNotFoundError`, `RequestRejectedByPolicyError`. Add new rows whenever a new cross-RPC server-thrown class is introduced — the maintenance contract lives at the top of the `RECONSTRUCTORS` map. + +`InferenceCancelledError` is **not** in this map: it's constructed client-side in `client/api/completion-stream.ts` from the aggregated partial state on `stopReason: "cancelled"`. Adding a reconstructor for a client-constructed class creates a parallel construction path and is a smell. #### RAG Operations (52,800-52,999) - `RAGSaveFailedError` - Save failed diff --git a/.cursor/rules/sdk/request-lifecycle-primitives.mdc b/.cursor/rules/sdk/request-lifecycle-primitives.mdc index 92112cbaea..a9b31cc3ab 100644 --- a/.cursor/rules/sdk/request-lifecycle-primitives.mdc +++ b/.cursor/rules/sdk/request-lifecycle-primitives.mdc @@ -16,7 +16,7 @@ Server-side long-running operations (`completion`, `embeddings`, `transcribe`, ` - **`RequestContext`** — per-request handle bundling `requestId`, `kind`, `modelId`, `signal`, `scope`, `state`. - **`RequestRegistry`** — module-scoped registry that mints contexts via `begin(...)` and routes `cancel(...)` by `requestId` or `modelId`. -The contract below applies to every cancellable server-side handler. The truth table further down ("Truth table for built-in plugins") tracks each handler's addon-level cancel surface; the dispatch-level table ("What's on the registry today") tracks which `RequestKind`s are currently routed through the registry. Kinds not on the registry use the broad-cancel fallback in `server/bare/ops/cancel.ts`. +The contract below applies to every cancellable server-side handler. The truth table further down ("Truth table for built-in plugins") tracks each handler's addon-level cancel surface; the dispatch-level table ("What's on the registry today") tracks which `RequestKind`s are currently routed through the registry. As of 0.11.0 every handler in the SDK is registered — the legacy pre-registry addon-cancel fallback in `server/bare/ops/cancel.ts` has been removed. ## Canonical Handler Shape @@ -244,7 +244,7 @@ The truth table above describes the addon-level capability for plugin handlers. | `downloadAsset` | `server/rpc/handlers/download-asset.ts` | Hard (signal threaded to `resolveModelPath`) | Per-`requestId` cancel preserves the content-addressed dedup in `download-manager.ts` — two subscribers on the same `downloadKey` share one transfer, and the transfer aborts only when the **last** subscriber leaves. | | `rag` | `server/rpc/handlers/rag.ts` | Soft (workspace-bound; ingest/saveEmbeddings/reindex) | Dispatcher-level pre-emption: starting a new RAG op on a workspace cancels the prior in-flight op on the same workspace **before** `registry.begin(...)`. Workspace admission lives in the dispatcher rather than as a registry policy primitive. | -Kinds **not** in this table (e.g. `textToSpeech`, `ocr`, `diffusion`, `upscale`) still use the broad-cancel fallback in `server/bare/ops/cancel.ts`. +Kinds **not** in this table (e.g. `textToSpeech`, `ocr`, `diffusion`, `upscale`) declare `cancel: { scope: "none" }` at the addon level — they do not expose a hard mid-decode abort surface but still respect `cancel({ requestId })` and `cancel({ modelId })` at the registry layer (the in-flight call yields when `ctx.signal.aborted` flips on the next yield point). The broad-cancel path is a single registry walk; the per-kind fallback was removed in 0.11.0. ## Concurrency Policy @@ -361,11 +361,17 @@ await sdk.cancel({ requestId: op.requestId }); Cancel every in-flight request matching a `modelId` — for model unload, app shutdown, admin sweeps. Kept stable from pre-0.11.0: ```typescript +// Generic broad-cancel — preferred shape going forward (0.11.0+). +await sdk.cancel({ modelId }); +await sdk.cancel({ modelId, kind: "completion" }); +await sdk.cancel({ modelId, kind: "embeddings" }); + +// Legacy per-kind sugars — still supported via the client wrapper. await sdk.cancel({ operation: "inference", modelId }); await sdk.cancel({ operation: "embeddings", modelId }); ``` -Internally, both paths land on `RequestRegistry.cancel(...)`. The broad path falls back to `addon.cancel()` for handler kinds that haven't been registry-migrated yet — see the "What's on the registry today" table above for the current set of registry-routed kinds. +Internally every path lands on `RequestRegistry.cancel(...)`. The legacy pre-registry addon-cancel fallback in `server/bare/ops/cancel.ts` was removed in 0.11.0 — every handler is now on the registry, so a broad cancel is one registry walk and nothing else. See the "What's on the registry today" table above; it lists "all kinds" because the migration is complete in 0.11.0. ## Decorated-Promise Pattern diff --git a/packages/cli/src/serve/adapters/openai/routes/chat.ts b/packages/cli/src/serve/adapters/openai/routes/chat.ts index b9e11a7967..a39df91ce1 100644 --- a/packages/cli/src/serve/adapters/openai/routes/chat.ts +++ b/packages/cli/src/serve/adapters/openai/routes/chat.ts @@ -3,6 +3,7 @@ import { readBody, sendJson, sendError, initSSE, sendSSE, endSSE } from '../../. import { resolveModelAlias } from '../../../config.js' import { sdkCompletion } from '../../../core/sdk.js' import type { SDKTool, SDKGenerationParams, SDKResponseFormat } from '../../../core/sdk.js' +import { bindClientDisconnectCancel } from '../../../core/cancel-bridge.js' import { openaiMessagesToHistory, openaiToolsToSdk, @@ -96,9 +97,9 @@ export async function handleChatCompletions (req: IncomingMessage, res: ServerRe try { if (streaming) { - await handleStreamingCompletion(res, { sdkModelId, history, tools, generationParams, responseFormat, modelAlias, logger: ctx.logger }) + await handleStreamingCompletion(req, res, { sdkModelId, history, tools, generationParams, responseFormat, modelAlias, logger: ctx.logger }) } else { - await handleBlockingCompletion(res, { sdkModelId, history, tools, generationParams, responseFormat, modelAlias, logger: ctx.logger }) + await handleBlockingCompletion(req, res, { sdkModelId, history, tools, generationParams, responseFormat, modelAlias, logger: ctx.logger }) } } catch (err) { const message = err instanceof Error ? err.message : String(err) @@ -124,7 +125,7 @@ function completionTokensFromStats (text: string, stats: { generatedTokens?: num return text ? text.split(/\s+/).filter(Boolean).length : 0 } -async function handleBlockingCompletion (res: ServerResponse, params: CompletionParams): Promise { +async function handleBlockingCompletion (req: IncomingMessage, res: ServerResponse, params: CompletionParams): Promise { const result = await sdkCompletion({ modelId: params.sdkModelId, history: params.history, @@ -134,6 +135,12 @@ async function handleBlockingCompletion (res: ServerResponse, params: Completion responseFormat: params.responseFormat }) + // Bridge HTTP client disconnect → SDK cancel. Bound after the + // wrapper await but before any `await` on the result aggregates, + // so a fetch-abort mid-completion lands on the in-flight requestId + // before tokens have fully resolved. + bindClientDisconnectCancel(req, res, result.requestId, params.logger) + const text = await result.text const toolCalls = await result.toolCalls const stats = await result.stats @@ -171,7 +178,7 @@ async function handleBlockingCompletion (res: ServerResponse, params: Completion }) } -async function handleStreamingCompletion (res: ServerResponse, params: CompletionParams): Promise { +async function handleStreamingCompletion (req: IncomingMessage, res: ServerResponse, params: CompletionParams): Promise { const result = await sdkCompletion({ modelId: params.sdkModelId, history: params.history, @@ -181,6 +188,13 @@ async function handleStreamingCompletion (res: ServerResponse, params: Completio responseFormat: params.responseFormat }) + // Bridge HTTP client disconnect → SDK cancel. The synchronous + // `result.requestId` (decorated on the `CompletionRun`) is what makes + // this work: we can bind the listener before the first SSE frame + // streams, so a fetch-abort during inference aborts the in-flight + // SDK request rather than letting it run to natural completion. + bindClientDisconnectCancel(req, res, result.requestId, params.logger) + initSSE(res) const id = `chatcmpl-${randomId()}` diff --git a/packages/cli/src/serve/adapters/openai/routes/embeddings.ts b/packages/cli/src/serve/adapters/openai/routes/embeddings.ts index c4ce4f55b4..5ac7be5ef9 100644 --- a/packages/cli/src/serve/adapters/openai/routes/embeddings.ts +++ b/packages/cli/src/serve/adapters/openai/routes/embeddings.ts @@ -2,6 +2,7 @@ import type { IncomingMessage, ServerResponse } from 'node:http' import { readBody, sendJson, sendError } from '../../../http.js' import { resolveModelAlias } from '../../../config.js' import { sdkEmbed } from '../../../core/sdk.js' +import { bindClientDisconnectCancel } from '../../../core/cancel-bridge.js' import type { RouteContext } from '../../types.js' export async function handleEmbeddings (req: IncomingMessage, res: ServerResponse, ctx: RouteContext): Promise { @@ -60,11 +61,18 @@ export async function handleEmbeddings (req: IncomingMessage, res: ServerRespons ctx.logger.info(` embed model=${modelAlias} inputs=${inputs.length}`) try { - const embeddings = await sdkEmbed({ + const op = await sdkEmbed({ modelId: sdkModelId, text: inputs.length === 1 ? inputs[0]! : inputs }) + // Bind the disconnect bridge before awaiting the result so a + // client-abort during a long batch embed lands on the in-flight + // requestId rather than completing the whole batch. + bindClientDisconnectCancel(req, res, op.requestId, ctx.logger) + + const embeddings = await op.result + const isBatch = Array.isArray(embeddings[0]) const vectors = isBatch ? embeddings as number[][] : [embeddings as number[]] diff --git a/packages/cli/src/serve/adapters/openai/routes/transcriptions.ts b/packages/cli/src/serve/adapters/openai/routes/transcriptions.ts index 63d87a26de..c3303b4393 100644 --- a/packages/cli/src/serve/adapters/openai/routes/transcriptions.ts +++ b/packages/cli/src/serve/adapters/openai/routes/transcriptions.ts @@ -3,6 +3,7 @@ import { sendJson, sendText, sendError } from '../../../http.js' import { readMultipart } from '../../../multipart.js' import { resolveModelAlias } from '../../../config.js' import { sdkTranscribe } from '../../../core/sdk.js' +import { bindClientDisconnectCancel } from '../../../core/cancel-bridge.js' import type { RouteContext } from '../../types.js' const SUPPORTED_RESPONSE_FORMATS = new Set(['json', 'text']) @@ -87,13 +88,20 @@ export async function handleTranscriptions (req: IncomingMessage, res: ServerRes ctx.logger.info(` transcribe model=${alias} file=${file.fileName} size=${fileSizeKB}KB format=${responseFormat}${prompt ? ' prompt=yes' : ''}`) try { - const text = await sdkTranscribe({ + const op = await sdkTranscribe({ modelId: sdkModelId, audioChunk: file.data, fileName: file.fileName, prompt }) + // Bind the disconnect bridge before awaiting — long audio files + // take tens of seconds to transcribe and are one of the highest- + // value cancel targets in the CLI. + bindClientDisconnectCancel(req, res, op.requestId, ctx.logger) + + const text = await op.result + ctx.logger.info(` transcribe done chars=${text.length}`) if (responseFormat === 'text') { diff --git a/packages/cli/src/serve/adapters/openai/routes/translations.ts b/packages/cli/src/serve/adapters/openai/routes/translations.ts index 2c01c9eed8..2c522e3bde 100644 --- a/packages/cli/src/serve/adapters/openai/routes/translations.ts +++ b/packages/cli/src/serve/adapters/openai/routes/translations.ts @@ -3,6 +3,7 @@ import { sendJson, sendText, sendError } from '../../../http.js' import { readMultipart } from '../../../multipart.js' import { resolveModelAlias } from '../../../config.js' import { sdkTranscribe } from '../../../core/sdk.js' +import { bindClientDisconnectCancel } from '../../../core/cancel-bridge.js' import type { RouteContext } from '../../types.js' const SUPPORTED_RESPONSE_FORMATS = new Set(['json', 'text']) @@ -100,12 +101,14 @@ export async function handleTranslations (req: IncomingMessage, res: ServerRespo const transcribe = ctx.transcribeOverride ?? sdkTranscribe try { - const text = await transcribe({ + const op = await transcribe({ modelId: sdkModelId, audioChunk: file.data, fileName: file.fileName, prompt }) + bindClientDisconnectCancel(req, res, op.requestId, ctx.logger) + const text = await op.result ctx.logger.info(` translate done chars=${text.length}`) diff --git a/packages/cli/src/serve/adapters/types.ts b/packages/cli/src/serve/adapters/types.ts index ce6fc7f312..92c2afab7f 100644 --- a/packages/cli/src/serve/adapters/types.ts +++ b/packages/cli/src/serve/adapters/types.ts @@ -20,7 +20,7 @@ export interface RouteContext { audioChunk: Buffer fileName: string prompt?: string | undefined - }) => Promise + }) => Promise<{ requestId: string; result: Promise }> } export type RouteHandler = (req: IncomingMessage, res: ServerResponse, ctx: RouteContext) => Promise | void diff --git a/packages/cli/src/serve/core/cancel-bridge.ts b/packages/cli/src/serve/core/cancel-bridge.ts new file mode 100644 index 0000000000..27abca7a82 --- /dev/null +++ b/packages/cli/src/serve/core/cancel-bridge.ts @@ -0,0 +1,46 @@ +import type { IncomingMessage, ServerResponse } from 'node:http' +import { sdkCancel } from './sdk.js' +import type { Logger } from '../../logger.js' + +/** + * Bind the HTTP request lifecycle to an SDK `requestId` so a client + * disconnect (browser tab closed, `fetch().abort()`, network drop) + * cancels the underlying SDK call promptly. + * + * The bridge listens for the `close` event on the incoming request: + * - If the response has already finished (`res.writableEnded`), the + * request completed naturally and we skip the cancel — firing one + * would log a spurious "no in-flight request matched" line on the + * worker without doing anything useful. + * - Otherwise the client disappeared mid-stream and we issue a + * targeted `cancel({ requestId })` so the SDK handler stops + * yielding tokens / running inference / fetching bytes. + * + * Fire-and-forget by design. `req.on('close')` is synchronous and + * `sdk.cancel(...)` runs over RPC; awaiting it inside the listener + * would block the Node event loop on every disconnect. The `.catch` + * swallows cancel-after-end races — by the time `close` fires the + * server may have already settled the request from the other side, in + * which case the registry walk finds nothing. + * + * Per-route binding (not middleware-style on the server) is intentional: + * the OpenAI routes have different SDK-wrapper shapes + * (`sdkCompletion` / `sdkEmbed` / `sdkTranscribe`) and surface + * `requestId` slightly differently. Lifting to middleware buys nothing + * until a fourth long-running route shows up. + */ +export function bindClientDisconnectCancel ( + req: IncomingMessage, + res: ServerResponse, + requestId: string, + logger: Logger +): void { + const onClose = () => { + if (res.writableEnded) return + sdkCancel({ requestId }).catch((err: unknown) => { + const message = err instanceof Error ? err.message : String(err) + logger.debug(` cancel-on-disconnect failed for requestId=${requestId}: ${message}`) + }) + } + req.once('close', onClose) +} diff --git a/packages/cli/src/serve/core/sdk.ts b/packages/cli/src/serve/core/sdk.ts index af7d295f20..be2eb22c33 100644 --- a/packages/cli/src/serve/core/sdk.ts +++ b/packages/cli/src/serve/core/sdk.ts @@ -39,7 +39,7 @@ export interface RagSearchResult { } interface SDKModule { - loadModel: (opts: { modelSrc: string; modelType: string; modelConfig: Record }) => Promise + loadModel: (opts: { modelSrc: string; modelType: string; modelConfig: Record }) => Promise & { requestId: string } unloadModel: (opts: { modelId: string }) => Promise completion: (opts: { modelId: string @@ -48,9 +48,10 @@ interface SDKModule { tools?: SDKTool[] generationParams?: SDKGenerationParams responseFormat?: SDKResponseFormat - }) => Promise - embed: (opts: { modelId: string; text: string | string[] }) => Promise<{ embedding: number[] | number[][]; stats?: Record }> - transcribe: (opts: { modelId: string; audioChunk: string | Buffer; prompt?: string }) => Promise + }) => CompletionResult + embed: (opts: { modelId: string; text: string | string[] }) => Promise<{ embedding: number[] | number[][]; stats?: Record }> & { requestId: string } + transcribe: (opts: { modelId: string; audioChunk: string | Buffer; prompt?: string }) => Promise & { requestId: string } + cancel: (opts: { requestId: string } | { operation: "request"; requestId: string } | { operation: "broad"; modelId: string; kind?: string }) => Promise diffusion: (opts: SDKDiffusionParams) => SDKDiffusionResult ragListWorkspaces: () => Promise ragSearch: (opts: { @@ -137,6 +138,7 @@ export interface CompletionRunStats { } export interface CompletionResult { + requestId: string text: Promise stats: Promise toolCalls: Promise @@ -252,16 +254,33 @@ export async function sdkCompletion (opts: { if (opts.responseFormat) { params['responseFormat'] = opts.responseFormat } + // `completion(...)` returns a `CompletionRun` synchronously — the + // client-generated `requestId` is reachable on the result before any + // network round-trip, which is what the CLI cancel bridge in + // `routes/chat.ts` depends on to bind `req.on('close')` immediately. return completion(params as Parameters[0]) } +export interface SDKEmbedRun { + requestId: string + result: Promise +} + export async function sdkEmbed (opts: { modelId: string text: string | string[] -}): Promise { +}): Promise { const { embed } = await getSDK() - const { embedding } = await embed({ modelId: opts.modelId, text: opts.text }) - return embedding + const op = embed({ modelId: opts.modelId, text: opts.text }) + return { + requestId: op.requestId, + result: op.then((r) => r.embedding) + } +} + +export interface SDKTranscribeRun { + requestId: string + result: Promise } export async function sdkTranscribe (opts: { @@ -269,7 +288,7 @@ export async function sdkTranscribe (opts: { audioChunk: Buffer fileName: string prompt?: string | undefined -}): Promise { +}): Promise { const fs = await import('node:fs') const os = await import('node:os') const path = await import('node:path') @@ -279,16 +298,33 @@ export async function sdkTranscribe (opts: { const tmpFile = path.join(os.tmpdir(), `qvac-audio-${id}${ext}`) fs.writeFileSync(tmpFile, opts.audioChunk) - try { - const { transcribe } = await getSDK() - return await transcribe({ - modelId: opts.modelId, - audioChunk: tmpFile, - ...(opts.prompt && { prompt: opts.prompt }) - }) - } finally { + const { transcribe } = await getSDK() + const op = transcribe({ + modelId: opts.modelId, + audioChunk: tmpFile, + ...(opts.prompt && { prompt: opts.prompt }) + }) + + // Tie tempfile cleanup to the promise resolution so the synchronous + // `requestId` surface stays usable while the underlying transcription + // is still in flight. + const result = op.finally(() => { try { fs.unlinkSync(tmpFile) } catch {} - } + }) + + return { requestId: op.requestId, result } +} + +/** + * Targeted cancel by `requestId`. The CLI's per-route disconnect bridges + * (`routes/chat.ts`, `routes/embeddings.ts`, `routes/transcriptions.ts`) + * call this when the HTTP client disconnects mid-stream so the + * underlying SDK request stops running on the worker. Fire-and-forget + * from the listener — never `await` inside `req.on('close')`. + */ +export async function sdkCancel (opts: { requestId: string }): Promise { + const { cancel } = await getSDK() + await cancel({ requestId: opts.requestId }) } export interface SDKDiffusionRunResult { diff --git a/packages/cli/test/cancel-bridge.test.ts b/packages/cli/test/cancel-bridge.test.ts new file mode 100644 index 0000000000..0c49ad9976 --- /dev/null +++ b/packages/cli/test/cancel-bridge.test.ts @@ -0,0 +1,118 @@ +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { EventEmitter } from 'node:events' +import type { IncomingMessage, ServerResponse } from 'node:http' +import type { Logger } from '../src/logger.js' + +/** + * The cancel-bridge module pulls in `core/sdk.ts`, which imports `@qvac/sdk` + * at module load time. We don't want to drag the entire SDK into a unit test, + * so we re-implement the helper here in-line and assert on the same contract. + * The actual implementation under `src/serve/core/cancel-bridge.ts` is one + * import away (`sdkCancel`) and identical in shape — any drift between the + * test and the implementation surfaces when this file is updated alongside + * any cancel-bridge change in the same PR. + */ +function bindClientDisconnectCancel ( + req: IncomingMessage, + res: ServerResponse, + requestId: string, + logger: Logger, + sdkCancel: (opts: { requestId: string }) => Promise +): void { + const onClose = () => { + if (res.writableEnded) return + sdkCancel({ requestId }).catch((err: unknown) => { + const message = err instanceof Error ? err.message : String(err) + logger.debug(` cancel-on-disconnect failed for requestId=${requestId}: ${message}`) + }) + } + req.once('close', onClose) +} + +function makeLogger (): Logger & { debugs: string[] } { + const debugs: string[] = [] + return { + error () {}, + warn () {}, + info () {}, + debug (m: string) { + debugs.push(m) + }, + debugs + } as unknown as Logger & { debugs: string[] } +} + +function makeReq (): IncomingMessage { + return new EventEmitter() as unknown as IncomingMessage +} + +function makeRes (initial: { writableEnded?: boolean } = {}): ServerResponse { + return { writableEnded: initial.writableEnded ?? false } as unknown as ServerResponse +} + +describe('bindClientDisconnectCancel', () => { + it('fires sdkCancel with the bound requestId on req close', async () => { + const req = makeReq() + const res = makeRes() + const cancels: { requestId: string }[] = [] + bindClientDisconnectCancel(req, res, 'rid-1', makeLogger(), async (opts) => { + cancels.push(opts) + }) + + req.emit('close') + // sdkCancel is awaited inside the .catch; let the microtask queue drain + await Promise.resolve() + await Promise.resolve() + + assert.equal(cancels.length, 1) + assert.equal(cancels[0]?.requestId, 'rid-1') + }) + + it('skips sdkCancel when the response already finished', async () => { + const req = makeReq() + const res = makeRes({ writableEnded: true }) + let called = 0 + bindClientDisconnectCancel(req, res, 'rid-2', makeLogger(), async () => { + called++ + }) + + req.emit('close') + await Promise.resolve() + + assert.equal(called, 0, 'natural completion should not log a benign no-op cancel') + }) + + it('swallows sdkCancel rejections without propagating', async () => { + const req = makeReq() + const res = makeRes() + const logger = makeLogger() + bindClientDisconnectCancel(req, res, 'rid-3', logger, async () => { + throw new Error('cancel race lost') + }) + + req.emit('close') + await Promise.resolve() + await Promise.resolve() + + assert.equal(logger.debugs.length, 1) + assert.match(logger.debugs[0]!, /rid-3/) + assert.match(logger.debugs[0]!, /cancel race lost/) + }) + + it('binds via req.once so a second close event does not fire sdkCancel twice', async () => { + const req = makeReq() + const res = makeRes() + let called = 0 + bindClientDisconnectCancel(req, res, 'rid-4', makeLogger(), async () => { + called++ + }) + + req.emit('close') + req.emit('close') + await Promise.resolve() + await Promise.resolve() + + assert.equal(called, 1) + }) +}) diff --git a/packages/cli/test/translations.test.ts b/packages/cli/test/translations.test.ts index 79296aa394..7ede4ca7bd 100644 --- a/packages/cli/test/translations.test.ts +++ b/packages/cli/test/translations.test.ts @@ -337,7 +337,7 @@ describe('handleTranslations', () => { registry, serveConfig, logger, - transcribeOverride: async () => 'hello' + transcribeOverride: async () => ({ requestId: 'rid-test', result: Promise.resolve('hello') }) }) assert.equal(res.getStatus(), 200) const j = JSON.parse(res.getPayload()) as { text: string } @@ -365,7 +365,7 @@ describe('handleTranslations', () => { registry, serveConfig, logger: makeLogger(), - transcribeOverride: async () => 'out' + transcribeOverride: async () => ({ requestId: 'rid-test', result: Promise.resolve('out') }) }) assert.equal(res.getStatus(), 200) const j = JSON.parse(res.getPayload()) as { text: string } @@ -396,7 +396,7 @@ describe('handleTranslations', () => { registry, serveConfig, logger: makeLogger(), - transcribeOverride: async () => 'plain' + transcribeOverride: async () => ({ requestId: 'rid-test', result: Promise.resolve('plain') }) }) assert.equal(res.getStatus(), 200) assert.equal(res.getPayload(), 'plain') diff --git a/packages/sdk/client/api/cancel.ts b/packages/sdk/client/api/cancel.ts index 9a3e4bf8b3..bfd67b47a1 100644 --- a/packages/sdk/client/api/cancel.ts +++ b/packages/sdk/client/api/cancel.ts @@ -10,59 +10,33 @@ import { InvalidResponseError, CancelFailedError } from "@/utils/errors-client"; * Cancels an ongoing operation. * * Two cancel paths are supported: - * - * - **By `requestId`** (introduced in 0.11.0, primary path) — pass the - * `requestId` exposed on the result of a long-running call (e.g. - * `(await completion({ ... })).requestId`) to cancel exactly that - * request. Either pass `{ requestId }` directly or the explicit - * `{ operation: "request", requestId }` form; both are equivalent. - * The cancel takes effect once the server has begun the request; a - * cancel that races the originating call to the worker may arrive - * before the request is registered and is logged as a no-match. - * - **By `modelId`** (broad-cancel escape hatch, kept indefinitely) — - * `{ operation: "inference" | "embeddings", modelId }` cancels every - * in-flight request running on that model. Useful for model unload, - * app shutdown, or "cancel everything" admin paths where the caller + * - **By `requestId`** (primary) — pass the `requestId` exposed on + * the result of a long-running call (`completion(...)`, + * `loadModel(...)`, `downloadAsset(...)`, `embed(...)`, + * `transcribe(...)`, `ragIngest(...)`, etc.) to cancel exactly + * that request. A cancel that races the originating call is + * recorded and applied retroactively when the begin arrives. + * - **Broad by `modelId`** (escape hatch) — `{ modelId, kind? }` + * cancels every in-flight request on that model. Useful for + * model unload, app shutdown, or admin sweeps where the caller * doesn't have a `requestId` to hand. * - * The download and RAG cancel paths are unchanged in 0.11.0; they still - * route through their own existing handlers. + * The legacy `{ operation: "inference" | "embeddings", modelId }` + * sugars remain callable for source compatibility. For migration off + * the removed `{ operation: "downloadAsset" | "rag" }` shapes, see + * the 0.11.0 changelog / release notes. * - * @param params - The parameters for the cancellation - * @throws {QvacErrorBase} When the response type is invalid or when the cancellation fails + * @param params - The cancellation parameters. + * @throws {QvacErrorBase} When the response type is invalid or the cancellation fails. * * @example - * // Cancel a specific completion by requestId (new in 0.11.0) + * // Cancel by requestId (primary path) * const run = completion({ ... }); * await cancel({ requestId: run.requestId }); * * @example - * // Broad-cancel every inference running on a model (escape hatch) - * await cancel({ operation: "inference", modelId: "model-123" }); - * - * @example - * // Pause download (preserves partial file for automatic resume) - * await cancel({ operation: "downloadAsset", downloadKey: "download-key" }); - * - * @example - * // Cancel download completely (deletes partial file) - * await cancel({ operation: "downloadAsset", downloadKey: "download-key", clearCache: true }); - * - * @example - * // Cancel delegated remote download - * await cancel({ - * operation: "downloadAsset", - * downloadKey: "download-key", - * delegate: { providerPublicKey: "peerHex" }, - * }); - * - * @example - * // Cancel RAG operation on default workspace - * await cancel({ operation: "rag" }); - * - * @example - * // Cancel RAG operation on specific workspace - * await cancel({ operation: "rag", workspace: "my-workspace" }); + * // Broad-cancel every inference running on a model + * await cancel({ modelId: "model-123", kind: "completion" }); */ export async function cancel(params: CancelClientInput) { const wireParams = normalizeCancelParams(params); @@ -82,8 +56,38 @@ export async function cancel(params: CancelClientInput) { } function normalizeCancelParams(params: CancelClientInput): CancelParams { - if (!("operation" in params) && "requestId" in params) { - return { operation: "request", requestId: params.requestId }; + if ("operation" in params) { + if (params.operation === "request" || params.operation === "broad") { + return params; + } + // Legacy per-kind sugar: { operation: "inference"|"embeddings", modelId } + if (params.operation === "inference") { + return { + operation: "broad", + modelId: params.modelId, + kind: "completion", + }; + } + return { operation: "broad", modelId: params.modelId, kind: "embeddings" }; + } + + if ("requestId" in params) { + const wire: CancelParams = { + operation: "request", + requestId: params.requestId, + }; + if (params.clearCache !== undefined) { + wire.clearCache = params.clearCache; + } + return wire; + } + + const broad: CancelParams = { + operation: "broad", + modelId: params.modelId, + }; + if (params.kind !== undefined) { + broad.kind = params.kind; } - return params; + return broad; } diff --git a/packages/sdk/client/api/embed.ts b/packages/sdk/client/api/embed.ts index ce69c5d8f3..c0bb689d95 100644 --- a/packages/sdk/client/api/embed.ts +++ b/packages/sdk/client/api/embed.ts @@ -6,6 +6,8 @@ import { type RPCOptions, } from "@/schemas"; import { InvalidResponseError } from "@/utils/errors-client"; +import { decoratePromise } from "@/utils/decorate-promise"; +import { generateClientRequestId } from "@/client/api/client-request-id"; /** * Generates embeddings for a single text using a specified model. @@ -15,13 +17,13 @@ import { InvalidResponseError } from "@/utils/errors-client"; * @param params.modelId - The identifier of the embedding model to use * @param params.text - The input text to embed * @param options - Optional RPC options including per-call profiling - * @returns A promise resolving to an object with `embedding` (a single `number[]` vector) and optional `stats` performance data. + * @returns A promise (decorated with `requestId`) resolving to an object with `embedding` (a single `number[]` vector) and optional `stats` performance data. * @throws {QvacErrorBase} When the response type is invalid or when the embedding fails */ -export async function embed( +export function embed( params: { modelId: string; text: string }, options?: RPCOptions, -): Promise<{ embedding: number[]; stats?: EmbedStats }>; +): Promise<{ embedding: number[]; stats?: EmbedStats }> & { requestId: string }; /** * Generates embeddings for multiple texts using a specified model. @@ -31,21 +33,41 @@ export async function embed( * @param params.modelId - The identifier of the embedding model to use * @param params.text - The input texts to embed * @param options - Optional RPC options including per-call profiling - * @returns A promise resolving to an object with `embedding` (one `number[]` vector per input text, i.e. `number[][]`) and optional `stats` performance data. + * @returns A promise (decorated with `requestId`) resolving to an object with `embedding` (one `number[]` vector per input text, i.e. `number[][]`) and optional `stats` performance data. * @throws {QvacErrorBase} When the response type is invalid or when the embedding fails */ -export async function embed( +export function embed( params: { modelId: string; text: string[] }, options?: RPCOptions, -): Promise<{ embedding: number[][]; stats?: EmbedStats }>; +): Promise<{ embedding: number[][]; stats?: EmbedStats }> & { + requestId: string; +}; -export async function embed( +export function embed( params: EmbedParams, options?: RPCOptions, +): Promise<{ embedding: number[] | number[][]; stats?: EmbedStats }> & { + requestId: string; +} { + // Client-generated `requestId` is surfaced synchronously on the + // returned promise so the caller can `cancel({ requestId })` before + // `await` resolves. The same id is threaded onto the wire envelope so + // the server's registry entry uses it as the canonical key — + // matching the `loadModel` / `downloadAsset` / `completion` shape. + const requestId = generateClientRequestId(); + const inner = runEmbed(params, requestId, options); + return decoratePromise(inner, { requestId }); +} + +async function runEmbed( + params: EmbedParams, + requestId: string, + options?: RPCOptions, ): Promise<{ embedding: number[] | number[][]; stats?: EmbedStats }> { const request: EmbedRequest = { type: "embed", ...params, + requestId, }; const response = await send(request, options); diff --git a/packages/sdk/client/api/rag.ts b/packages/sdk/client/api/rag.ts index 3bb55dab51..bb859046a0 100644 --- a/packages/sdk/client/api/rag.ts +++ b/packages/sdk/client/api/rag.ts @@ -31,6 +31,8 @@ import { RAGCloseWorkspaceFailedError, RAGListWorkspacesFailedError, } from "@/utils/errors-client"; +import { generateClientRequestId } from "@/client/api/client-request-id"; +import { decoratePromise } from "@/utils/decorate-promise"; // ============== Chunk ============== @@ -125,9 +127,21 @@ export async function ragChunk( * }); * ``` */ -export async function ragIngest( +export function ragIngest( params: RagIngestParams, options?: RPCOptions, +): Promise<{ processed: RagSaveEmbeddingsResult[]; droppedIndices: number[] }> & { + requestId: string; +} { + const requestId = generateClientRequestId(); + const inner = runRagIngest(params, requestId, options); + return decoratePromise(inner, { requestId }); +} + +async function runRagIngest( + params: RagIngestParams, + requestId: string, + options?: RPCOptions, ): Promise<{ processed: RagSaveEmbeddingsResult[]; droppedIndices: number[] }> { const { onProgress, ...requestParams } = params; @@ -137,6 +151,7 @@ export async function ragIngest( ...requestParams, chunk: requestParams.chunk ?? true, withProgress: onProgress ? true : undefined, + requestId, }; if (onProgress) { @@ -221,8 +236,18 @@ export async function ragIngest( * }); * ``` */ -export async function ragSaveEmbeddings( +export function ragSaveEmbeddings( + params: RagSaveEmbeddingsParams, + options?: RPCOptions, +): Promise & { requestId: string } { + const requestId = generateClientRequestId(); + const inner = runRagSaveEmbeddings(params, requestId, options); + return decoratePromise(inner, { requestId }); +} + +async function runRagSaveEmbeddings( params: RagSaveEmbeddingsParams, + requestId: string, options?: RPCOptions, ): Promise { const { onProgress, ...requestParams } = params; @@ -232,6 +257,7 @@ export async function ragSaveEmbeddings( operation: "saveEmbeddings", ...requestParams, withProgress: onProgress ? true : undefined, + requestId, }; if (onProgress) { @@ -420,8 +446,18 @@ export async function ragDeleteEmbeddings( * }); * ``` */ -export async function ragReindex( +export function ragReindex( + params: RagReindexParams, + options?: RPCOptions, +): Promise & { requestId: string } { + const requestId = generateClientRequestId(); + const inner = runRagReindex(params, requestId, options); + return decoratePromise(inner, { requestId }); +} + +async function runRagReindex( params: RagReindexParams, + requestId: string, options?: RPCOptions, ): Promise { const { onProgress, ...requestParams } = params; @@ -431,6 +467,7 @@ export async function ragReindex( operation: "reindex", ...requestParams, withProgress: onProgress ? true : undefined, + requestId, }; if (onProgress) { diff --git a/packages/sdk/client/api/transcribe.ts b/packages/sdk/client/api/transcribe.ts index b7f9472359..13eca3e865 100644 --- a/packages/sdk/client/api/transcribe.ts +++ b/packages/sdk/client/api/transcribe.ts @@ -17,10 +17,15 @@ import { import { stream, duplex, type DuplexReadable } from "@/client/rpc/rpc-client"; import { getClientLogger } from "@/logging"; import { TranscriptionFailedError } from "@/utils/errors-client"; +import { decoratePromise } from "@/utils/decorate-promise"; +import { generateClientRequestId } from "@/client/api/client-request-id"; const logger = getClientLogger(); -function buildTranscribeRequest(params: TranscribeClientParams): TranscribeRequest { +function buildTranscribeRequest( + params: TranscribeClientParams, + requestId: string, +): TranscribeRequest { return { type: "transcribe", modelId: params.modelId, @@ -30,6 +35,7 @@ function buildTranscribeRequest(params: TranscribeClientParams): TranscribeReque : { type: "base64", value: params.audioChunk.toString("base64") }, ...(params.prompt && { prompt: params.prompt }), ...(params.metadata === true && { metadata: true }), + requestId, }; } @@ -45,22 +51,41 @@ function buildTranscribeRequest(params: TranscribeClientParams): TranscribeReque * segments (`{ text, startMs, endMs, append, id }`) * instead of joined text. Whisper engine only. * @param options - Optional RPC options including per-call profiling - * @returns The complete transcribed text, or — when `metadata` is true — - * the list of transcript segments in emission order. + * @returns A promise (decorated with `requestId`) resolving to the + * complete transcribed text, or — when `metadata` is true — + * the list of transcript segments in emission order. The + * `requestId` is reachable synchronously so callers can target + * this in-flight transcription with `cancel({ requestId })` + * before `await` resolves. */ export function transcribe( params: TranscribeClientParams & { metadata: true }, options?: RPCOptions, -): Promise; +): Promise & { requestId: string }; +export function transcribe( + params: TranscribeClientParams, + options?: RPCOptions, +): Promise & { requestId: string }; export function transcribe( params: TranscribeClientParams, options?: RPCOptions, -): Promise; -export async function transcribe( +): Promise & { requestId: string } { + // Client-generated id surfaced synchronously on the returned promise + // — same shape as `loadModel` / `downloadAsset` / `completion`. The + // CLI cancel bridge in `qvac serve` binds `req.on('close')` to + // `cancel({ requestId })` immediately after the call returns so a + // client disconnect aborts the in-flight transcription. + const requestId = generateClientRequestId(); + const inner = runTranscribe(params, requestId, options); + return decoratePromise(inner, { requestId }); +} + +async function runTranscribe( params: TranscribeClientParams, + requestId: string, options?: RPCOptions, ): Promise { - const request = buildTranscribeRequest(params); + const request = buildTranscribeRequest(params, requestId); if (params.metadata === true) { const segments: TranscribeSegment[] = []; @@ -189,7 +214,7 @@ async function* streamTranscribeValues( options: RPCOptions | undefined, extract: (parsed: TranscribeResponse) => T | undefined, ): AsyncGenerator { - const request = buildTranscribeRequest(params); + const request = buildTranscribeRequest(params, generateClientRequestId()); for await (const response of stream(request, options)) { if (response.type === "transcribe") { diff --git a/packages/sdk/client/rpc/rpc-client.ts b/packages/sdk/client/rpc/rpc-client.ts index fbb0283583..cfbbef6233 100644 --- a/packages/sdk/client/rpc/rpc-client.ts +++ b/packages/sdk/client/rpc/rpc-client.ts @@ -7,7 +7,7 @@ import { type Response, type RPCOptions, } from "@/schemas"; -import { RPCError } from "./rpc-error"; +import { reconstructError } from "./rpc-error"; import { withTimeout, withTimeoutStream } from "@/utils/withTimeout"; import { getClientLogger, summarizeRequest } from "@/logging"; import { getRPC, close as closeRPC, createDuplexSession } from "#rpc"; @@ -46,7 +46,12 @@ function getNextCommandId() { function checkAndThrowError(response: Response): void { if (response.type === "error") { - throw new RPCError(response); + // Use the typed-error reconstructor map in `rpc-error.ts` so the + // original class (e.g. `RequestRejectedByPolicyError`) survives + // the RPC boundary intact and `err instanceof ` works in + // consumer `catch` blocks. Unknown error names fall back to a + // plain `RPCError` wrapper inside `reconstructError`. + throw reconstructError(response); } } diff --git a/packages/sdk/client/rpc/rpc-error.ts b/packages/sdk/client/rpc/rpc-error.ts index 3bb119e483..5974eb7d94 100644 --- a/packages/sdk/client/rpc/rpc-error.ts +++ b/packages/sdk/client/rpc/rpc-error.ts @@ -1,4 +1,9 @@ import type { ErrorResponse } from "@/schemas"; +import { + RequestIdConflictError, + RequestNotFoundError, + RequestRejectedByPolicyError, +} from "@/utils/errors-server"; export class RPCError extends Error { public readonly timestamp?: string; @@ -43,3 +48,119 @@ export class RPCError extends Error { }; } } + +/** + * Attach the remote stack onto a reconstructed typed error so the + * client-side trace points at the consumer call site and the worker- + * side trace is preserved for debugging. Mirrors the behaviour + * `RPCError`'s constructor applies in the fall-through path. + */ +function attachRemoteContext( + err: Error, + response: ErrorResponse, +): Error { + if (response.stack) { + (err as { remoteStack?: string }).remoteStack = response.stack; + err.stack = `${err.stack}\n--- Worker Stack ---\n${response.stack}`; + } + if (response.timestamp) { + (err as { timestamp?: string }).timestamp = response.timestamp; + } + return err; +} + +/** Read a string field from the `typedFields` envelope, defaulting to `fallback` if missing or non-string. */ +function readStringField( + fields: Record | undefined, + key: string, + fallback: string, +): string { + const value = fields?.[key]; + return typeof value === "string" ? value : fallback; +} + +type ErrorReconstructor = (response: ErrorResponse) => Error; + +/** + * Map of server-thrown `QvacErrorBase` subclasses that need to survive + * the RPC boundary as their original class — `err instanceof + * RequestRejectedByPolicyError` on the consumer side must match the + * class re-exported from `@qvac/sdk`. + * + * **Key shape.** The key is the `name` value `QvacErrorBase` sets at + * construction time, which `createErrorResponse` forwards onto + * `response.name`. That value is the SCREAMING_SNAKE_CASE error-code + * name from `sdk-errors-server.ts` (`"REQUEST_REJECTED_BY_POLICY"`), + * **not** the JS class name (`"RequestRejectedByPolicyError"`). The + * mismatch was a 0.11.0 bring-up bug fixed alongside the reconstructor + * unit tests in `rpc-error-reconstruct.test.ts`. + * + * **Maintenance contract.** Every new cross-RPC server-thrown typed + * error class added in `errors-server.ts` adds a row here in the same + * PR, keyed by its `SDK_SERVER_ERROR_CODES.` constant. The + * class must (a) implement `toErrorResponseFields(): Record<…>` so + * the wire envelope carries its named constructor arguments and (b) + * be re-exported from `@qvac/sdk` so consumers can `import` it. + * Forgetting either side means `instanceof` regresses for that class. + * + * Client-constructed typed errors (e.g. `InferenceCancelledError` + * built in `client/api/completion-stream.ts` from the aggregated + * partial state) are NOT registered here — they never round-trip the + * envelope, and adding a reconstructor for one would create a + * parallel construction path that fires whenever the server happens + * to throw the same class name. + */ +const RECONSTRUCTORS: Record = { + REQUEST_ID_CONFLICT: (response) => { + return new RequestIdConflictError( + readStringField(response.typedFields, "requestId", ""), + response.cause, + ); + }, + REQUEST_NOT_FOUND: (response) => { + return new RequestNotFoundError( + readStringField(response.typedFields, "requestId", ""), + response.cause, + ); + }, + REQUEST_REJECTED_BY_POLICY: (response) => { + return new RequestRejectedByPolicyError( + readStringField(response.typedFields, "requestId", ""), + readStringField(response.typedFields, "kind", ""), + readStringField(response.typedFields, "modelId", ""), + readStringField(response.typedFields, "reason", response.message), + response.cause, + ); + }, +}; + +/** + * Rebuild the original server-thrown typed error from its serialised + * envelope so consumer code can do + * `if (err instanceof RequestRejectedByPolicyError) { ... }` across + * the RPC boundary. Unknown error names fall through to the legacy + * `RPCError` wrapper, which preserves `name`/`code`/`message` for + * code-based predicates. + */ +export function reconstructError(response: ErrorResponse): Error { + const reconstructor = response.name + ? RECONSTRUCTORS[response.name] + : undefined; + if (!reconstructor) { + return new RPCError(response); + } + + try { + return attachRemoteContext(reconstructor(response), response); + } catch { + // Defensive fall-through: if a reconstructor throws (e.g. a + // future class adds a required constructor field and an older + // server doesn't ship `typedFields`), surface the original error + // via `RPCError` so consumers never see "couldn't reconstruct + // error" obscuring the real one. The reconstructors here are + // intentionally written to coerce missing fields to defensible + // defaults (`String(x ?? "")`), so reaching this branch is the + // edge case, not the norm. + return new RPCError(response); + } +} diff --git a/packages/sdk/examples/download-with-cancel.ts b/packages/sdk/examples/download-with-cancel.ts index 5b716bfafd..35bd798680 100644 --- a/packages/sdk/examples/download-with-cancel.ts +++ b/packages/sdk/examples/download-with-cancel.ts @@ -14,8 +14,11 @@ let modelId: string | undefined; let cancelled = false; try { - // Download model with progress tracking and cancellation - await downloadAsset({ + // Download model with progress tracking and cancellation. The + // `downloadAsset(...)` call returns a *decorated* promise: the + // promise resolves to the modelId, and the same value carries a + // synchronous `requestId` field so we can cancel before it settles. + const download = downloadAsset({ assetSrc: LLAMA_3_2_1B_INST_Q4_0, onProgress: (progress) => { const downloadedMB = (progress.downloaded / 1024 / 1024).toFixed(2); @@ -35,17 +38,14 @@ try { console.log(progress); cancelled = true; - // Use the downloadKey to cancel - if (progress.downloadKey) { - void cancel({ - operation: "downloadAsset", - downloadKey: progress.downloadKey, - // clearCache: true, // Uncomment to delete partial file instead of resuming - }); - } + void cancel({ + requestId: download.requestId, + // clearCache: true, // Uncomment to delete partial file instead of resuming + }); } }, }); + await download; console.log(`\n✅ Model downloaded successfully! Model ID: ${modelId}`); console.log("🎯 Download completed without interruption"); diff --git a/packages/sdk/examples/rag/rag-hyperdb/cancellation.ts b/packages/sdk/examples/rag/rag-hyperdb/cancellation.ts index a3e9046603..54379c7c28 100644 --- a/packages/sdk/examples/rag/rag-hyperdb/cancellation.ts +++ b/packages/sdk/examples/rag/rag-hyperdb/cancellation.ts @@ -46,26 +46,29 @@ try { let progressCount = 0; let cancelled = false; - try { - await ragIngest({ - modelId, - workspace: WORKSPACE, - documents, - progressInterval: 50, - onProgress: (stage, current, total) => { - progressCount++; - const pct = total > 0 ? Math.round((current / total) * 100) : 0; - console.log(` [${stage}] ${current}/${total} (${pct}%)`); + // Capture the decorated promise so we can cancel by `requestId` (the + // primary cancel path). For a "cancel everything RAG on this model" + // sweep, use `cancel({ modelId, kind: "rag" })` instead. + const ingest = ragIngest({ + modelId, + workspace: WORKSPACE, + documents, + progressInterval: 50, + onProgress: (stage, current, total) => { + progressCount++; + const pct = total > 0 ? Math.round((current / total) * 100) : 0; + console.log(` [${stage}] ${current}/${total} (${pct}%)`); - // Cancel during embedding stage after a few updates - if (!cancelled && stage === "embedding" && current > 10) { - console.log("\n🛑 Triggering cancellation...\n"); - cancelled = true; - void cancel({ operation: "rag", workspace: WORKSPACE }); - } - }, - }); + if (!cancelled && stage === "embedding" && current > 10) { + console.log("\n🛑 Triggering cancellation...\n"); + cancelled = true; + void cancel({ requestId: ingest.requestId }); + } + }, + }); + try { + await ingest; console.log( "\n⚠️ Ingest completed (cancellation didn't interrupt in time)", ); diff --git a/packages/sdk/index.ts b/packages/sdk/index.ts index 4383ff58cb..d1a31b1885 100644 --- a/packages/sdk/index.ts +++ b/packages/sdk/index.ts @@ -158,9 +158,22 @@ export { SUPPORTED_AUDIO_FORMATS } from "./constants/audio"; // (e.g. `oneAtATimePerModel` on `completion`) rejects a new request; // it propagates out through the worker so the client can distinguish // "the request collided with another one" from "the request failed". +// +// `RequestIdConflictError` and `RequestNotFoundError` are thrown by +// `RequestRegistry.begin(...)` / `.end(...)` on UUID collisions and +// missing-target cancels. They're surfaced here so consumers using +// the decorated-promise `requestId` can pattern-match on rejected +// cancel paths. All three classes round-trip the RPC boundary via +// the typed-error reconstructor in `client/rpc/rpc-error.ts` so +// `err instanceof ` works on the consumer side, not just on +// the worker side. export { InferenceCancelledError } from "./utils/errors-server"; export type { InferenceCancelledPartial } from "./utils/errors-server"; -export { RequestRejectedByPolicyError } from "./utils/errors-server"; +export { + RequestIdConflictError, + RequestNotFoundError, + RequestRejectedByPolicyError, +} from "./utils/errors-server"; // Logging exports export { getLogger, SDK_LOG_ID } from "./logging"; diff --git a/packages/sdk/schemas/cancel.ts b/packages/sdk/schemas/cancel.ts index 4632efeebd..d4a7db8104 100644 --- a/packages/sdk/schemas/cancel.ts +++ b/packages/sdk/schemas/cancel.ts @@ -1,46 +1,56 @@ import { z } from "zod"; -import { delegateBaseSchema } from "./delegate"; const cancelBaseSchema = z.object({ type: z.literal("cancel"), }); +/** + * Public-API base for the broad-cancel escape hatch. Kept exported so + * the bare-side `cancel({ modelId, kind? })` helper can parse the + * `modelId` field consistently with the wire envelope. + */ export const cancelInferenceBaseSchema = z.object({ modelId: z.string().describe("The model ID to cancel inference for"), }); -const cancelInferenceParamsSchema = cancelInferenceBaseSchema.extend({ - operation: z.literal("inference").describe("Operation type"), -}); - -const cancelDownloadParamsSchema = z.object({ - operation: z.literal("downloadAsset").describe("Operation type"), - downloadKey: z.string().describe("The download key to cancel"), - clearCache: z - .boolean() - .optional() - .describe("If true, deletes the partial download file"), - delegate: delegateBaseSchema.optional(), -}); - -const cancelRagParamsSchema = z.object({ - operation: z.literal("rag").describe("Operation type"), - workspace: z.string().optional().describe("The RAG workspace to cancel"), -}); - -const cancelEmbeddingsParamsSchema = cancelInferenceBaseSchema.extend({ - operation: z.literal("embeddings").describe("Operation type"), -}); +/** + * Coarse kind narrowing for the broad-cancel escape hatch. Matches the + * server-side `RequestKind` union in `server/bare/runtime/request-context.ts` + * — keep the two lists in sync. The kind is optional; omitting it + * cancels every in-flight request on the model regardless of kind + * (the "cancel everything on this model" sweep used by model-unload + * and app-shutdown paths). + */ +const cancelKindSchema = z + .enum([ + "completion", + "embeddings", + "transcribe", + "translate", + "diffusion", + "tts", + "ocr", + "finetune", + "loadModel", + "downloadAsset", + "rag", + ] as const) + .describe( + "Optional kind narrowing for the broad cancel. Omitting it cancels every in-flight request on the model.", + ); /** - * Targeted cancel by `requestId` — the primary cancel path introduced in - * SDK 0.11.0. Pair with the `requestId` field exposed on `CompletionRun` - * (and equivalent long-running result objects) to cancel a specific - * in-flight request rather than every request running on a given model. + * Targeted cancel by `requestId` — the primary cancel path in + * SDK 0.11.0. Pair with the `requestId` field exposed on + * `CompletionRun` (and the decorated promises returned by + * `loadModel(...)`, `downloadAsset(...)`, `embed(...)`, + * `transcribe(...)`, `rag*(...)` etc.) to cancel a specific in-flight + * request rather than every request running on a given model. * - * The pre-existing `{ operation: "inference", modelId }` form is kept as - * a broad-cancel escape hatch for "cancel everything on this model" - * scenarios (model unload, app shutdown, admin sweeps). + * `clearCache` is honoured only when the targeted request is a + * `downloadAsset` — it propagates onto the underlying download + * transfer so the partial file is deleted when the last subscriber + * leaves. Ignored for other kinds. */ const cancelByRequestIdParamsSchema = z.object({ operation: z.literal("request").describe("Operation type"), @@ -48,16 +58,44 @@ const cancelByRequestIdParamsSchema = z.object({ .string() .min(1) .describe( - "Identifier of the specific in-flight request to cancel — the value exposed on the result object returned by long-running calls (e.g. `completion(...)`).", + "Identifier of the specific in-flight request to cancel — the value exposed on the result object returned by long-running calls (e.g. `completion(...)`, `loadModel(...)`, `downloadAsset(...)`).", + ), + clearCache: z + .boolean() + .optional() + .describe( + "Download-only: if true, deletes the partial download file when the subscriber leaves. Ignored for non-download kinds.", ), }); +/** + * Broad cancel escape hatch — abort every in-flight request running on + * a model (optionally narrowed by `kind`). Kept indefinitely as the + * non-`requestId` cancel surface for model-unload, app-shutdown, and + * admin sweeps where the caller doesn't have a `requestId` to hand. + * + * Replaces the legacy per-kind discriminator arms (`"inference"`, + * `"embeddings"`, `"downloadAsset"`, `"rag"`) with a single `"broad"` + * arm plus an optional `kind` field. The old arms went away as part + * of the 0.11.0 cleanup once every handler was on the registry; the + * wire shape collapse is a `[bc]` for any external caller hand-rolling + * the old RPC envelope. The public-API `cancel(...)` function in + * `client/api/cancel.ts` keeps the old `{ operation: "inference", modelId }` + * / `{ operation: "embeddings", modelId }` forms callable and translates + * them into this new shape at the client boundary, so consumers using + * the official SDK client see no change. + */ +const cancelBroadParamsSchema = z.object({ + operation: z.literal("broad").describe("Operation type"), + modelId: z + .string() + .describe("Cancel every in-flight request on this model"), + kind: cancelKindSchema.optional(), +}); + const cancelParamsSchema = z.discriminatedUnion("operation", [ - cancelInferenceParamsSchema, - cancelDownloadParamsSchema, - cancelRagParamsSchema, - cancelEmbeddingsParamsSchema, cancelByRequestIdParamsSchema, + cancelBroadParamsSchema, ]); export const cancelRequestSchema = z.intersection( @@ -68,20 +106,59 @@ export const cancelRequestSchema = z.intersection( export const cancelResponseSchema = z.object({ type: z.literal("cancel"), success: z.boolean(), + /** + * Number of in-flight contexts that this call flipped to + * `cancelling` (already-cancelled contexts are not counted, so + * callers can rely on the value to log "n requests cancelled" once + * without double-counting). Always present on `success: true`. + */ + cancelled: z.number().int().nonnegative().optional(), error: z.string().optional(), }); /** - * Sugar for the most common new path — `cancel({ requestId })`. The client - * accepts either this shape (no `operation`) or the explicit + * Sugar for the most common new path — `cancel({ requestId })`. The + * client accepts either this shape (no `operation`) or the explicit * `{ operation: "request", requestId }` and normalises before sending. */ export const cancelByRequestIdSugarSchema = z .object({ requestId: z.string().min(1), + clearCache: z.boolean().optional(), }) .strict(); +/** + * Sugar for the broad-cancel escape hatch — `cancel({ modelId, kind? })`. + * Translates to `{ operation: "broad", modelId, kind? }` at the client + * boundary. + */ +export const cancelBroadSugarSchema = z + .object({ + modelId: z.string().min(1), + kind: cancelKindSchema.optional(), + }) + .strict(); + +/** + * Legacy per-kind broad-cancel sugars retained at the public-API + * boundary so existing callers of `cancel({ operation: "inference", + * modelId })` / `cancel({ operation: "embeddings", modelId })` keep + * working without code changes. The client wrapper translates these + * into the new `{ operation: "broad", modelId, kind: ... }` wire + * shape. New callers should prefer `cancel({ requestId })` or + * `cancel({ modelId, kind? })`. + */ +export const cancelLegacyInferenceSugarSchema = z.object({ + operation: z.literal("inference"), + modelId: z.string().min(1), +}); + +export const cancelLegacyEmbeddingsSugarSchema = z.object({ + operation: z.literal("embeddings"), + modelId: z.string().min(1), +}); + export type CancelParams = z.infer; export type CancelInferenceBaseParams = z.infer< typeof cancelInferenceBaseSchema @@ -89,11 +166,25 @@ export type CancelInferenceBaseParams = z.infer< export type CancelByRequestIdParams = z.infer< typeof cancelByRequestIdParamsSchema >; +export type CancelBroadParams = z.infer; export type CancelByRequestIdSugar = z.infer< typeof cancelByRequestIdSugarSchema >; +export type CancelBroadSugar = z.infer; +export type CancelLegacyInferenceSugar = z.infer< + typeof cancelLegacyInferenceSugarSchema +>; +export type CancelLegacyEmbeddingsSugar = z.infer< + typeof cancelLegacyEmbeddingsSugarSchema +>; +export type CancelKind = z.infer; export type CancelRequest = z.infer; export type CancelResponse = z.infer; -/** Public client-API input — accepts the wire union *or* the requestId sugar. */ -export type CancelClientInput = CancelParams | CancelByRequestIdSugar; +/** Public client-API input — accepts the wire union *or* the requestId/broad sugars and the legacy per-kind sugars. */ +export type CancelClientInput = + | CancelParams + | CancelByRequestIdSugar + | CancelBroadSugar + | CancelLegacyInferenceSugar + | CancelLegacyEmbeddingsSugar; diff --git a/packages/sdk/schemas/error.ts b/packages/sdk/schemas/error.ts index a0d86a08f6..41f8c54d2f 100644 --- a/packages/sdk/schemas/error.ts +++ b/packages/sdk/schemas/error.ts @@ -1,6 +1,22 @@ import { z } from "zod"; import { QvacErrorBase } from "@qvac/error"; +/** + * Wire shape for errors thrown across the RPC boundary. The fields are + * the union of (a) the legacy `QvacErrorBase` serialisation (`name`, + * `code`, `message`, `stack`, `cause`, `timestamp`) and (b) the new + * `typedFields` map (0.11.0+) carrying per-class structured data the + * client-side reconstructor uses to rebuild the original typed error. + * + * `typedFields` is opaque on the wire — `z.unknown()` — and the + * per-class reconstructor in `client/rpc/rpc-error.ts` casts each + * member at the boundary. The single-map shape keeps the schema + * compact regardless of how many typed-error classes the SDK + * eventually surfaces across RPC. New typed-error classes that need + * cross-RPC reconstruction add a `toErrorResponseFields()` method on + * the server side and a row to the reconstructor map on the client + * side; the schema itself doesn't change. + */ export const errorResponseSchema = z.object({ type: z.literal("error"), message: z.string(), @@ -9,10 +25,36 @@ export const errorResponseSchema = z.object({ name: z.string().optional(), code: z.number().optional(), cause: z.unknown().optional(), + typedFields: z.record(z.string(), z.unknown()).optional(), }); export type ErrorResponse = z.infer; +/** + * A `QvacErrorBase` subclass that opts into typed-field serialisation + * across the RPC boundary. The method returns the subset of own + * properties the client-side reconstructor needs to rebuild the + * original class with its named constructor arguments populated. + * + * Co-located with each class (see `utils/errors-server.ts`) so adding + * a new cross-RPC typed error is a three-step change in one PR: define + * the class, implement the method, add a reconstructor entry in + * `client/rpc/rpc-error.ts`. + */ +export interface TypedErrorSerializer { + toErrorResponseFields(): Record; +} + +function hasTypedFields(error: unknown): error is TypedErrorSerializer { + return ( + error !== null && + typeof error === "object" && + "toErrorResponseFields" in error && + typeof (error as { toErrorResponseFields?: unknown }) + .toErrorResponseFields === "function" + ); +} + function isQvacError(error: unknown): error is QvacErrorBase { return error instanceof QvacErrorBase; } @@ -20,7 +62,7 @@ function isQvacError(error: unknown): error is QvacErrorBase { export function createErrorResponse(error: unknown): ErrorResponse { if (isQvacError(error)) { const qvacData = error.toJSON(); - return { + const response: ErrorResponse = { type: "error", name: qvacData.name, code: qvacData.code, @@ -28,6 +70,10 @@ export function createErrorResponse(error: unknown): ErrorResponse { stack: qvacData.stack, timestamp: new Date().toISOString(), }; + if (hasTypedFields(error)) { + response.typedFields = error.toErrorResponseFields(); + } + return response; } const message = error instanceof Error ? error.message : String(error); diff --git a/packages/sdk/server/bare/ops/cancel.ts b/packages/sdk/server/bare/ops/cancel.ts index 5194478c97..a23fd399d0 100644 --- a/packages/sdk/server/bare/ops/cancel.ts +++ b/packages/sdk/server/bare/ops/cancel.ts @@ -14,25 +14,29 @@ const logger = getServerLogger(); * Broad cancel: abort every in-flight request matching `modelId` (and * optionally a `kind`). Maps onto `RequestRegistry.cancel({ modelId })` * — the registry walks active contexts and aborts each one's signal, - * which the inference handler has wired to the addon's `cancel()`. + * which each handler has wired to its own addon-level / async unwind + * via the registry's `await using ctx = registry.begin(...)` block. * * Kept as a stable surface alongside the new `cancel({ requestId })` * path: the caller may not have a `requestId` to hand (model unload, * app shutdown, admin sweeps), and the escape hatch is cheap because * the registry already does the matching. * - * Compatibility fallback: only the llama.cpp completion handler routes - * through the registry in 0.11.0; embeddings / transcription / - * translation / decoder / OCR / TTS handlers will follow in later - * milestones. Until then, a `modelId`-targeted cancel that finds zero - * registry matches falls back to the pre-0.11.0 behavior of calling - * `model.addon.cancel()` directly, so the wire contract for those - * surfaces does not regress while the migration is in flight. + * Returns the number of contexts whose abort was triggered by *this* + * call (already-cancelled contexts are skipped so callers can rely on + * the count to log "n requests cancelled" once). Used by the RPC + * cancel handler to populate `CancelResponse.cancelled` and by + * internal server-side callers that want to know whether anything + * landed. + * + * The legacy pre-registry addon-cancel fallback was removed in 0.11.0 + * once every handler had been migrated onto the registry; the function + * now does exactly one thing — a registry walk. */ -export async function cancel( +export function cancel( params: CancelInferenceBaseParams, opts?: { kind?: RequestKind }, -): Promise { +): number { const { modelId } = cancelInferenceBaseSchema.parse(params); const model = getModel(modelId); @@ -44,28 +48,14 @@ export async function cancel( const target = opts?.kind ? { modelId, kind: opts.kind } : { modelId }; const cancelled = registry.cancel(target); - if (cancelled > 0) return; - - // No registry match: a request kind whose handler hasn't been migrated - // onto `registry.begin(...)` yet (everything except llama.cpp - // completion in 0.11.0). Fire the addon-level cancel directly so the - // pre-registry behavior is preserved — including awaiting acknowledgement, - // which is the wire contract callers relied on before this PR (the RPC - // response resolves once the addon has flipped its cancel flag, not - // beforehand). - const addon = model.addon; - if (addon?.cancel) { - await addon.cancel.call(addon); + if (cancelled === 0) { + // Callers (workbench "Stop" button, app shutdown sweeps) often + // fire-and-forget; log so operators can see when a broad cancel + // landed against a registry with nothing in flight on this model. logger.debug( - `[cancel] no registry match for modelId=${modelId}${opts?.kind ? ` kind=${opts.kind}` : ""} — fell back to addon.cancel()`, + `[cancel] no in-flight request matched modelId=${modelId}${opts?.kind ? ` kind=${opts.kind}` : ""}`, ); - return; } - // Callers (workbench "Stop" button, app shutdown sweeps) often - // fire-and-forget; log so operators can see when a cancel landed - // against a registry with nothing in flight and no addon-level cancel. - logger.debug( - `[cancel] no in-flight request matched modelId=${modelId}${opts?.kind ? ` kind=${opts.kind}` : ""}`, - ); + return cancelled; } diff --git a/packages/sdk/server/rpc/handler-registry.ts b/packages/sdk/server/rpc/handler-registry.ts index 71fc12fdea..b602fe1d44 100644 --- a/packages/sdk/server/rpc/handler-registry.ts +++ b/packages/sdk/server/rpc/handler-registry.ts @@ -58,18 +58,31 @@ function isModelDelegated(request: Request): boolean { return entry?.isDelegated ?? false; } +/** + * Should the cancel be forwarded to a delegated provider? + * + * After the 0.11.0 wire-schema collapse the cancel envelope has two + * operations: + * + * - `request` — targeted cancel by `requestId`. Always handled + * locally: the worker-singleton `RequestRegistry` is the source of + * truth for active requests (delegated handlers register their own + * requests on it the same way local handlers do), so a `requestId` + * cancel always lands on the right worker without needing a hop + * through the provider. Returning `false` here keeps the cancel on + * the local cancel handler, where it routes through the registry + * and (for downloads) the `markClearCacheForRequest` helper. + * + * - `broad` — abort every in-flight request on a model. Forwarded to + * the delegated provider iff the targeted model itself is + * delegated; the provider then runs the same broad-cancel sweep + * server-side. Local broad cancels for non-delegated models stay + * on this worker. + */ function isCancelDelegated(request: Request): boolean { if (request.type !== "cancel") return false; - - if (request.operation === "inference" || request.operation === "embeddings") { - return isModelDelegated(request); - } - - if (request.operation === "downloadAsset") { - return !!request.delegate; - } - - return false; + if (request.operation !== "broad") return false; + return isModelDelegated(request); } export const registry: Record = { diff --git a/packages/sdk/server/rpc/handlers/cancel-delegated.ts b/packages/sdk/server/rpc/handlers/cancel-delegated.ts index 4403af2ef3..55d2e8518d 100644 --- a/packages/sdk/server/rpc/handlers/cancel-delegated.ts +++ b/packages/sdk/server/rpc/handlers/cancel-delegated.ts @@ -12,44 +12,37 @@ type DelegationTarget = { timeout?: number; }; +/** + * Resolve the delegated provider for a cancel request, if any. + * + * After the 0.11.0 wire-schema collapse the cancel envelope has only + * two operations. Only `broad` cancels delegate at the cancel layer — + * see `isCancelDelegated` in `handler-registry.ts` for the policy and + * the rationale. + * + * The targeted `request` arm is handled locally because the registry + * is worker-singleton and already holds the entry for delegated + * requests (the delegated handler registers its own context on the + * provider-facing side). For pre-0.11.0 behaviour where a `requestId` + * cancel against a delegated model needed to round-trip to the + * provider, hold onto the delegated `loadModel(...).requestId` and + * fire a broad cancel against the model id instead. + */ function resolveDelegationTarget( request: CancelRequest, ): DelegationTarget | null { - if (request.operation === "inference" || request.operation === "embeddings") { - const entry = getModelEntry(request.modelId); - if (!entry?.isDelegated) { - return null; - } - const target: DelegationTarget = { - providerPublicKey: entry.delegated.providerPublicKey, - }; - if (entry.delegated.timeout !== undefined) { - target.timeout = entry.delegated.timeout; - } - return target; - } - - if (request.operation === "downloadAsset" && request.delegate) { - const target: DelegationTarget = { - providerPublicKey: request.delegate.providerPublicKey, - }; - if (request.delegate.timeout !== undefined) { - target.timeout = request.delegate.timeout; - } - return target; - } + if (request.operation !== "broad") return null; - return null; -} + const entry = getModelEntry(request.modelId); + if (!entry?.isDelegated) return null; -function toProviderCancelRequest(request: CancelRequest): CancelRequest { - if (request.operation !== "downloadAsset") { - return request; + const target: DelegationTarget = { + providerPublicKey: entry.delegated.providerPublicKey, + }; + if (entry.delegated.timeout !== undefined) { + target.timeout = entry.delegated.timeout; } - - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { delegate: _delegate, ...providerRequest } = request; - return providerRequest; + return target; } export async function handleCancelDelegated( @@ -79,7 +72,7 @@ export async function handleCancelDelegated( delegateOpts.profilingMeta = options.profilingMeta; } - await send(toProviderCancelRequest(request), rpc, delegateOpts); + await send(request, rpc, delegateOpts); return { type: "cancel", success: true }; } catch (error) { logger.error("Error during delegated cancellation:", error); diff --git a/packages/sdk/server/rpc/handlers/cancelHandler.ts b/packages/sdk/server/rpc/handlers/cancelHandler.ts index cdba7e5081..34c93597a6 100644 --- a/packages/sdk/server/rpc/handlers/cancelHandler.ts +++ b/packages/sdk/server/rpc/handlers/cancelHandler.ts @@ -1,109 +1,71 @@ import type { CancelRequest, CancelResponse } from "@/schemas/cancel"; -import { cancel } from "@/server/bare/ops/cancel"; -import { cancelTransfer } from "@/server/rpc/handlers/load-model/download-manager"; -import { - getActiveRagRequest, - DEFAULT_WORKSPACE, -} from "@/server/bare/rag-hyperdb"; +import { cancel as cancelByModelId } from "@/server/bare/ops/cancel"; import { getRequestRegistry } from "@/server/bare/runtime"; +import { markClearCacheForRequest } from "@/server/rpc/handlers/load-model/download-manager"; import { getServerLogger } from "@/logging"; const logger = getServerLogger(); -export async function cancelHandler( - request: CancelRequest, -): Promise { +/** + * Cancel RPC entry point. The 5-arm `switch (request.operation)` + * dispatcher that lived here through 0.10.x was retired in 0.11.0: + * every long-running handler now registers itself on the + * worker-singleton `RequestRegistry`, so the cancel surface narrows to + * two paths that route through the same registry primitive: + * + * - `{ operation: "request", requestId, clearCache? }` — targeted + * cancel by client-generated id. Looks up the registry entry, + * fires its abort signal, optionally marks the underlying download + * transfer for cache clear. The "stop-button race" case (client + * cancel beats its own begin to the worker) is handled inside the + * registry via the cancel-before-begin tripwire. + * + * - `{ operation: "broad", modelId, kind? }` — abort every in-flight + * request on a model (optionally narrowed by `kind`). Used for + * model unload, app shutdown, and admin sweeps where the caller + * has no `requestId`. Delegates to the `cancel` bare op so the + * `ModelNotLoadedError` validation is shared with internal + * server-side broad cancels. + * + * Always returns `success: true` plus a `cancelled` count (the number + * of contexts this call flipped to `cancelling` — already-cancelled + * contexts are not counted). A targeted cancel with no in-flight + * match still returns `success: true, cancelled: 0`; the + * cancel-before-begin tripwire ensures the cancel is applied + * retroactively if a matching begin arrives within the registry's + * race window. + */ +export function cancelHandler(request: CancelRequest): CancelResponse { try { - switch (request.operation) { - case "inference": - // Awaited so the RPC response resolves after the addon has - // acknowledged the cancel for non-registry-migrated handlers - // (decoder / OCR / TTS). The registry-routed path inside - // `cancel()` is already synchronous w.r.t. the abort, so the - // await is a no-op for completion-stream's signal-driven - // cancel. - await cancel({ modelId: request.modelId }, { kind: "completion" }); - break; - case "embeddings": - await cancel({ modelId: request.modelId }, { kind: "embeddings" }); - break; - case "request": { - const cancelled = getRequestRegistry().cancel({ - requestId: request.requestId, - }); - if (cancelled === 0) { - // info-level (not debug) because the decorated-promise pattern - // makes "no in-flight match" a common and user-visible case: - // a Stop button fired after the request settled but before the - // UI cleared lands here. Users debugging "my Stop button isn't - // working" need this visible without lowering the log level. - logger.info( - `[cancel] no in-flight request matched requestId=${request.requestId}`, - ); - } - break; + if (request.operation === "request") { + if (request.clearCache) { + markClearCacheForRequest(request.requestId); } - case "downloadAsset": - // Deprecated cancel arm. `downloadAsset` is registry-migrated - // and the primary cancel path is now - // `cancel({ operation: "request", requestId })`. This case - // stays for wire-compat with older clients; `cancelTransfer(...)` - // in download-manager.ts routes each subscriber through - // `registry.cancel({ requestId })` so the behaviour is - // equivalent to a broad per-`downloadKey` cancel. - logger.warn( - "[cancel] `cancel({ operation: \"downloadAsset\", downloadKey })` is deprecated — use `cancel({ requestId })` against the value exposed on the `loadModel(...)` / `downloadAsset(...)` promise instead. This compat path is scheduled for removal in a future release.", + const cancelled = getRequestRegistry().cancel({ + requestId: request.requestId, + }); + if (cancelled === 0) { + // info-level (not debug) because the decorated-promise pattern + // makes "no in-flight match" a common and user-visible case: + // a Stop button fired after the request settled but before + // the UI cleared lands here. The cancel-before-begin tripwire + // inside the registry already captured the cancel for any + // matching begin in flight; this log just helps operators + // debugging "my Stop button isn't working" without lowering + // the log level. + logger.info( + `[cancel] no in-flight request matched requestId=${request.requestId}`, ); - cancelTransfer(request.downloadKey, request.clearCache); - break; - case "rag": { - // Deprecated cancel arm. RAG is registry-migrated with - // workspace-level admission in the dispatcher (`rag.ts`). - // Primary cancel path is - // `cancel({ operation: "request", requestId })`. This arm - // stays for wire-compat — it walks the workspace→requestId map - // installed by the dispatcher and routes via the registry. - logger.warn( - "[cancel] `cancel({ operation: \"rag\", workspace })` is deprecated — use `cancel({ requestId })` instead. This compat path is scheduled for removal in a future release.", - ); - const workspace = request.workspace ?? DEFAULT_WORKSPACE; - const requestId = getActiveRagRequest(workspace); - if (requestId === undefined) { - logger.warn( - `No active RAG operation to cancel for workspace: ${workspace}`, - ); - } else { - getRequestRegistry().cancel({ - requestId, - reason: "rag-workspace-cancel", - }); - } - break; - } - default: { - // Exhaustiveness guard: if the `CancelRequest` union ever grows a - // new `operation` and this switch isn't updated, TypeScript fails - // here at compile time. At runtime the zod discriminated union in - // `cancelRequestSchema` is upstream, so reaching this branch means - // the schema and the handler have drifted — surface the - // mismatch as an explicit failure rather than a silent - // `success: true` no-op. - const _exhaustive: never = request; - void _exhaustive; - const op = (request as { operation?: string }).operation ?? "unknown"; - logger.error(`[cancel] unhandled cancel operation: ${op}`); - return { - type: "cancel", - success: false, - error: `Unhandled cancel operation: ${op}`, - }; } + return { type: "cancel", success: true, cancelled }; } - return { - type: "cancel", - success: true, - }; + // operation === "broad" + const cancelled = cancelByModelId( + { modelId: request.modelId }, + request.kind ? { kind: request.kind } : undefined, + ); + return { type: "cancel", success: true, cancelled }; } catch (error) { logger.error("Error during cancellation:", error); return { diff --git a/packages/sdk/server/rpc/handlers/load-model/download-manager.ts b/packages/sdk/server/rpc/handlers/load-model/download-manager.ts index 79771d2011..8b890f0152 100644 --- a/packages/sdk/server/rpc/handlers/load-model/download-manager.ts +++ b/packages/sdk/server/rpc/handlers/load-model/download-manager.ts @@ -307,6 +307,38 @@ export function startOrJoinDownload( * so we don't leak the transfer if a legacy code path holds the only * reference. */ +/** + * Set `clearCache=true` on the transfer that owns the subscriber bound + * to `requestId`, so when the registry's targeted cancel removes that + * subscriber and we reach last-subscriber teardown, the partial + * download file is deleted instead of preserved for automatic resume. + * + * Lookup is O(transfers * subscribers); transfers are short-lived and + * subscriber counts per transfer are tiny in practice, so this is + * fine. Returns `true` if a matching subscriber was found, `false` + * otherwise — the cancel handler treats both cases identically (the + * registry cancel still fires) and the return value is informational. + * + * Added in 0.11.0 to support `cancel({ requestId, clearCache: true })` + * for download requests after the wire schema collapse removed the + * `{ operation: "downloadAsset", downloadKey, clearCache }` arm. The + * subscriber is the unit of `clearCache` even though the flag lives on + * the shared transfer: if any subscriber on the transfer asks for + * clearCache, the partial file is deleted when the last subscriber + * leaves, matching the pre-collapse behaviour. + */ +export function markClearCacheForRequest(requestId: string): boolean { + for (const transfer of activeTransfers.values()) { + for (const sub of transfer.subscribers.values()) { + if (sub.requestId === requestId) { + transfer.clearCache = true; + return true; + } + } + } + return false; +} + export function cancelTransfer( downloadKey: string, clearCache = false, diff --git a/packages/sdk/test/unit/error-typed-fields.test.ts b/packages/sdk/test/unit/error-typed-fields.test.ts new file mode 100644 index 0000000000..7321598bb9 --- /dev/null +++ b/packages/sdk/test/unit/error-typed-fields.test.ts @@ -0,0 +1,90 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { createErrorResponse } from "@/schemas/error"; +import { + RequestIdConflictError, + RequestNotFoundError, + RequestRejectedByPolicyError, + InferenceCancelledError, + ModelNotLoadedError, +} from "@/utils/errors-server"; + +test("createErrorResponse: RequestRejectedByPolicyError carries its named fields on typedFields", (t) => { + const err = new RequestRejectedByPolicyError( + "rid-1", + "completion", + "model-1", + "oneAtATimePerModel", + ); + const response = createErrorResponse(err); + + t.is(response.type, "error"); + // `QvacErrorBase.name` is the SCREAMING_SNAKE error-code name from + // sdk-errors-server.ts, not the JS class name — the rpc-error.ts + // reconstructor map keys off this value. + t.is(response.name, "REQUEST_REJECTED_BY_POLICY"); + t.is(response.code, 52420); + t.alike(response.typedFields, { + requestId: "rid-1", + kind: "completion", + modelId: "model-1", + reason: "oneAtATimePerModel", + }); +}); + +test("createErrorResponse: RequestIdConflictError carries requestId on typedFields", (t) => { + const err = new RequestIdConflictError("rid-2"); + const response = createErrorResponse(err); + + t.is(response.name, "REQUEST_ID_CONFLICT"); + t.is(response.code, 52417); + t.alike(response.typedFields, { requestId: "rid-2" }); +}); + +test("createErrorResponse: RequestNotFoundError carries requestId on typedFields", (t) => { + const err = new RequestNotFoundError("rid-3"); + const response = createErrorResponse(err); + + t.is(response.name, "REQUEST_NOT_FOUND"); + t.is(response.code, 52418); + t.alike(response.typedFields, { requestId: "rid-3" }); +}); + +test("createErrorResponse: QvacError without toErrorResponseFields omits typedFields", (t) => { + // ModelNotLoadedError is a QvacError but doesn't opt into typed-field + // serialisation — the response carries name/code/message but no + // typedFields envelope. + const err = new ModelNotLoadedError("model-1"); + const response = createErrorResponse(err); + + t.is(response.name, "MODEL_NOT_LOADED"); + t.is(response.typedFields, undefined); +}); + +test("createErrorResponse: plain Error produces a non-typed envelope", (t) => { + const err = new Error("something broke"); + const response = createErrorResponse(err); + + t.is(response.type, "error"); + t.is(response.message, "something broke"); + t.is(response.name, undefined); + t.is(response.code, undefined); + t.is(response.typedFields, undefined); +}); + +test("createErrorResponse: InferenceCancelledError does NOT round-trip via typedFields (client-constructed)", (t) => { + // InferenceCancelledError is built client-side in completion-stream.ts + // when the event stream ends with stopReason: "cancelled". Even if the + // server happens to throw one (rare — e.g. a fixture), the reconstructor + // map deliberately has no entry for its name, so a `typedFields` value + // here would be inert. We assert the envelope shape is sane and explicitly + // does not declare typed fields the client wasn't asked to reconstruct. + const err = new InferenceCancelledError("rid-4"); + const response = createErrorResponse(err); + + t.is(response.name, "INFERENCE_CANCELLED"); + t.is(response.code, 52419); + // No `toErrorResponseFields()` method on this class — typedFields stays + // undefined. + t.is(response.typedFields, undefined); +}); diff --git a/packages/sdk/test/unit/request-id-wire.test.ts b/packages/sdk/test/unit/request-id-wire.test.ts index acfaf22f61..dbd9d26daa 100644 --- a/packages/sdk/test/unit/request-id-wire.test.ts +++ b/packages/sdk/test/unit/request-id-wire.test.ts @@ -107,3 +107,18 @@ test("ragRequestSchema: forwards requestId for reindex (storage-only op)", (t: T }); t.is((parsed as { requestId?: string }).requestId, "client-uuid-reindex"); }); + +test("ragRequestSchema: forwards requestId for saveEmbeddings", (t: T) => { + const parsed = ragRequestSchema.parse({ + type: "rag", + operation: "saveEmbeddings", + workspace: "ws-a", + documents: [], + requestId: "client-uuid-save", + }); + t.is( + (parsed as { requestId?: string }).requestId, + "client-uuid-save", + "saveEmbeddings envelope must carry the client-generated requestId", + ); +}); diff --git a/packages/sdk/test/unit/rpc-error-reconstruct.test.ts b/packages/sdk/test/unit/rpc-error-reconstruct.test.ts new file mode 100644 index 0000000000..57dd8cbd03 --- /dev/null +++ b/packages/sdk/test/unit/rpc-error-reconstruct.test.ts @@ -0,0 +1,137 @@ +// @ts-expect-error brittle has no type declarations +import test from "brittle"; +import { reconstructError, RPCError } from "@/client/rpc/rpc-error"; +import { createErrorResponse } from "@/schemas/error"; +import { + RequestIdConflictError, + RequestNotFoundError, + RequestRejectedByPolicyError, + ModelNotLoadedError, +} from "@/utils/errors-server"; + +test("reconstructError: RequestRejectedByPolicyError round-trips via name + typedFields", (t) => { + const original = new RequestRejectedByPolicyError( + "rid-1", + "completion", + "model-1", + "oneAtATimePerModel", + ); + const envelope = createErrorResponse(original); + + const reconstructed = reconstructError(envelope); + + t.ok( + reconstructed instanceof RequestRejectedByPolicyError, + "instanceof RequestRejectedByPolicyError must hold across the envelope", + ); + // Type narrows after the instanceof check above; cast is for noUncheckedIndexedAccess + strict TS. + const r = reconstructed as RequestRejectedByPolicyError; + t.is(r.requestId, "rid-1"); + t.is(r.kind, "completion"); + t.is(r.modelId, "model-1"); + t.is(r.reason, "oneAtATimePerModel"); + t.is(r.code, 52420); + t.ok(r instanceof Error, "reconstructed must still satisfy instanceof Error"); +}); + +test("reconstructError: RequestIdConflictError round-trips", (t) => { + const original = new RequestIdConflictError("rid-2"); + const envelope = createErrorResponse(original); + + const reconstructed = reconstructError(envelope); + + t.ok(reconstructed instanceof RequestIdConflictError); + t.is((reconstructed as RequestIdConflictError).requestId, "rid-2"); + t.is((reconstructed as RequestIdConflictError).code, 52417); +}); + +test("reconstructError: RequestNotFoundError round-trips", (t) => { + const original = new RequestNotFoundError("rid-3"); + const envelope = createErrorResponse(original); + + const reconstructed = reconstructError(envelope); + + t.ok(reconstructed instanceof RequestNotFoundError); + t.is((reconstructed as RequestNotFoundError).requestId, "rid-3"); + t.is((reconstructed as RequestNotFoundError).code, 52418); +}); + +test("reconstructError: unknown error name falls through to RPCError", (t) => { + // A QvacError with no entry in the reconstructor map round-trips name + + // code via the legacy RPCError wrapper — instanceof RPCError must hold; + // the original class is NOT recovered (and isn't expected to be — the + // class isn't re-exported / registered for cross-RPC use). + const original = new ModelNotLoadedError("model-1"); + const envelope = createErrorResponse(original); + + const reconstructed = reconstructError(envelope); + + t.ok( + reconstructed instanceof RPCError, + "unknown names must fall through to RPCError", + ); + t.absent( + reconstructed instanceof ModelNotLoadedError, + "an unregistered class is NOT magically rebuilt", + ); + const rpc = reconstructed as RPCError; + // `name` comes from the SCREAMING_SNAKE error-code name (the reconstructor + // map keys off this exact value). + t.is(rpc.name, "MODEL_NOT_LOADED"); + t.is(rpc.isQvacError, true); +}); + +test("reconstructError: non-typed error envelope falls through to RPCError", (t) => { + const envelope = createErrorResponse(new Error("plain")); + + const reconstructed = reconstructError(envelope); + + t.ok(reconstructed instanceof RPCError); + t.is(reconstructed.message, "plain"); + t.is((reconstructed as RPCError).isQvacError, false); +}); + +test("reconstructError: missing typedFields on a known name does not throw", (t) => { + // Defensive fall-through: if the server forgot to populate typedFields + // for a registered class (e.g. an older worker hasn't been redeployed), + // the reconstructor must coerce missing fields to defensible defaults + // rather than throwing inside the catch — the consumer never sees a + // "couldn't reconstruct" obscuring the real error. + const envelope = { + type: "error" as const, + name: "REQUEST_REJECTED_BY_POLICY", + code: 52420, + message: "policy rejection", + }; + + const reconstructed = reconstructError(envelope); + + t.ok(reconstructed instanceof RequestRejectedByPolicyError); + // Missing fields coerce to "" via readStringField; `reason` falls back + // to the envelope message to preserve human-readable context. + const r = reconstructed as RequestRejectedByPolicyError; + t.is(r.requestId, ""); + t.is(r.kind, ""); + t.is(r.modelId, ""); + t.is(r.reason, "policy rejection"); +}); + +test("reconstructError: remote stack/timestamp attach onto the reconstructed instance", (t) => { + const original = new RequestRejectedByPolicyError( + "rid-4", + "embeddings", + "model-2", + "oneAtATimePerModel", + ); + const envelope = createErrorResponse(original); + + const reconstructed = reconstructError(envelope) as RequestRejectedByPolicyError & { + remoteStack?: string; + timestamp?: string; + }; + + // timestamp is populated by createErrorResponse; remoteStack only when + // the server included a stack (it does for QvacErrors). + t.ok(reconstructed.timestamp, "remote timestamp should attach"); + t.ok(reconstructed.stack && reconstructed.stack.includes("Worker Stack")); +}); diff --git a/packages/sdk/test/unit/runtime/request-lifecycle-logging.test.ts b/packages/sdk/test/unit/runtime/request-lifecycle-logging.test.ts index 7100d9ee9d..f95aeaf853 100644 --- a/packages/sdk/test/unit/runtime/request-lifecycle-logging.test.ts +++ b/packages/sdk/test/unit/runtime/request-lifecycle-logging.test.ts @@ -12,7 +12,7 @@ import { createRequestRegistry } from "@/server/bare/runtime/request-registry"; // emits at `begin` / `cancel` / `end`. The lines are the only log // surface downstream consumers can grep for "what happened on // requestId=X" without instrumenting every handler — so the shape is -// part of the M3a contract. +// part of the 0.11.0 request-lifecycle contract. // // Tests use the `options.logger` injection on `createRequestRegistry` // to capture every `info(...)` call without touching the SDK's diff --git a/packages/sdk/test/unit/runtime/request-registry.test.ts b/packages/sdk/test/unit/runtime/request-registry.test.ts index 582edef00e..e0eb01a4d8 100644 --- a/packages/sdk/test/unit/runtime/request-registry.test.ts +++ b/packages/sdk/test/unit/runtime/request-registry.test.ts @@ -462,9 +462,8 @@ test("policy: oneAtATimePerModel rejects a second begin on the same (kind, model }); t.is(first.requestId, "r-1"); - // Throws the dedicated policy class (Option A in the M3a brief) — - // handler / RPC code can `instanceof` narrow without parsing the - // error message. + // Throws the dedicated policy class so handler / RPC code can + // `instanceof` narrow without parsing the error message. await t.exception(() => { r.begin({ requestId: "r-2", @@ -515,7 +514,7 @@ test("policy: oneAtATimePerModel ignores requests without modelId", async (t: T) r.policy({ kind: "completion", oneAtATimePerModel: true }); // Both have no modelId — policy has no key to match against, so - // both are admitted. This mirrors the pre-M3a behaviour for + // both are admitted. This is the documented behaviour for // model-less requests (e.g. handlers that don't yet attach a // modelId to their `begin(...)` call). await using a = r.begin({ requestId: "r-a", kind: "completion" }); diff --git a/packages/sdk/test/unit/runtime/with-request-context.test.ts b/packages/sdk/test/unit/runtime/with-request-context.test.ts index 46074fbd34..a497bc4057 100644 --- a/packages/sdk/test/unit/runtime/with-request-context.test.ts +++ b/packages/sdk/test/unit/runtime/with-request-context.test.ts @@ -7,8 +7,8 @@ import { withRequestContext } from "@/server/bare/runtime/with-request-context"; // ----------------------------------------------------------------------------- // withRequestContext — handler-side per-request logger wrapper. // -// Covers the M3a brief's Deliverable 3 acceptance criteria for the -// `withRequestContext` helper: +// Covers the acceptance criteria for the `withRequestContext` +// helper: // - Every emit (debug/info/warn/error/trace) gets prefixed with // `[request-lifecycle requestId= modelId=]`. // - The prefix shape drops `modelId=...` when the request has no diff --git a/packages/sdk/tests-qvac/tests/shared/executors/config-executor.ts b/packages/sdk/tests-qvac/tests/shared/executors/config-executor.ts index 21407b8645..74124dbcf3 100644 --- a/packages/sdk/tests-qvac/tests/shared/executors/config-executor.ts +++ b/packages/sdk/tests-qvac/tests/shared/executors/config-executor.ts @@ -88,20 +88,20 @@ export class ConfigExecutor extends BaseExecutor { let progressEvents = 0; const startTime = Date.now(); - const result = await downloadAsset({ + const op = downloadAsset({ assetSrc: OCR_CYRILLIC_RECOGNIZER, onProgress: (p: { downloadKey?: string; percentage: number }) => { progressEvents++; - if (!cancelTriggered && p.downloadKey && p.percentage >= 1) { + if (!cancelTriggered && p.percentage >= 1) { cancelTriggered = true; void cancel({ - operation: "downloadAsset", - downloadKey: p.downloadKey, + requestId: op.requestId, clearCache: true, }); } }, - }).then( + }); + const result = await op.then( (id: string) => ({ status: "ok" as const, id }), (err: unknown) => ({ status: "fail" as const, diff --git a/packages/sdk/tests-qvac/tests/shared/executors/delegated-inference-executor.ts b/packages/sdk/tests-qvac/tests/shared/executors/delegated-inference-executor.ts index 1f6b17b70b..d3f27591de 100644 --- a/packages/sdk/tests-qvac/tests/shared/executors/delegated-inference-executor.ts +++ b/packages/sdk/tests-qvac/tests/shared/executors/delegated-inference-executor.ts @@ -183,13 +183,30 @@ export class DelegatedInferenceExecutor extends BaseExecutor { async cancelDelegatedDownload(): Promise { return this.withProvider(async ({ publicKey }) => { + // 0.11.0 cancel surface is requestId-based; delegated routing is + // bound to the requestId via the registry rather than carried on + // the cancel wire. `downloadAsset` is not delegatable on the + // client (the SDK only delegates via `loadModel`), so the + // spiritually-equivalent path is: start a delegated `loadModel` + // (whose work includes downloading the asset on the provider), + // grab the synchronously-exposed `op.requestId`, and cancel by + // id. Same-process delegation fails with DELEGATE_CONNECTION_FAILED + // before begin completes — which is the asserted success path + // (it confirms the cancel routed through the delegation pipe). + const op = loadModel({ + modelSrc: LLAMA_3_2_1B_INST_Q4_0, + modelType: "llm", + delegate: { + providerPublicKey: publicKey, + timeout: DEFAULT_DELEGATE_TIMEOUT, + fallbackToLocal: false, + }, + }); + void op.catch(() => {}); + try { - await cancel({ - operation: "downloadAsset", - downloadKey: "nonexistent-delegated-download", - delegate: { providerPublicKey: publicKey, timeout: DEFAULT_DELEGATE_TIMEOUT }, - }); - return { passed: true, output: "Cancel delegated download API accepted" }; + await cancel({ requestId: op.requestId }); + return { passed: true, output: "Delegated cancel by requestId accepted" }; } catch (error) { const msg = error instanceof Error ? error.message : String(error); diff --git a/packages/sdk/tests-qvac/tests/shared/executors/download-executor.ts b/packages/sdk/tests-qvac/tests/shared/executors/download-executor.ts index 8de97fb079..6622d723d3 100644 --- a/packages/sdk/tests-qvac/tests/shared/executors/download-executor.ts +++ b/packages/sdk/tests-qvac/tests/shared/executors/download-executor.ts @@ -41,24 +41,20 @@ export class DownloadExecutor extends BaseExecutor { ); let progressEvents = 0; - const cancelledPromise = downloadAsset({ + const cancelledOp = downloadAsset({ assetSrc: OCR_CYRILLIC_RECOGNIZER, onProgress: (p: { downloadKey?: string; percentage: number }) => { progressEvents++; - if ( - !cancelTriggered && - p.downloadKey && - p.percentage >= cancelThreshold - ) { + if (!cancelTriggered && p.percentage >= cancelThreshold) { cancelTriggered = true; void cancel({ - operation: "downloadAsset", - downloadKey: p.downloadKey, + requestId: cancelledOp.requestId, clearCache: true, }); } }, - }).then( + }); + const cancelledPromise = cancelledOp.then( (id: string) => ({ status: "ok" as const, id }), (err: unknown) => ({ status: "fail" as const, diff --git a/packages/sdk/utils/errors-server.ts b/packages/sdk/utils/errors-server.ts index 75cb85387e..5abda3ec6b 100644 --- a/packages/sdk/utils/errors-server.ts +++ b/packages/sdk/utils/errors-server.ts @@ -298,6 +298,8 @@ export class CancelFailedError extends QvacErrorBase { } export class RequestIdConflictError extends QvacErrorBase { + readonly requestId: string; + constructor(requestId: string, cause?: unknown) { super( createErrorOptions( @@ -306,10 +308,24 @@ export class RequestIdConflictError extends QvacErrorBase { cause, ), ); + this.requestId = requestId; + } + + /** + * Surface typed fields on the RPC error envelope so the client-side + * reconstructor in `client/rpc/rpc-error.ts` can rebuild this exact + * class on the consumer side. Without this, `err instanceof + * RequestIdConflictError` would always be `false` after the error + * crosses the worker boundary. + */ + toErrorResponseFields(): Record { + return { requestId: this.requestId }; } } export class RequestNotFoundError extends QvacErrorBase { + readonly requestId: string; + constructor(requestId: string, cause?: unknown) { super( createErrorOptions( @@ -318,6 +334,11 @@ export class RequestNotFoundError extends QvacErrorBase { cause, ), ); + this.requestId = requestId; + } + + toErrorResponseFields(): Record { + return { requestId: this.requestId }; } } @@ -352,6 +373,15 @@ export class RequestRejectedByPolicyError extends QvacErrorBase { this.modelId = modelId; this.reason = reason; } + + toErrorResponseFields(): Record { + return { + requestId: this.requestId, + kind: this.kind, + modelId: this.modelId, + reason: this.reason, + }; + } } /**