diff --git a/packages/sdk/CHANGELOG.md b/packages/sdk/CHANGELOG.md index 043ef6a3fe..f3f678c387 100644 --- a/packages/sdk/CHANGELOG.md +++ b/packages/sdk/CHANGELOG.md @@ -94,7 +94,12 @@ const model = await modelRegistryGetModel(registryPath, registrySource); Build custom model integrations with the new plugin architecture. Plugins support both request/reply and streaming patterns. ```typescript -import { invokePlugin, invokePluginStream, definePlugin, defineHandler } from "@qvac/sdk"; +import { + invokePlugin, + invokePluginStream, + definePlugin, + defineHandler, +} from "@qvac/sdk"; // Invoke a plugin handler const result = await invokePlugin({ @@ -157,7 +162,7 @@ const modelId = await loadModel({ ttsLatentDenoiserSrc, ttsVoiceDecoderSrc, ttsVoiceSrc, - ttsSpeed: 1.0, // Playback speed + ttsSpeed: 1.0, // Playback speed ttsNumInferenceSteps: 5, // Quality vs speed tradeoff }, }); @@ -198,7 +203,7 @@ Map between engine names and addon types: import { resolveCanonicalEngine, getAddonFromEngine } from "@qvac/sdk"; const engine = resolveCanonicalEngine("@qvac/llm-llamacpp"); // "llamacpp-completion" -const addon = getAddonFromEngine("llamacpp-completion"); // "llm" +const addon = getAddonFromEngine("llamacpp-completion"); // "llm" ``` --- @@ -300,6 +305,7 @@ await ragSaveEmbeddings({ ``` Other RAG changes: + - `ragSaveEmbeddings` no longer returns `droppedIndices` - `ragDeleteEmbeddings` now returns `void` instead of `boolean` (throws on failure) - `ragDeleteEmbeddings` no longer requires `modelId` (uses cached workspace) @@ -365,7 +371,7 @@ const modelId = await loadModel({ modelSrc: MARIAN_OPUS_EN_IT_Q0F32, modelType: "nmt", modelConfig: { - engine: "Opus", // Required: "Opus" | "Bergamot" | "IndicTrans" + engine: "Opus", // Required: "Opus" | "Bergamot" | "IndicTrans" from: "en", to: "it", }, @@ -390,7 +396,7 @@ const modelId = await loadModel({ engine: "Bergamot", from: "en", to: "fr", - normalize: 1, // Bergamot-specific option + normalize: 1, // 
Bergamot-specific option }, }); @@ -433,7 +439,11 @@ const result = await blocks; await done; // Or stream blocks as they're detected -const { blockStream, done } = ocr({ modelId, image: imageBuffer, stream: true }); +const { blockStream, done } = ocr({ + modelId, + image: imageBuffer, + stream: true, +}); for await (const blocks of blockStream) { console.log(blocks); // [{ text: "Hello", bbox: [10, 20, 100, 50], confidence: 0.95 }] @@ -447,7 +457,8 @@ Load large models split across multiple files, from URLs or archives. ```typescript // Pattern-based sharded URLs (auto-detects shard pattern) await loadModel({ - modelSrc: "https://huggingface.co/user/model/resolve/main/model-00001-of-00003.gguf", + modelSrc: + "https://huggingface.co/user/model/resolve/main/model-00001-of-00003.gguf", modelType: "llm", }); @@ -568,10 +579,10 @@ The SDK searches for configuration in this order: #### Supported Formats -| Format | Filename | Notes | -| ---------- | ------------------ | ---------------------------- | -| JSON | `qvac.config.json` | Simplest option | -| JavaScript | `qvac.config.js` | Use `export default` | +| Format | Filename | Notes | +| ---------- | ------------------ | ----------------------------- | +| JSON | `qvac.config.json` | Simplest option | +| JavaScript | `qvac.config.js` | Use `export default` | | TypeScript | `qvac.config.ts` | Fully typed with `QvacConfig` | **TypeScript example:** @@ -592,7 +603,7 @@ export default config; 1. Remove all `setConfig()` calls from your code 2. Create a config file in your project root -3. *(Optional)* For non-standard locations, set `QVAC_CONFIG_PATH` before importing the SDK +3. 
_(Optional)_ For non-standard locations, set `QVAC_CONFIG_PATH` before importing the SDK --- @@ -602,14 +613,14 @@ Some model constants have been renamed for clarity, and duplicate constants have **Changes:** -| Before | After | -| -------------------------- | ------------------------------------------------- | -| `WHISPER_SMALL` | `WHISPER_SMALL_Q8` | -| `WHISPER_NORWEGIAN_TINY_1` | *(removed — use `WHISPER_NORWEGIAN_TINY`)* | -| `WHISPER_TINY_SILERO` | *(removed — use `WHISPER_TINY`)* | -| `MARIAN_OPUS_EN_FR_Q4_0_1` | *(removed — use `MARIAN_OPUS_EN_FR_Q4_0`)* | -| `MARIAN_OPUS_FR_EN_Q4_0_1` | *(removed — use `MARIAN_OPUS_FR_EN_Q4_0`)* | -| `MARIAN_OPUS_IT_EN` | *(removed — use `MARIAN_OPUS_EN_IT`)* | +| Before | After | +| -------------------------- | ------------------------------------------ | +| `WHISPER_SMALL` | `WHISPER_SMALL_Q8` | +| `WHISPER_NORWEGIAN_TINY_1` | _(removed — use `WHISPER_NORWEGIAN_TINY`)_ | +| `WHISPER_TINY_SILERO` | _(removed — use `WHISPER_TINY`)_ | +| `MARIAN_OPUS_EN_FR_Q4_0_1` | _(removed — use `MARIAN_OPUS_EN_FR_Q4_0`)_ | +| `MARIAN_OPUS_FR_EN_Q4_0_1` | _(removed — use `MARIAN_OPUS_FR_EN_Q4_0`)_ | +| `MARIAN_OPUS_IT_EN` | _(removed — use `MARIAN_OPUS_EN_IT`)_ | All model metadata and hyperdrive keys remain unchanged—only the constant names were affected. 
diff --git a/packages/sdk/client/api/completion-stream.ts b/packages/sdk/client/api/completion-stream.ts index 8ffeca8a15..8617c84cb2 100644 --- a/packages/sdk/client/api/completion-stream.ts +++ b/packages/sdk/client/api/completion-stream.ts @@ -9,6 +9,7 @@ import { type Tool, type ToolCallEvent, type ToolCallWithCall, + type RPCOptions, } from "@/schemas"; import { getMcpToolsWithHandlers } from "@/utils/mcp-adapter"; import { @@ -23,6 +24,7 @@ const logger = getClientLogger(); type CompletionParams = Omit & { tools?: Tool[] | ToolInput[]; mcp?: McpClientInput[]; + rpcOptions?: RPCOptions; }; /** @@ -154,7 +156,11 @@ export function completion(params: CompletionParams): { stream: params.stream ?? true, }; - const responses: AsyncGenerator = streamRpc(request); + const responses: AsyncGenerator = streamRpc( + request, + undefined, + params.rpcOptions, + ); for await (const response of responses) { if ( diff --git a/packages/sdk/client/api/embed.ts b/packages/sdk/client/api/embed.ts index 7c07eee8db..f3cced19a7 100644 --- a/packages/sdk/client/api/embed.ts +++ b/packages/sdk/client/api/embed.ts @@ -1,5 +1,9 @@ import { send } from "@/client/rpc/rpc-client"; -import { type EmbedParams, type EmbedRequest } from "@/schemas"; +import { + type EmbedParams, + type EmbedRequest, + type RPCOptions, +} from "@/schemas"; import { InvalidResponseError } from "@/utils/errors-client"; /** @@ -8,12 +12,13 @@ import { InvalidResponseError } from "@/utils/errors-client"; * @param params - The parameters for the embedding * @param params.modelId - The identifier of the embedding model to use * @param params.text - The input text to embed + * @param options - Optional RPC options including per-call profiling * @throws {QvacErrorBase} When the response type is invalid or when the embedding fails */ -export async function embed(params: { - modelId: string; - text: string; -}): Promise; +export async function embed( + params: { modelId: string; text: string }, + options?: RPCOptions, 
+): Promise; /** * Generates embeddings for multiple texts using a specified model. @@ -21,22 +26,24 @@ export async function embed(params: { * @param params - The parameters for the embedding * @param params.modelId - The identifier of the embedding model to use * @param params.text - The input texts to embed + * @param options - Optional RPC options including per-call profiling * @throws {QvacErrorBase} When the response type is invalid or when the embedding fails */ -export async function embed(params: { - modelId: string; - text: string[]; -}): Promise; +export async function embed( + params: { modelId: string; text: string[] }, + options?: RPCOptions, +): Promise; export async function embed( params: EmbedParams, + options?: RPCOptions, ): Promise { const request: EmbedRequest = { type: "embed", ...params, }; - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "embed") { throw new InvalidResponseError("embed"); } diff --git a/packages/sdk/client/api/invoke-plugin.ts b/packages/sdk/client/api/invoke-plugin.ts index 7569096baa..a619f03b97 100644 --- a/packages/sdk/client/api/invoke-plugin.ts +++ b/packages/sdk/client/api/invoke-plugin.ts @@ -3,7 +3,8 @@ import type { PluginInvokeRequest, PluginInvokeStreamRequest, PluginInvokeStreamResponse, -} from "@/schemas/plugin"; + RPCOptions, +} from "@/schemas"; import { InvalidResponseError } from "@/utils/errors-client"; export interface InvokePluginOptions { @@ -17,6 +18,7 @@ export interface InvokePluginOptions { */ export async function invokePlugin( options: InvokePluginOptions, + rpcOptions?: RPCOptions, ): Promise { const request: PluginInvokeRequest = { type: "pluginInvoke", @@ -25,7 +27,7 @@ export async function invokePlugin( params: options.params, }; - const response = await send(request); + const response = await send(request, undefined, rpcOptions); if (response.type !== "pluginInvoke") { throw new InvalidResponseError("pluginInvoke"); 
@@ -40,7 +42,10 @@ export async function invokePlugin( export async function* invokePluginStream< TResponse = unknown, TParams = unknown, ->(options: InvokePluginOptions): AsyncGenerator { +>( + options: InvokePluginOptions, + rpcOptions?: RPCOptions, +): AsyncGenerator { const request: PluginInvokeStreamRequest = { type: "pluginInvokeStream", modelId: options.modelId, @@ -48,7 +53,7 @@ export async function* invokePluginStream< params: options.params, }; - for await (const chunk of stream(request)) { + for await (const chunk of stream(request, undefined, rpcOptions)) { const response = chunk as PluginInvokeStreamResponse; if (response.type !== "pluginInvokeStream") { throw new InvalidResponseError("pluginInvokeStream"); diff --git a/packages/sdk/client/api/rag.ts b/packages/sdk/client/api/rag.ts index d43d149d24..dcb5881a24 100644 --- a/packages/sdk/client/api/rag.ts +++ b/packages/sdk/client/api/rag.ts @@ -18,6 +18,7 @@ import type { RagWorkspaceInfo, RagCloseWorkspaceParams, RagDeleteWorkspaceParams, + RPCOptions, } from "@/schemas"; import { InvalidResponseError, @@ -55,14 +56,17 @@ import { * }); * ``` */ -export async function ragChunk(params: RagChunkParams): Promise { +export async function ragChunk( + params: RagChunkParams, + options?: RPCOptions, +): Promise { const request: RagRequest = { type: "rag", operation: "chunk", ...params, }; - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); @@ -121,6 +125,7 @@ export async function ragChunk(params: RagChunkParams): Promise { */ export async function ragIngest( params: RagIngestParams, + options?: RPCOptions, ): Promise<{ processed: RagSaveEmbeddingsResult[]; droppedIndices: number[] }> { const { onProgress, ...requestParams } = params; @@ -134,7 +139,7 @@ export async function ragIngest( if (onProgress) { // Use streaming for progress updates - for await (const event of stream(request)) { + 
for await (const event of stream(request, undefined, options)) { if (event.type === "rag:progress" && event.operation === "ingest") { const progress: RagProgressUpdate = event; onProgress( @@ -160,7 +165,7 @@ export async function ragIngest( throw new StreamEndedError(); } - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); } @@ -215,6 +220,7 @@ export async function ragIngest( */ export async function ragSaveEmbeddings( params: RagSaveEmbeddingsParams, + options?: RPCOptions, ): Promise { const { onProgress, ...requestParams } = params; @@ -226,7 +232,7 @@ export async function ragSaveEmbeddings( }; if (onProgress) { - for await (const event of stream(request)) { + for await (const event of stream(request, undefined, options)) { if ( event.type === "rag:progress" && event.operation === "saveEmbeddings" @@ -252,7 +258,7 @@ export async function ragSaveEmbeddings( throw new StreamEndedError(); } - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); @@ -298,6 +304,7 @@ export async function ragSaveEmbeddings( */ export async function ragSearch( params: RagSearchParams, + options?: RPCOptions, ): Promise { const request: RagRequest = { type: "rag", @@ -307,7 +314,7 @@ export async function ragSearch( n: params.n ?? 
3, }; - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); } @@ -345,6 +352,7 @@ export async function ragSearch( */ export async function ragDeleteEmbeddings( params: RagDeleteEmbeddingsParams, + options?: RPCOptions, ): Promise { const request: RagRequest = { type: "rag", @@ -352,7 +360,7 @@ export async function ragDeleteEmbeddings( ...params, }; - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); } @@ -408,6 +416,7 @@ export async function ragDeleteEmbeddings( */ export async function ragReindex( params: RagReindexParams, + options?: RPCOptions, ): Promise { const { onProgress, ...requestParams } = params; @@ -419,7 +428,7 @@ export async function ragReindex( }; if (onProgress) { - for await (const event of stream(request)) { + for await (const event of stream(request, undefined, options)) { if (event.type === "rag:progress" && event.operation === "reindex") { const progress: RagProgressUpdate = event; onProgress( @@ -442,7 +451,7 @@ export async function ragReindex( throw new StreamEndedError(); } - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); @@ -477,13 +486,15 @@ export async function ragReindex( * // [{ name: "default", open: true }, { name: "my-docs", open: false }] * ``` */ -export async function ragListWorkspaces(): Promise { +export async function ragListWorkspaces( + options?: RPCOptions, +): Promise { const request: RagRequest = { type: "rag", operation: "listWorkspaces", }; - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); @@ -525,6 +536,7 @@ export async function 
ragListWorkspaces(): Promise { */ export async function ragCloseWorkspace( params?: RagCloseWorkspaceParams, + options?: RPCOptions, ): Promise { const request: RagRequest = { type: "rag", @@ -532,7 +544,7 @@ export async function ragCloseWorkspace( ...params, }; - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); @@ -564,6 +576,7 @@ export async function ragCloseWorkspace( */ export async function ragDeleteWorkspace( params: RagDeleteWorkspaceParams, + options?: RPCOptions, ): Promise { const request: RagRequest = { type: "rag", @@ -571,7 +584,7 @@ export async function ragDeleteWorkspace( ...params, }; - const response = await send(request); + const response = await send(request, undefined, options); if (response.type !== "rag") { throw new InvalidResponseError("rag"); diff --git a/packages/sdk/client/api/text-to-speech.ts b/packages/sdk/client/api/text-to-speech.ts index 109f2dc430..41bf32fdec 100644 --- a/packages/sdk/client/api/text-to-speech.ts +++ b/packages/sdk/client/api/text-to-speech.ts @@ -2,10 +2,14 @@ import { ttsResponseSchema, type TtsClientParams, type TtsRequest, + type RPCOptions, } from "@/schemas"; import { stream as streamRpc } from "@/client/rpc/rpc-client"; -export function textToSpeech(params: TtsClientParams): { +export function textToSpeech( + params: TtsClientParams, + options?: RPCOptions, +): { bufferStream: AsyncGenerator; buffer: Promise; done: Promise; @@ -25,7 +29,7 @@ export function textToSpeech(params: TtsClientParams): { if (params.stream) { const bufferStream = (async function* () { - for await (const response of streamRpc(request)) { + for await (const response of streamRpc(request, undefined, options)) { if (response.type === "textToSpeech") { const streamResponse = ttsResponseSchema.parse(response); if (streamResponse.buffer.length > 0) { @@ -50,7 +54,7 @@ export function textToSpeech(params: 
TtsClientParams): { const bufferPromise = (async () => { let buffer: number[] = []; - for await (const response of streamRpc(request)) { + for await (const response of streamRpc(request, undefined, options)) { if (response.type === "textToSpeech") { const streamResponse = ttsResponseSchema.parse(response); buffer = buffer.concat(streamResponse.buffer); diff --git a/packages/sdk/client/api/transcribe.ts b/packages/sdk/client/api/transcribe.ts index 7be4152349..96cb33977a 100644 --- a/packages/sdk/client/api/transcribe.ts +++ b/packages/sdk/client/api/transcribe.ts @@ -2,6 +2,7 @@ import { transcribeStreamResponseSchema, type TranscribeStreamRequest, type TranscribeClientParams, + type RPCOptions, } from "@/schemas"; import { stream } from "@/client/rpc/rpc-client"; @@ -13,10 +14,14 @@ import { stream } from "@/client/rpc/rpc-client"; * @param params.modelId - The identifier of the transcription model to use * @param params.audioChunk - Audio input as either a file path (string) or audio buffer * @param params.prompt - Optional initial prompt to guide the transcription + * @param options - Optional RPC options including per-call profiling * @yields {string} Text chunks as they are transcribed * @throws {QvacErrorBase} When transcription fails with an error message */ -export async function* transcribeStream(params: TranscribeClientParams) { +export async function* transcribeStream( + params: TranscribeClientParams, + options?: RPCOptions, +) { const request: TranscribeStreamRequest = { type: "transcribeStream", modelId: params.modelId, @@ -27,7 +32,7 @@ export async function* transcribeStream(params: TranscribeClientParams) { ...(params.prompt && { prompt: params.prompt }), }; - for await (const response of stream(request)) { + for await (const response of stream(request, undefined, options)) { if (response.type === "transcribeStream") { const streamResponse = transcribeStreamResponseSchema.parse(response); @@ -50,14 +55,16 @@ export async function* 
transcribeStream(params: TranscribeClientParams) { * @param params.modelId - The identifier of the transcription model to use * @param params.audioChunk - Audio input as either a file path (string) or audio buffer * @param params.prompt - Optional initial prompt to guide the transcription + * @param options - Optional RPC options including per-call profiling * @returns {Promise} The complete transcribed text * @throws {QvacErrorBase} When transcription fails (propagated from transcribeStream) */ export async function transcribe( params: TranscribeClientParams, + options?: RPCOptions, ): Promise { let fullText = ""; - for await (const textChunk of transcribeStream(params)) { + for await (const textChunk of transcribeStream(params, options)) { fullText += textChunk; } return fullText; diff --git a/packages/sdk/client/api/translate.ts b/packages/sdk/client/api/translate.ts index e730f81ca4..4bc9d0ce6e 100644 --- a/packages/sdk/client/api/translate.ts +++ b/packages/sdk/client/api/translate.ts @@ -6,6 +6,7 @@ import { type TranslateRequest, type TranslateClientParams, type TranslationStats, + type RPCOptions, } from "@/schemas"; import { detectOne } from "@qvac/langdetect-text"; import { TranslationFailedError } from "@/utils/errors-client"; @@ -50,7 +51,10 @@ import { TranslationFailedError } from "@/utils/errors-client"; * console.log(await response.text); * ``` */ -export function translate(params: TranslateClientParams): { +export function translate( + params: TranslateClientParams, + options?: RPCOptions, +): { tokenStream: AsyncGenerator; stats: Promise; text: Promise; @@ -86,7 +90,7 @@ export function translate(params: TranslateClientParams): { if (params.stream) { const tokenStream = (async function* () { - for await (const response of streamRpc(request)) { + for await (const response of streamRpc(request, undefined, options)) { if (response.type === "translate") { const streamResponse = translateResponseSchema.parse(response); if (!streamResponse.done) { @@ 
-114,7 +118,7 @@ export function translate(params: TranslateClientParams): { const textPromise = (async () => { let buffer = ""; - for await (const response of streamRpc(request)) { + for await (const response of streamRpc(request, undefined, options)) { if (response.type === "translate") { const streamResponse = translateResponseSchema.parse(response); buffer += streamResponse.token; diff --git a/packages/sdk/client/rpc/node-rpc-client.ts b/packages/sdk/client/rpc/node-rpc-client.ts index 9611c6cf77..8738859b04 100644 --- a/packages/sdk/client/rpc/node-rpc-client.ts +++ b/packages/sdk/client/rpc/node-rpc-client.ts @@ -10,7 +10,6 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { fileURLToPath } from "node:url"; -import { RPCRequestNotSentError } from "@/utils/errors-client"; import { initializeConfig } from "@/client/init-hooks"; import { resolveConfig } from "@/client/config-loader/resolve-config.node"; import { getClientLogger } from "@/logging"; @@ -239,60 +238,8 @@ async function ensureRPC(): Promise { return rpc; } -const mockRPC = { - request: (command: number) => { - let sentData: { data: string; encoding: BufferEncoding } | null = null; - - return { - send: (data: string, encoding: BufferEncoding) => { - sentData = { data, encoding }; - }, - - reply: async (encoding: BufferEncoding): Promise => { - if (!sentData) { - throw new RPCRequestNotSentError(); - } - - const rpc = await ensureRPC(); - const req = rpc.request(command); - req.send( - sentData.data, - sentData.encoding as "utf-8" | "ascii" | "binary", - ); - - const response = await req.reply( - encoding as "utf-8" | "ascii" | "binary", - ); - return Buffer.isBuffer(response) - ? response - : Buffer.from(typeof response === "string" ? 
response : "", encoding); - }, - - createResponseStream: async function* () { - if (!sentData) { - throw new RPCRequestNotSentError(); - } - - const rpc = await ensureRPC(); - const req = rpc.request(command); - req.send( - sentData.data, - sentData.encoding as "utf-8" | "ascii" | "binary", - ); - const stream = req.createResponseStream({ - encoding: sentData.encoding as "utf-8" | "ascii" | "binary", - }); - - for await (const chunk of stream) { - yield chunk; - } - }, - }; - }, -}; - -export function getRPC() { - return mockRPC; +export async function getRPC() { + return ensureRPC(); } export async function close() { diff --git a/packages/sdk/client/rpc/profiling/index.ts b/packages/sdk/client/rpc/profiling/index.ts new file mode 100644 index 0000000000..f400962203 --- /dev/null +++ b/packages/sdk/client/rpc/profiling/index.ts @@ -0,0 +1 @@ +export * from "./profiler"; diff --git a/packages/sdk/client/rpc/profiling/profiler.ts b/packages/sdk/client/rpc/profiling/profiler.ts new file mode 100644 index 0000000000..d4d56d5336 --- /dev/null +++ b/packages/sdk/client/rpc/profiling/profiler.ts @@ -0,0 +1,214 @@ +import { + nowMs, + record, + recordPhase, + recordServerBreakdownPhases, + recordDelegationBreakdownPhases, + type BaseTimings, + type BaseEvent, +} from "@/profiling"; +import type { ProfilingResponseMeta } from "@/schemas"; +import { getGlobalSingleton } from "@/utils/global-singleton"; + +interface ConnectionTrackingState { + connectionTimeRecorded: boolean; + pendingConnectionTimeMs: number | undefined; +} + +const CONNECTION_TRACKING_STATE_KEY = Symbol.for( + "@qvac/sdk:rpc-connection-tracking-state", +); + +function getConnectionTrackingState(): ConnectionTrackingState { + return getGlobalSingleton(CONNECTION_TRACKING_STATE_KEY, () => { + return { + connectionTimeRecorded: false, + pendingConnectionTimeMs: undefined, + }; + }); +} + +interface BaseClientTimings extends BaseTimings { + requestZodValidationMs?: number; + requestStringifyMs?: number; + 
sendStart?: number; + requestEnd?: number; +} + +export interface ClientTimings extends BaseClientTimings { + firstResponseAt?: number; + responseJsonParseMs?: number; + responseZodValidationMs?: number; +} + +export interface ClientStreamTimings extends BaseClientTimings { + firstChunkAt?: number; + lastChunkAt?: number; + chunkCount: number; +} + +function recordRequestPhases( + base: BaseEvent, + timings: BaseClientTimings, +): void { + recordPhase(base, "request.zodValidation", timings.requestZodValidationMs); + recordPhase(base, "request.stringify", timings.requestStringifyMs); + if ( + timings.requestZodValidationMs !== undefined && + timings.requestStringifyMs !== undefined + ) { + recordPhase( + base, + "request.totalSerialization", + timings.requestZodValidationMs + timings.requestStringifyMs, + ); + } +} + +function createBaseTimings( + profileId: string, + requestType: string, +): BaseClientTimings { + return { profileId, requestType, requestStart: nowMs() }; +} + +export function createClientTimings( + profileId: string, + requestType: string, +): ClientTimings { + return createBaseTimings(profileId, requestType); +} + +export function createClientStreamTimings( + profileId: string, + requestType: string, +): ClientStreamTimings { + return { ...createBaseTimings(profileId, requestType), chunkCount: 0 }; +} + +export function cacheConnectionTime(durationMs: number): void { + const state = getConnectionTrackingState(); + if (state.connectionTimeRecorded) return; + if (state.pendingConnectionTimeMs !== undefined) return; + state.pendingConnectionTimeMs = durationMs; +} + +export function flushConnectionTime(): void { + const state = getConnectionTrackingState(); + if (state.connectionTimeRecorded) return; + if (state.pendingConnectionTimeMs === undefined) return; + + const durationMs = state.pendingConnectionTimeMs; + state.pendingConnectionTimeMs = undefined; + state.connectionTimeRecorded = true; + record({ + ts: nowMs(), + op: "rpc", + kind: "rpc", + 
phase: "connection", + ms: durationMs, + }); +} + +export function recordClientEvents( + timings: ClientTimings, + serverMeta?: ProfilingResponseMeta, +): void { + const now = nowMs(); + const totalClientTime = now - timings.requestStart; + const base: BaseEvent = { + ts: now, + op: timings.requestType, + kind: "rpc", + profileId: timings.profileId, + }; + + recordRequestPhases(base, timings); + + // For unary requests, this measures send → full response received + if ( + timings.sendStart !== undefined && + timings.firstResponseAt !== undefined + ) { + recordPhase( + base, + "serverWait", + timings.firstResponseAt - timings.sendStart, + ); + } + + recordPhase(base, "response.jsonParse", timings.responseJsonParseMs); + recordPhase(base, "response.zodValidation", timings.responseZodValidationMs); + if ( + timings.responseJsonParseMs !== undefined && + timings.responseZodValidationMs !== undefined + ) { + recordPhase( + base, + "response.totalParsing", + timings.responseJsonParseMs + timings.responseZodValidationMs, + ); + } + + recordPhase(base, "totalClientTime", totalClientTime); + + if (serverMeta?.server) { + recordServerBreakdownPhases(base, serverMeta.server, "server"); + if (serverMeta.server.totalServerMs !== undefined) { + recordPhase( + base, + "clientOverhead", + totalClientTime - serverMeta.server.totalServerMs, + ); + } + } + + if (serverMeta?.delegation) { + recordDelegationBreakdownPhases(base, serverMeta.delegation); + } +} + +export function recordClientStreamEvents( + timings: ClientStreamTimings, + serverMeta?: ProfilingResponseMeta, +): void { + const now = nowMs(); + const totalClientTime = now - timings.requestStart; + const base: BaseEvent = { + ts: now, + op: timings.requestType, + kind: "rpc", + profileId: timings.profileId, + }; + + recordRequestPhases(base, timings); + + if (timings.sendStart !== undefined && timings.firstChunkAt !== undefined) { + recordPhase(base, "ttfb", timings.firstChunkAt - timings.sendStart); + } + if 
(timings.firstChunkAt !== undefined && timings.lastChunkAt !== undefined) { + recordPhase( + base, + "streamDuration", + timings.lastChunkAt - timings.firstChunkAt, + ); + } + + recordPhase(base, "totalClientTime", totalClientTime, { + count: timings.chunkCount, + }); + + if (serverMeta?.server) { + recordServerBreakdownPhases(base, serverMeta.server, "server"); + } + + if (serverMeta?.delegation) { + recordDelegationBreakdownPhases(base, serverMeta.delegation); + } +} + +export function resetConnectionTracking(): void { + const state = getConnectionTrackingState(); + state.connectionTimeRecorded = false; + state.pendingConnectionTimeMs = undefined; +} diff --git a/packages/sdk/client/rpc/rpc-client.ts b/packages/sdk/client/rpc/rpc-client.ts index 4d75f1962f..371e197ffa 100644 --- a/packages/sdk/client/rpc/rpc-client.ts +++ b/packages/sdk/client/rpc/rpc-client.ts @@ -10,11 +10,33 @@ import { RPCError } from "./rpc-error"; import { withTimeout, withTimeoutStream } from "@/utils/withTimeout"; import { getClientLogger, summarizeRequest } from "@/logging"; import { getRPC, close as closeRPC } from "#rpc"; +import { + nowMs, + shouldProfile, + shouldIncludeServerBreakdown, + generateId as createProfileId, + createProfilingMeta, + createProfilingDisabledMeta, + injectProfilingMetaIntoObject, + extractProfilingMeta, + stripProfilingMeta, + recordFailure, +} from "@/profiling"; +import { + createClientTimings, + createClientStreamTimings, + recordClientEvents, + recordClientStreamEvents, + cacheConnectionTime, + flushConnectionTime, + resetConnectionTracking, +} from "./profiling"; const logger = getClientLogger(); let rpcInstance: Promise | null = null; let commandCounter = 0; +let firstConnectionPending = true; function getNextCommandId() { commandCounter = (commandCounter + 1) % Number.MAX_SAFE_INTEGER; @@ -27,22 +49,80 @@ function checkAndThrowError(response: Response): void { } } -function getRPCInstance(): Promise { - if (rpcInstance) return rpcInstance; +interface 
RPCResult { + rpc: RPC; + connectionMs?: number; +} + +async function getRPCInstance(): Promise { + if (rpcInstance) return { rpc: await rpcInstance }; + + const connectionStart = firstConnectionPending ? nowMs() : null; rpcInstance = getRPC() as unknown as Promise; - return rpcInstance; + const rpc = await rpcInstance; + + if (connectionStart !== null && firstConnectionPending) { + firstConnectionPending = false; + return { rpc, connectionMs: nowMs() - connectionStart }; + } + + return { rpc }; +} + +interface PreparedRPCContext { + rpc: RPC; + profilingEnabled: boolean; + signalDisable: boolean; +} + +async function prepareRPCContext( + requestType: Request["type"], + perCallProfiling: RPCOptions["profiling"] | undefined, + rpc?: RPC, +): Promise { + const rpcResult = rpc ? { rpc } : await getRPCInstance(); + const profilingEnabled = shouldProfile(requestType, perCallProfiling); + const signalDisable = perCallProfiling?.enabled === false; + + if (rpcResult.connectionMs !== undefined) { + cacheConnectionTime(rpcResult.connectionMs); + } + if (profilingEnabled) { + flushConnectionTime(); + } + + return { rpc: rpcResult.rpc, profilingEnabled, signalDisable }; } export async function send( request: T, rpc?: RPC, options?: RPCOptions, +): Promise { + const ctx = await prepareRPCContext(request.type, options?.profiling, rpc); + + if (!ctx.profilingEnabled) { + return sendBase(request, ctx.rpc, options, ctx.signalDisable); + } + return sendProfiled(request, ctx.rpc, options); +} + +async function sendBase( + request: T, + rpc: RPC, + options?: RPCOptions, + signalDisable: boolean = false, ): Promise { const parsedRequest = requestSchema.parse(request); - const rpcInstance = rpc || (await getRPCInstance()); - const req = rpcInstance.request(getNextCommandId()); + const req = rpc.request(getNextCommandId()); logger.debug("RPC Client sending:", summarizeRequest(request)); - const payload = JSON.stringify(parsedRequest); + const payloadObj = signalDisable + ? 
injectProfilingMetaIntoObject( + parsedRequest as Record, + createProfilingDisabledMeta(), + ) + : parsedRequest; + const payload = JSON.stringify(payloadObj); req.send(payload, "utf-8"); const response = await withTimeout(req.reply("utf-8"), options?.timeout); @@ -57,16 +137,106 @@ export async function send( return resPayload; } +async function sendProfiled( + request: T, + rpc: RPC, + options?: RPCOptions, +): Promise { + const requestType = request.type; + const profileId = createProfileId(); + const includeServer = shouldIncludeServerBreakdown(options?.profiling); + const timings = createClientTimings(profileId, requestType); + + try { + const zodStart = nowMs(); + const parsedRequest = requestSchema.parse(request); + timings.requestZodValidationMs = nowMs() - zodStart; + + const req = rpc.request(getNextCommandId()); + logger.debug("RPC Client sending:", summarizeRequest(request)); + + const profilingMeta = createProfilingMeta(profileId, includeServer); + const requestWithMeta = injectProfilingMetaIntoObject( + parsedRequest as Record, + profilingMeta, + ); + + const stringifyStart = nowMs(); + const payload = JSON.stringify(requestWithMeta); + timings.requestStringifyMs = nowMs() - stringifyStart; + + timings.sendStart = nowMs(); + req.send(payload, "utf-8"); + + const response = await withTimeout(req.reply("utf-8"), options?.timeout); + timings.firstResponseAt = nowMs(); + + const parseStart = nowMs(); + const rawParsed = JSON.parse(response?.toString() || "{}") as Record< + string, + unknown + >; + timings.responseJsonParseMs = nowMs() - parseStart; + + const responseMeta = extractProfilingMeta(rawParsed); + const cleanPayload = stripProfilingMeta(rawParsed); + + const zodValidateStart = nowMs(); + const resPayload = responseSchema.parse(cleanPayload); + timings.responseZodValidationMs = nowMs() - zodValidateStart; + + timings.requestEnd = nowMs(); + + logger.debug("ResPayload", { type: resPayload.type }); + + recordClientEvents(timings, responseMeta); + 
checkAndThrowError(resPayload); + + return resPayload; + } catch (error) { + if (timings.requestEnd === undefined) { + const base = { + ts: nowMs(), + op: timings.requestType, + kind: "rpc" as const, + profileId: timings.profileId, + }; + recordFailure(base, timings.requestStart, error); + } + throw error; + } +} + export async function* stream( request: T, rpc?: RPC, options: RPCOptions = {}, +): AsyncGenerator { + const ctx = await prepareRPCContext(request.type, options?.profiling, rpc); + + if (!ctx.profilingEnabled) { + yield* streamBase(request, ctx.rpc, options, ctx.signalDisable); + return; + } + yield* streamProfiled(request, ctx.rpc, options); +} + +async function* streamBase( + request: T, + rpc: RPC, + options: RPCOptions = {}, + signalDisable: boolean = false, ): AsyncGenerator { const parsedRequest = requestSchema.parse(request); - const rpcInstance = rpc || (await getRPCInstance()); - const req = rpcInstance.request(getNextCommandId()); + const req = rpc.request(getNextCommandId()); logger.debug("RPC Client streaming:", summarizeRequest(request)); - req.send(JSON.stringify(parsedRequest), "utf-8"); + const payloadObj = signalDisable + ? 
injectProfilingMetaIntoObject( + parsedRequest as Record, + createProfilingDisabledMeta(), + ) + : parsedRequest; + req.send(JSON.stringify(payloadObj), "utf-8"); const responseStream = req.createResponseStream({ encoding: "utf-8" }); let buffer = ""; @@ -101,8 +271,104 @@ export async function* stream( } } +async function* streamProfiled( + request: T, + rpc: RPC, + options: RPCOptions = {}, +): AsyncGenerator { + const requestType = request.type; + const profileId = createProfileId(); + const includeServer = shouldIncludeServerBreakdown(options?.profiling); + const timings = createClientStreamTimings(profileId, requestType); + let profilingMeta: ReturnType = undefined; + + try { + const zodStart = nowMs(); + const parsedRequest = requestSchema.parse(request); + timings.requestZodValidationMs = nowMs() - zodStart; + + const req = rpc.request(getNextCommandId()); + logger.debug("RPC Client streaming:", summarizeRequest(request)); + + const requestMeta = createProfilingMeta(profileId, includeServer); + const requestWithMeta = injectProfilingMetaIntoObject( + parsedRequest as Record, + requestMeta, + ); + + const stringifyStart = nowMs(); + const payload = JSON.stringify(requestWithMeta); + timings.requestStringifyMs = nowMs() - stringifyStart; + + timings.sendStart = nowMs(); + req.send(payload, "utf-8"); + + const responseStream = req.createResponseStream({ encoding: "utf-8" }); + let buffer = ""; + + async function* processStream(): AsyncGenerator { + for await (const chunk of responseStream as AsyncIterable) { + yield chunk; + } + } + + const streamWithTimeout = withTimeoutStream( + processStream(), + options?.timeout, + ); + + for await (const chunk of streamWithTimeout) { + const chunkTime = nowMs(); + if (timings.firstChunkAt === undefined) { + timings.firstChunkAt = chunkTime; + } + timings.lastChunkAt = chunkTime; + + buffer += chunk.toString(); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + + for (const line of lines) { + if 
(line.trim()) { + const rawParsed = JSON.parse(line) as Record; + + const chunkMeta = extractProfilingMeta(rawParsed); + if (chunkMeta) { + profilingMeta = chunkMeta; + } + + const cleanPayload = stripProfilingMeta(rawParsed); + const response = responseSchema.parse(cleanPayload); + + timings.chunkCount++; + checkAndThrowError(response); + yield response; + } + } + } + } catch (error) { + if (timings.requestEnd === undefined) { + const base = { + ts: nowMs(), + op: timings.requestType, + kind: "rpc" as const, + profileId: timings.profileId, + }; + recordFailure(base, timings.requestStart, error); + } + throw error; + } finally { + if (timings.chunkCount > 0) { + timings.requestEnd = nowMs(); + recordClientStreamEvents(timings, profilingMeta); + } + } +} + export async function close() { if (!rpcInstance) return; rpcInstance = null; + firstConnectionPending = true; + resetConnectionTracking(); await closeRPC(); } diff --git a/packages/sdk/examples/delegated-inference/consumer-profiled.ts b/packages/sdk/examples/delegated-inference/consumer-profiled.ts new file mode 100644 index 0000000000..bb4fcd05dc --- /dev/null +++ b/packages/sdk/examples/delegated-inference/consumer-profiled.ts @@ -0,0 +1,85 @@ +import { + close, + completion, + LLAMA_3_2_1B_INST_Q4_0, + loadModel, + profiler, +} from "@qvac/sdk"; + +const topicHex = process.argv[2]; +if (!topicHex) { + console.error("❌ Usage: bun run consumer-profiled.ts "); + process.exit(1); +} + +const providerPublicKey = process.argv[3]; +if (!providerPublicKey) { + console.error("❌ Usage: bun run consumer-profiled.ts "); + process.exit(1); +} + +try { + profiler.enable({ mode: "verbose", includeServerBreakdown: true }); + console.log("✓ Profiler enabled"); + + console.log(`\n📡 Topic: ${topicHex}`); + console.log(`🔑 Provider: ${providerPublicKey}\n`); + + console.log("→ Loading model (delegated, unary)..."); + const modelId = await loadModel({ + modelSrc: LLAMA_3_2_1B_INST_Q4_0, + modelType: "llm", + delegate: { + topic: 
topicHex, + providerPublicKey, + timeout: 30_000, + }, + }); + console.log(`✓ Model loaded: ${modelId}\n`); + + console.log("→ Running completion (delegated, streamed)..."); + const response = completion({ + modelId, + history: [{ role: "user", content: "Say hello in exactly 5 words." }], + stream: true, + }); + + process.stdout.write(" Response: "); + for await (const token of response.tokenStream) { + process.stdout.write(token); + } + await response.stats; + console.log("\n✓ Completion done\n"); + + console.log("=== Profiler Summary ==="); + console.log(profiler.exportSummary()); + + console.log("=== Profiler Table ==="); + console.log(profiler.exportTable()); + + // Look for delegation-specific metrics + const json = profiler.exportJSON(); + const delegationMetrics = Object.keys(json.aggregates).filter( + (k) => k.includes("delegation") || k.includes("delegated"), + ); + + if (delegationMetrics.length > 0) { + console.log("=== Delegation Metrics ==="); + for (const key of delegationMetrics) { + const agg = json.aggregates[key]; + if (agg) { + console.log(` ${key}: ${agg.avg.toFixed(2)}ms (count: ${agg.count})`); + } + } + } else { + console.log("⚠️ No delegation-specific metrics found"); + console.log(" (delegation profiling may need to be triggered)"); + } + + profiler.disable(); + void close(); +} catch (error) { + console.error("❌ Error:", error); + profiler.disable(); + process.exit(1); +} diff --git a/packages/sdk/examples/parallel-download.ts b/packages/sdk/examples/parallel-download.ts index 852f4e139b..d423f9081f 100644 --- a/packages/sdk/examples/parallel-download.ts +++ b/packages/sdk/examples/parallel-download.ts @@ -18,7 +18,10 @@ const assets = [ { name: "GTE Large FP16", src: GTE_LARGE_FP16 }, ]; -const timers: Record = {}; +const timers: Record< + string, + { start: number; firstProgress?: number; end?: number } +> = {}; console.log(`\n=== Parallel Download (${assets.length} assets) ===\n`); const wallStart = now(); @@ -59,14 +62,16 @@ try { 
const status = result.status === "fulfilled" ? "OK" : "FAILED"; const reason = result.status === "rejected" ? ` — ${result.reason}` : ""; - const timeToFirst = t.firstProgress != null - ? `${((t.firstProgress - t.start) / 1000).toFixed(1)}s` - : "N/A"; - const total = t.end != null - ? `${((t.end - t.start) / 1000).toFixed(1)}s` - : "N/A"; + const timeToFirst = + t.firstProgress != null + ? `${((t.firstProgress - t.start) / 1000).toFixed(1)}s` + : "N/A"; + const total = + t.end != null ? `${((t.end - t.start) / 1000).toFixed(1)}s` : "N/A"; - console.log(`${status} ${asset.name}: first-progress=${timeToFirst}, total=${total}${reason}`); + console.log( + `${status} ${asset.name}: first-progress=${timeToFirst}, total=${total}${reason}`, + ); } console.log(`\nWall time: ${((wallEnd - wallStart) / 1000).toFixed(1)}s`); diff --git a/packages/sdk/examples/profiling/basic.ts b/packages/sdk/examples/profiling/basic.ts new file mode 100644 index 0000000000..b3c84d31a0 --- /dev/null +++ b/packages/sdk/examples/profiling/basic.ts @@ -0,0 +1,57 @@ +import { + completion, + loadModel, + unloadModel, + LLAMA_3_2_1B_INST_Q4_0, + profiler, +} from "@qvac/sdk"; + +try { + // Enable profiling globally + profiler.enable({ + mode: "verbose", + includeServerBreakdown: true, + }); + console.log("Profiler enabled:", profiler.isEnabled()); + + const modelId = await loadModel({ + modelSrc: LLAMA_3_2_1B_INST_Q4_0, + modelType: "llm", + onProgress: (p) => console.log(` ${p.percentage.toFixed(1)}%`), + }); + console.log("Model loaded:", modelId); + + console.log("\n→ Running completion..."); + const result = completion({ + modelId, + history: [{ role: "user", content: "Say hello in one sentence." 
}], + stream: true, + }); + + for await (const token of result.tokenStream) { + process.stdout.write(token); + } + console.log(); + + await unloadModel({ modelId }); + + // Export profiling data + console.log("\n=== Profiler Summary ==="); + console.log(profiler.exportSummary()); + + console.log("\n=== Profiler Table ==="); + console.log(profiler.exportTable()); + + const json = profiler.exportJSON(); + console.log("\n=== Profiler JSON (structure) ==="); + console.log(" aggregates:", Object.keys(json.aggregates).length, "metrics"); + console.log(" recentEvents:", json.recentEvents?.length ?? 0, "events"); + console.log(" config:", json.config); + + // Disable profiling + profiler.disable(); + console.log("\nProfiler disabled:", !profiler.isEnabled()); +} catch (error) { + console.error("Error:", error); + process.exit(1); +} diff --git a/packages/sdk/examples/profiling/per-call.ts b/packages/sdk/examples/profiling/per-call.ts new file mode 100644 index 0000000000..030c42db28 --- /dev/null +++ b/packages/sdk/examples/profiling/per-call.ts @@ -0,0 +1,48 @@ +import { + embed, + loadModel, + unloadModel, + GTE_LARGE_FP16, + profiler, +} from "@qvac/sdk"; + +try { + profiler.disable(); + console.log("Profiler globally enabled:", profiler.isEnabled()); + + const modelId = await loadModel({ + modelSrc: GTE_LARGE_FP16, + modelType: "embeddings", + onProgress: (p) => console.log(` ${p.percentage.toFixed(1)}%`), + }); + console.log("Model loaded:", modelId); + + console.log("\n=== Embed with per-call profiling ==="); + const embedding1 = await embed( + { modelId, text: "Profile this specific call" }, + { profiling: { enabled: true, includeServerBreakdown: true } }, + ); + console.log("Embedding dimensions:", embedding1.length); + + console.log("\n=== Embed without profiling ==="); + const embedding2 = await embed({ + modelId, + text: "This call is not profiled", + }); + console.log("Embedding dimensions:", embedding2.length); + + console.log("\n=== Embed with profiling 
explicitly disabled ==="); + const embedding3 = await embed( + { modelId, text: "Profiling explicitly disabled for this call" }, + { profiling: { enabled: false } }, + ); + console.log("Embedding dimensions:", embedding3.length); + + await unloadModel({ modelId }); + + console.log("\n=== Profiler Summary (per-call data only) ==="); + console.log(profiler.exportSummary()); +} catch (error) { + console.error("Error:", error); + process.exit(1); +} diff --git a/packages/sdk/examples/transcription/parakeet-microphone-record.ts b/packages/sdk/examples/transcription/parakeet-microphone-record.ts index e706b6834e..913eb5dd81 100644 --- a/packages/sdk/examples/transcription/parakeet-microphone-record.ts +++ b/packages/sdk/examples/transcription/parakeet-microphone-record.ts @@ -64,7 +64,18 @@ console.log("Model loaded. Speak into your microphone (Ctrl+C to stop):\n"); const ffmpeg = spawn( "ffmpeg", - [...getAudioInputArgs(), "-ar", "16000", "-ac", "1", "-sample_fmt", "s16", "-f", "s16le", "pipe:1"], + [ + ...getAudioInputArgs(), + "-ar", + "16000", + "-ac", + "1", + "-sample_fmt", + "s16", + "-f", + "s16le", + "pipe:1", + ], { stdio: ["ignore", "pipe", "ignore"] }, ); @@ -88,7 +99,10 @@ ffmpeg.stdout.on("data", (chunk: Buffer) => { } } } catch (err) { - console.error("Transcription error:", err instanceof Error ? err.message : err); + console.error( + "Transcription error:", + err instanceof Error ? 
err.message : err, + ); } finally { processing = false; } diff --git a/packages/sdk/examples/transcription/parakeet-tdt-filesystem.ts b/packages/sdk/examples/transcription/parakeet-tdt-filesystem.ts index da9fd51d48..8367b12ff3 100644 --- a/packages/sdk/examples/transcription/parakeet-tdt-filesystem.ts +++ b/packages/sdk/examples/transcription/parakeet-tdt-filesystem.ts @@ -16,9 +16,7 @@ if (!args[0]) { "Usage: bun run examples/transcription/parakeet-tdt-filesystem.ts " + "[encoder-onnx] [encoder-data] [decoder-onnx] [vocab-txt] [preprocessor-onnx]", ); - console.error( - "\nIf model paths are omitted, defaults to registry models.", - ); + console.error("\nIf model paths are omitted, defaults to registry models."); process.exit(1); } @@ -45,9 +43,7 @@ try { parakeetPreprocessorSrc, }, onProgress: (progress) => { - console.log( - `Download progress: ${progress.percentage.toFixed(1)}%`, - ); + console.log(`Download progress: ${progress.percentage.toFixed(1)}%`); }, }); diff --git a/packages/sdk/index.ts b/packages/sdk/index.ts index ca836998d5..8d6f4bc654 100644 --- a/packages/sdk/index.ts +++ b/packages/sdk/index.ts @@ -86,6 +86,7 @@ export { PLUGIN_OCR, SDK_DEFAULT_PLUGINS, type BuiltinPlugin, + type ProfilerMode, } from "./schemas"; export { type ToolInput, type ToolHandler } from "./utils/tool-helpers"; @@ -101,3 +102,7 @@ export { SUPPORTED_AUDIO_FORMATS } from "./constants/audio"; // Logging exports export { getLogger, SDK_LOG_ID } from "./logging"; export type { Logger, LogTransport, LoggerOptions } from "./logging"; + +// Profiler exports +export { profiler } from "./profiling"; +export type { ProfilerRuntimeOptions, ProfilerExport } from "./profiling"; diff --git a/packages/sdk/models/update-models/index.ts b/packages/sdk/models/update-models/index.ts index 55d20af5c8..d245a4811e 100644 --- a/packages/sdk/models/update-models/index.ts +++ b/packages/sdk/models/update-models/index.ts @@ -66,11 +66,10 @@ async function checkOnly( } const addedWithNames = 
assignNames(rawAdded); - const { - added, - updated, - removed, - } = separateUpdates(addedWithNames, rawRemoved); + const { added, updated, removed } = separateUpdates( + addedWithNames, + rawRemoved, + ); console.log(""); console.log("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); @@ -168,8 +167,11 @@ async function updateModels( HISTORY_DIR, ); if (historyFile) { - const { added: trulyAdded, updated, removed: trulyRemoved } = - separateUpdates(addedWithNames, removed); + const { + added: trulyAdded, + updated, + removed: trulyRemoved, + } = separateUpdates(addedWithNames, removed); console.log(`📜 Created history file → ${historyFile}`); console.log( ` Added: ${trulyAdded.length}, Updated: ${updated.length}, Removed: ${trulyRemoved.length}`, diff --git a/packages/sdk/profiling/aggregator.ts b/packages/sdk/profiling/aggregator.ts new file mode 100644 index 0000000000..d46b9546b6 --- /dev/null +++ b/packages/sdk/profiling/aggregator.ts @@ -0,0 +1,136 @@ +/** + * Accumulates profiling statistics and maintains a bounded buffer of recent events. + * Uses a ring buffer for O(1) event insertion. 
+ */ + +import type { ProfilingEvent, AggregatedStats } from "./types"; +import { getGlobalSingleton } from "@/utils/global-singleton"; +import { + createRingBuffer, + ringBufferPush, + ringBufferToArray, + ringBufferClear, + ringBufferResize, + ringBufferDroppedCount, + type RingBufferState, +} from "./ring-buffer"; + +const DEFAULT_MAX_RECENT_EVENTS = 1000; + +interface RollingStatsState { + count: number; + min: number; + max: number; + sum: number; + last: number; +} + +interface AggregatorState { + stats: Map; + eventBuffer: RingBufferState; +} + +const AGGREGATOR_STATE_KEY = Symbol.for("@qvac/sdk:profiler-aggregator-state"); + +function getAggregatorState(): AggregatorState { + return getGlobalSingleton(AGGREGATOR_STATE_KEY, () => { + return { + stats: new Map(), + eventBuffer: createRingBuffer(DEFAULT_MAX_RECENT_EVENTS), + }; + }); +} + +export function createAggregator( + maxRecentEvents: number = DEFAULT_MAX_RECENT_EVENTS, +): void { + const state = getAggregatorState(); + state.stats = new Map(); + state.eventBuffer = createRingBuffer(maxRecentEvents); +} + +function recordStats(key: string, value: number): void { + const aggregator = getAggregatorState(); + let statsState = aggregator.stats.get(key); + if (!statsState) { + statsState = { + count: 0, + min: Infinity, + max: -Infinity, + sum: 0, + last: 0, + }; + aggregator.stats.set(key, statsState); + } + + statsState.count++; + statsState.sum += value; + statsState.last = value; + if (value < statsState.min) statsState.min = value; + if (value > statsState.max) statsState.max = value; +} + +function getStats(state: RollingStatsState): AggregatedStats { + return { + count: state.count, + min: state.count > 0 ? state.min : 0, + max: state.count > 0 ? state.max : 0, + avg: state.count > 0 ? 
state.sum / state.count : 0, + sum: state.sum, + last: state.last, + }; +} + +export function recordEvent( + event: ProfilingEvent, + storeInBuffer: boolean = true, +): void { + const state = getAggregatorState(); + + if (storeInBuffer) { + ringBufferPush(state.eventBuffer, event); + } + + if (event.ms !== undefined) { + const key = event.phase ? `${event.op}.${event.phase}` : event.op; + recordStats(key, event.ms); + } +} + +export function getAggregates(): Record { + const state = getAggregatorState(); + const result: Record = {}; + for (const [key, statsState] of state.stats) { + result[key] = getStats(statsState); + } + return result; +} + +/** Returns events in chronological order (oldest first). */ +export function getRecentEvents(): ProfilingEvent[] { + return ringBufferToArray(getAggregatorState().eventBuffer); +} + +export function getEventCount(): number { + const state = getAggregatorState(); + let total = 0; + for (const statsState of state.stats.values()) { + total += statsState.count; + } + return total; +} + +export function getDroppedCount(): number { + return ringBufferDroppedCount(getAggregatorState().eventBuffer); +} + +export function clearAggregator(): void { + const state = getAggregatorState(); + state.stats.clear(); + ringBufferClear(state.eventBuffer); +} + +export function setMaxRecentEvents(max: number): void { + const state = getAggregatorState(); + state.eventBuffer = ringBufferResize(state.eventBuffer, max); +} diff --git a/packages/sdk/profiling/clock.ts b/packages/sdk/profiling/clock.ts new file mode 100644 index 0000000000..0a4bde9af5 --- /dev/null +++ b/packages/sdk/profiling/clock.ts @@ -0,0 +1,65 @@ +/** + * Runtime-safe monotonic clock for profiling. 
/**
 * Runtime-safe clock helpers for profiling.
 * Clock priority: performance.now > process.hrtime > Date.now
 */

/** Identifies which timer backs {@link nowMs}. */
export type ClockSource = "performance" | "hrtime" | "date";

let clockSource: ClockSource;
let nowMsImpl: () => number;

// Detect and cache the best available clock source once, at module load, so
// nowMs() never repeats the feature detection.
if (
  typeof globalThis !== "undefined" &&
  typeof globalThis.performance !== "undefined" &&
  typeof globalThis.performance.now === "function"
) {
  clockSource = "performance";
  const perfNow = globalThis.performance.now.bind(globalThis.performance);
  nowMsImpl = () => perfNow();
} else if (
  typeof process !== "undefined" &&
  typeof process.hrtime === "function" &&
  typeof process.hrtime.bigint === "function"
) {
  clockSource = "hrtime";
  const hrtime = process.hrtime.bigint.bind(process.hrtime);
  // Readings are reported relative to module load, converted from ns to ms.
  const startNs = hrtime();
  nowMsImpl = () => Number(hrtime() - startNs) / 1_000_000;
} else {
  clockSource = "date";
  // Readings are reported relative to module load.
  const startMs = Date.now();
  nowMsImpl = () => Date.now() - startMs;
}

/** Returns the clock source that was selected at module load. */
export function getClockSource(): ClockSource {
  return clockSource;
}

/** Returns true unless the `Date.now` fallback is in use. */
export function isMonotonic(): boolean {
  return clockSource === "performance" || clockSource === "hrtime";
}

/**
 * Returns the current reading of the selected clock in milliseconds.
 * The origin depends on the clock source, so only differences between two
 * readings are meaningful.
 */
export function nowMs(): number {
  return nowMsImpl();
}

/**
 * Awaits `fn` and measures how long it took.
 *
 * @param fn - Async function to run.
 * @returns A tuple of the resolved value and the elapsed milliseconds.
 *          If `fn` rejects, the rejection propagates and nothing is returned.
 */
export async function measureAsync<T>(
  fn: () => Promise<T>,
): Promise<[T, number]> {
  const start = nowMs();
  const result = await fn();
  return [result, nowMs() - start];
}

/**
 * Runs `fn` and measures how long it took.
 *
 * @param fn - Function to run.
 * @returns A tuple of the return value and the elapsed milliseconds.
 */
export function measureSync<T>(fn: () => T): [T, number] {
  const start = nowMs();
  const result = fn();
  return [result, nowMs() - start];
}

/**
 * Builds a profile ID of the form `<base36 timestamp>-<6 base36 chars>`.
 *
 * `Math.random().toString(36)` may yield fewer than six fractional digits
 * (0.5 becomes "0.i", 0 becomes "0"), so the suffix is right-padded with "0"
 * to keep every ID the same shape. The suffix comes from `Math.random`, so
 * IDs are for correlation only and must not be used as secrets.
 */
export function generateProfileId(): string {
  const timestamp = Date.now().toString(36);
  const random = Math.random().toString(36).substring(2, 8).padEnd(6, "0");
  return `${timestamp}-${random}`;
}
profiling enablement, configuration, and recording. + * + * Precedence (highest to lowest): + * 1. Per-call override + * 2. Runtime API (enable/disable) + * 3. Disabled default + */ + +import type { ProfilerMode, PerCallProfiling } from "@/schemas"; +import { getGlobalSingleton } from "@/utils/global-singleton"; +import type { + ProfilerRuntimeOptions, + ProfilingEvent, + AggregatedStats, +} from "./types"; +import { + createAggregator, + recordEvent as aggregatorRecord, + getAggregates as aggregatorGetAggregates, + getRecentEvents as aggregatorGetRecentEvents, + clearAggregator, + setMaxRecentEvents, +} from "./aggregator"; +import { nowMs, generateProfileId } from "./clock"; + +export interface ResolvedProfilerConfig { + enabled: boolean; + mode: ProfilerMode; + includeServerBreakdown: boolean; + operationFilters: string[]; + maxRecentEvents: number; +} + +const DEFAULT_CONFIG: ResolvedProfilerConfig = { + enabled: false, + mode: "summary", + includeServerBreakdown: false, + operationFilters: [], + maxRecentEvents: 1000, +}; + +type RecordCallback = (event: ProfilingEvent) => void; + +interface ControllerState { + runtimeOverride: boolean | undefined; + runtimeOptions: ProfilerRuntimeOptions; + onRecordCallbacks: RecordCallback[]; + initialized: boolean; +} + +const CONTROLLER_STATE_KEY = Symbol.for("@qvac/sdk:profiler-controller-state"); + +function getControllerState(): ControllerState { + const state = getGlobalSingleton(CONTROLLER_STATE_KEY, () => { + return { + runtimeOverride: undefined, + runtimeOptions: {}, + onRecordCallbacks: [], + initialized: false, + }; + }); + if (!state.initialized) { + createAggregator(DEFAULT_CONFIG.maxRecentEvents); + state.initialized = true; + } + + return state; +} + +export function enable(options?: ProfilerRuntimeOptions): void { + const state = getControllerState(); + state.runtimeOverride = true; + state.runtimeOptions = options ? 
/**
 * Records one profiling event and notifies every `onRecord` subscriber.
 *
 * @param event - Event to record. When `ts` is undefined a copy stamped with
 *                `nowMs()` is recorded instead; the input is not mutated.
 */
export function record(event: ProfilingEvent): void {
  const state = getControllerState();
  const eventWithTs =
    event.ts === undefined ? { ...event, ts: nowMs() } : event;

  // Individual events are retained by the aggregator only in verbose mode.
  const storeInBuffer = getEffectiveConfig().mode === "verbose";
  aggregatorRecord(eventWithTs, storeInBuffer);

  // Dispatch over a snapshot. An unsubscribe function splices the live array,
  // so a callback that unsubscribes itself while being invoked would shift
  // the remaining entries and make the iterator skip the next subscriber.
  for (const cb of [...state.onRecordCallbacks]) {
    try {
      cb(eventWithTs);
    } catch {
      // Callback errors should not break profiling
    }
  }
}

/**
 * Subscribes `callback` to every event passed to `record`.
 *
 * @param callback - Invoked with the (timestamped) event.
 * @returns Unsubscribe function; it removes one registration of `callback`
 *          and is a no-op if the callback is no longer registered.
 */
export function onRecord(callback: RecordCallback): () => void {
  const state = getControllerState();
  state.onRecordCallbacks.push(callback);
  return () => {
    const idx = state.onRecordCallbacks.indexOf(callback);
    if (idx >= 0) {
      state.onRecordCallbacks.splice(idx, 1);
    }
  };
}
(PROFILING_KEY in payload) { + const { [PROFILING_KEY]: _unused, ...rest } = payload as Record< + string, + unknown + >; + void _unused; + return rest as T; + } + return payload; +} diff --git a/packages/sdk/profiling/events.ts b/packages/sdk/profiling/events.ts new file mode 100644 index 0000000000..8cf7c03e4a --- /dev/null +++ b/packages/sdk/profiling/events.ts @@ -0,0 +1,93 @@ +import { nowMs } from "./clock"; +import { record } from "./controller"; +import type { ProfilingEventKind } from "./types"; +import type { ServerBreakdown, DelegationBreakdown } from "@/schemas"; + +export interface BaseTimings { + profileId: string; + requestType: string; + requestStart: number; +} + +export interface BaseEvent { + ts: number; + op: string; + kind: ProfilingEventKind; + profileId: string; +} + +export function recordPhase( + base: BaseEvent, + phase: string, + ms?: number, + extra?: { count?: number }, +): void { + if (ms === undefined) return; + record({ ...base, phase, ms, ...extra }); +} + +export function recordFailure( + base: BaseEvent, + startTime: number, + error: unknown, +): void { + const now = nowMs(); + record({ + ...base, + ts: now, + phase: "failed", + ms: now - startTime, + tags: { + error: error instanceof Error ? error.name : "Unknown", + message: + error instanceof Error + ? 
error.message.slice(0, 100) + : String(error).slice(0, 100), + }, + }); +} + +export function recordServerBreakdownPhases( + base: BaseEvent, + server: ServerBreakdown, + prefix: string = "server", +): void { + recordPhase(base, `${prefix}.request.jsonParse`, server.requestJsonParseMs); + recordPhase( + base, + `${prefix}.request.zodValidation`, + server.requestZodValidationMs, + ); + recordPhase(base, `${prefix}.handlerExecution`, server.handlerExecutionMs); + recordPhase( + base, + `${prefix}.response.zodValidation`, + server.responseZodValidationMs, + ); + recordPhase(base, `${prefix}.response.stringify`, server.responseStringifyMs); + recordPhase(base, `${prefix}.totalServerTime`, server.totalServerMs); +} + +export function recordDelegationBreakdownPhases( + base: BaseEvent, + delegation: DelegationBreakdown, + prefix: string = "delegation", +): void { + recordPhase(base, `${prefix}.connection`, delegation.connectionMs); + recordPhase( + base, + `${prefix}.request.stringify`, + delegation.requestStringifyMs, + ); + recordPhase(base, `${prefix}.serverWait`, delegation.serverWaitMs); + recordPhase( + base, + `${prefix}.response.jsonParse`, + delegation.responseJsonParseMs, + ); + recordPhase( + base, + `${prefix}.totalDelegationTime`, + delegation.totalDelegationMs, + ); +} diff --git a/packages/sdk/profiling/exporters.ts b/packages/sdk/profiling/exporters.ts new file mode 100644 index 0000000000..700ce882e2 --- /dev/null +++ b/packages/sdk/profiling/exporters.ts @@ -0,0 +1,217 @@ +/** + * JSON, table, and summary export utilities for profiling data. 
/**
 * JSON, table, and summary export utilities for profiling data.
 */

import type { ProfilerExport, AggregatedStats } from "./types";
import {
  getEffectiveConfig,
  getAggregates,
  getRecentEvents,
} from "./controller";
import { getDroppedCount, getEventCount } from "./aggregator";
import { nowMs, getClockSource, isMonotonic } from "./clock";

/**
 * Builds a snapshot of the profiler: effective config, current aggregates and
 * the export timestamp.
 *
 * @param options.includeRecentEvents - defaults to `true`; recent events are
 *   attached only when this is true AND the effective mode is "verbose".
 * @returns the export object.
 */
export function exportJSON(options?: {
  includeRecentEvents?: boolean;
}): ProfilerExport {
  const config = getEffectiveConfig();
  const includeRecent = options?.includeRecentEvents ?? true;

  const result: ProfilerExport = {
    config,
    aggregates: getAggregates(),
    exportedAt: nowMs(),
  };

  if (includeRecent && config.mode === "verbose") {
    result.recentEvents = getRecentEvents();
  }

  return result;
}

/**
 * Merges every aggregate whose key satisfies `predicate` into a single
 * `AggregatedStats`.
 *
 * @returns `undefined` when no key matches; otherwise the combined stats.
 *   `min`/`max`/`avg` are 0 when the matching entries hold no samples, and
 *   `last` comes from the final matching entry (in `Object.entries` order)
 *   that has at least one sample.
 */
function aggregateMatchingStats(
  aggregates: Record<string, AggregatedStats>,
  predicate: (key: string) => boolean,
): AggregatedStats | undefined {
  const matches = Object.entries(aggregates).filter(([key]) => predicate(key));
  if (matches.length === 0) {
    return undefined;
  }

  let count = 0;
  let sum = 0;
  let min = Infinity;
  let max = -Infinity;
  let last = 0;

  for (const [, stats] of matches) {
    count += stats.count;
    sum += stats.sum;
    // Entries without samples must not contribute to min/max/last.
    if (stats.count > 0) {
      if (stats.min < min) min = stats.min;
      if (stats.max > max) max = stats.max;
      last = stats.last;
    }
  }

  return {
    count,
    min: count > 0 ? min : 0,
    max: count > 0 ? max : 0,
    avg: count > 0 ? sum / count : 0,
    sum,
    last,
  };
}

/**
 * Formats a millisecond duration with a unit chosen by magnitude
 * (μs below 1 ms, ms below 1 s, s below 1 min, otherwise minutes).
 *
 * The unit is picked from the absolute value so that negative durations
 * (possible with a non-monotonic clock source) keep a sensible unit instead
 * of always being printed in microseconds. Non-finite input yields "-".
 * Output for non-negative finite input is unchanged.
 */
function formatDuration(ms: number): string {
  if (!Number.isFinite(ms)) {
    return "-";
  }

  const sign = ms < 0 ? "-" : "";
  const abs = Math.abs(ms);

  if (abs < 1) {
    return `${sign}${(abs * 1000).toFixed(0)}μs`;
  }
  if (abs < 1000) {
    return `${sign}${abs.toFixed(1)}ms`;
  }
  if (abs < 60000) {
    return `${sign}${(abs / 1000).toFixed(2)}s`;
  }
  return `${sign}${(abs / 60000).toFixed(2)}m`;
}

/**
 * Formats a number for display: integers (including 0) via `toLocaleString`,
 * fractions with a precision that shrinks as the magnitude grows.
 */
function formatNumber(n: number): string {
  if (Number.isInteger(n)) {
    return n.toLocaleString();
  }
  // 0 is an integer and was handled above, so no separate zero case is needed.
  if (Math.abs(n) < 0.01) return n.toExponential(2);
  if (Math.abs(n) < 1) return n.toFixed(3);
  if (Math.abs(n) < 100) return n.toFixed(2);
  return n.toFixed(1);
}

/**
 * Pads `s` with spaces to exactly `len` characters, or truncates it to `len`
 * when it is already that long or longer.
 */
function pad(s: string, len: number, align: "left" | "right" = "left"): string {
  if (s.length >= len) return s.substring(0, len);
  const spaces = " ".repeat(len - s.length);
  return align === "left" ? s + spaces : spaces + s;
}

/**
 * Renders all aggregates as a fixed-width text table, one row per metric,
 * ordered by total time descending.
 *
 * @returns the table, or "No profiling data recorded." when there are no
 *   aggregates.
 */
export function exportTable(): string {
  const aggregates = getAggregates();
  const entries = Object.entries(aggregates);

  if (entries.length === 0) {
    return "No profiling data recorded.";
  }

  const metricWidth = 40;
  const numWidth = 12;

  const header = [
    pad("Metric", metricWidth),
    pad("Count", numWidth, "right"),
    pad("Min", numWidth, "right"),
    pad("Max", numWidth, "right"),
    pad("Avg", numWidth, "right"),
    pad("Total", numWidth, "right"),
  ].join(" | ");

  const separator = "-".repeat(header.length);

  // `entries` is a fresh array from Object.entries, so sorting in place is safe.
  const rows = entries
    .sort((a, b) => b[1].sum - a[1].sum)
    .map(([metric, stats]) => {
      return [
        pad(metric, metricWidth),
        pad(formatNumber(stats.count), numWidth, "right"),
        pad(formatDuration(stats.min), numWidth, "right"),
        pad(formatDuration(stats.max), numWidth, "right"),
        pad(formatDuration(stats.avg), numWidth, "right"),
        pad(formatDuration(stats.sum), numWidth, "right"),
      ].join(" | ");
    });

  return [separator, header, separator, ...rows, separator].join("\n");
}

/**
 * Renders a short human-readable report: session information (status, mode,
 * clock source, event and dropped counts) followed by a few key metrics
 * selected from the aggregates by key pattern.
 */
export function exportSummary(): string {
  const aggregates = getAggregates();
  const config = getEffectiveConfig();
  const eventCount = getEventCount();
  const droppedCount = getDroppedCount();

  const lines: string[] = [
    "=".repeat(60),
    "PROFILER SUMMARY",
    "=".repeat(60),
    "",
    "Session:",
    ` Status: ${config.enabled ? "enabled" : "disabled"}`,
    ` Mode: ${config.mode}`,
    ` Clock: ${getClockSource()} (monotonic: ${isMonotonic()})`,
    ` Events: ${eventCount.toLocaleString()}`,
    ` Dropped: ${droppedCount.toLocaleString()}`,
    "",
  ];

  const keyMetrics = [
    {
      label: "RPC Total",
      stats: aggregateMatchingStats(aggregates, (key) =>
        key.endsWith(".totalClientTime"),
      ),
    },
    {
      label: "Handler",
      stats: aggregateMatchingStats(
        aggregates,
        (key) => key.endsWith(".server.handlerExecution") || !key.includes("."),
      ),
    },
    {
      label: "Model Load",
      stats: aggregateMatchingStats(
        aggregates,
        (key) => key === "load.totalTime" || key.endsWith(".load.totalTime"),
      ),
    },
    {
      label: "Download",
      stats: aggregateMatchingStats(
        aggregates,
        (key) => key === "download.time" || key.endsWith(".download.time"),
      ),
    },
  ];

  const hasMetrics = keyMetrics.some((m) => m.stats);

  if (hasMetrics) {
    lines.push("Key Metrics:");
    lines.push("-".repeat(60));
    lines.push(
      " " +
        "Metric".padEnd(18) +
        "Samples".padStart(10) +
        "Avg".padStart(12) +
        "Total".padStart(12),
    );
    lines.push("-".repeat(60));

    for (const { label, stats } of keyMetrics) {
      if (stats) {
        const name = label.padEnd(18);
        const samples = String(stats.count).padStart(10);
        const avg = formatDuration(stats.avg).padStart(12);
        const total = formatDuration(stats.sum).padStart(12);
        lines.push(` ${name}${samples}${avg}${total}`);
      }
    }
  } else {
    lines.push("No metrics recorded yet.");
  }

  lines.push("");
  lines.push("=".repeat(60));

  return lines.join("\n");
}
// [patch header] diff --git a/packages/sdk/profiling/index.ts b/packages/sdk/profiling/index.ts new file mode 100644 index 0000000000..aaefed061c --- /dev/null +++
// [patch header] b/packages/sdk/profiling/index.ts @@ -0,0 +1,71 @@
/**
 * QVAC SDK Profiler
 *
 * @example
 * ```ts
 * import { profiler } from "@qvac/sdk";
 *
 * profiler.enable({ mode: "summary" });
 * // ... run SDK operations ...
 * console.log(profiler.exportTable());
 * profiler.disable();
 * ```
 */

import * as controller from "./controller";
import * as exporters from "./exporters";
import type {
  ProfilerRuntimeOptions,
  ProfilingEvent,
  ProfilerExport,
  AggregatedStats,
} from "./types";

/**
 * Public profiler facade. Every member forwards to the matching function in
 * `./controller` (state and configuration) or `./exporters` (output formats).
 */
export const profiler = {
  enable: (options?: ProfilerRuntimeOptions) => controller.enable(options),
  disable: () => controller.disable(),
  isEnabled: () => controller.isEnabled(),
  exportJSON: (options?: { includeRecentEvents?: boolean }): ProfilerExport =>
    exporters.exportJSON(options),
  exportTable: () => exporters.exportTable(),
  exportSummary: () => exporters.exportSummary(),
  onRecord: (callback: (event: ProfilingEvent) => void) =>
    controller.onRecord(callback),
  getConfig: () => controller.getEffectiveConfig(),
  // Aggregates are keyed by metric name.
  getAggregates: (): Record<string, AggregatedStats> =>
    controller.getAggregates(),
  clear: () => controller.clear(),
};

export type {
  ProfilerRuntimeOptions,
  ProfilingEvent,
  ProfilerExport,
  AggregatedStats,
  ProfilingEventKind,
} from "./types";
export type { ProfilerMode } from "@/schemas";
export { nowMs } from "./clock";
export {
  record,
  shouldProfile,
  shouldIncludeServerBreakdown,
  generateId,
  isEnabled,
  type ResolvedProfilerConfig,
} from "./controller";
export {
  createProfilingMeta,
  createProfilingDisabledMeta,
  injectProfilingMetaIntoObject,
  extractProfilingMeta,
  stripProfilingMeta,
} from "./envelope";
export {
  recordPhase,
  recordFailure,
  recordServerBreakdownPhases,
  recordDelegationBreakdownPhases,
  type BaseTimings,
  type BaseEvent,
} from "./events";
// [patch header] diff --git a/packages/sdk/profiling/ring-buffer.ts b/packages/sdk/profiling/ring-buffer.ts new file mode 100644 index 0000000000..f4d7025489 ---
// [patch header] /dev/null +++ b/packages/sdk/profiling/ring-buffer.ts @@ -0,0 +1,120 @@
/**
 * O(1) bounded buffer for event storage.
 * Oldest events are overwritten when full.
 */

/** Mutable state operated on by the `ringBuffer*` functions below. */
export interface RingBufferState<T> {
  /** Backing storage; slots that were never written hold `undefined`. */
  buffer: (T | undefined)[];
  capacity: number;
  /** Index of the slot the next push writes to. */
  head: number;
  /** Number of occupied slots; never exceeds `capacity`. */
  size: number;
  /** Pushes since creation or the last clear (carried over by resize). */
  totalPushed: number;
}

/**
 * Creates an empty ring buffer.
 *
 * @param capacity - number of slots; must be an integer >= 1.
 * @throws Error when `capacity` is below 1.
 * @throws RangeError when `capacity` is NaN or not an integer. Previously
 *   these slipped past the `< 1` guard and failed inside `new Array` with an
 *   unrelated "Invalid array length" RangeError.
 */
export function createRingBuffer<T>(capacity: number): RingBufferState<T> {
  if (capacity < 1) {
    throw new Error("Ring buffer capacity must be at least 1");
  }
  if (!Number.isInteger(capacity)) {
    throw new RangeError("Ring buffer capacity must be an integer");
  }

  return {
    buffer: new Array<T | undefined>(capacity),
    capacity,
    head: 0,
    size: 0,
    totalPushed: 0,
  };
}

// Returns the overwritten item if buffer was full, undefined otherwise.
export function ringBufferPush<T>(
  state: RingBufferState<T>,
  item: T,
): T | undefined {
  // When full, `head` points at the oldest item, which is about to be replaced.
  const overwritten =
    state.size === state.capacity ? state.buffer[state.head] : undefined;

  state.buffer[state.head] = item;
  state.head = (state.head + 1) % state.capacity;
  state.size = Math.min(state.size + 1, state.capacity);
  state.totalPushed++;

  return overwritten;
}

// Returns all items in chronological order (oldest first).
export function ringBufferToArray<T>(state: RingBufferState<T>): T[] {
  if (state.size === 0) {
    return [];
  }

  const result: T[] = [];
  const start =
    state.size === state.capacity
      ? state.head // Full: head points to oldest
      : 0; // Not full yet: nothing has wrapped, oldest is slot 0

  for (let i = 0; i < state.size; i++) {
    const index = (start + i) % state.capacity;
    const item = state.buffer[index];
    if (item !== undefined) {
      result.push(item);
    }
  }

  return result;
}

// Returns the most recent N items (newest first).
export function ringBufferGetRecent<T>(
  state: RingBufferState<T>,
  count: number,
): T[] {
  if (state.size === 0 || count <= 0) {
    return [];
  }

  const actualCount = Math.min(count, state.size);
  const result: T[] = [];

  for (let i = 0; i < actualCount; i++) {
    // Walk backwards from the slot written last; adding capacity keeps the
    // operand of % non-negative.
    const index = (state.head - 1 - i + state.capacity) % state.capacity;
    const item = state.buffer[index];
    if (item !== undefined) {
      result.push(item);
    }
  }

  return result;
}

/** Empties the buffer and resets all counters, keeping the capacity. */
export function ringBufferClear<T>(state: RingBufferState<T>): void {
  state.buffer = new Array<T | undefined>(state.capacity);
  state.head = 0;
  state.size = 0;
  state.totalPushed = 0;
}

/**
 * Number of pushed items that are no longer held by the buffer.
 *
 * Computed as `totalPushed - size` rather than `totalPushed - capacity`:
 * both agree for a buffer that was never resized, but only the former stays
 * correct after `ringBufferResize` grows the buffer, where capacity exceeds
 * the number of retained items while `totalPushed` is carried over.
 */
export function ringBufferDroppedCount<T>(state: RingBufferState<T>): number {
  return Math.max(0, state.totalPushed - state.size);
}

// Resizes the buffer, keeping the most recent items.
export function ringBufferResize<T>(
  state: RingBufferState<T>,
  newCapacity: number,
): RingBufferState<T> {
  if (newCapacity < 1) {
    throw new Error("Ring buffer capacity must be at least 1");
  }

  const items = ringBufferToArray(state);
  const itemsToKeep = items.slice(-newCapacity);
  const newState = createRingBuffer<T>(newCapacity);

  for (const item of itemsToKeep) {
    ringBufferPush(newState, item);
  }

  // Re-pushing counted only the retained items; restore the lifetime total.
  newState.totalPushed = state.totalPushed;
  return newState;
}

// [patch header] diff --git a/packages/sdk/profiling/types.ts b/packages/sdk/profiling/types.ts new file mode 100644 index 0000000000..f3998823fd --- /dev/null +++ b/packages/sdk/profiling/types.ts @@ -0,0 +1,51 @@
export type ProfilingEventKind =
  | "rpc"
  | "handler"
  | "download"
  | "load"
  | "delegation";

export interface ProfilingEvent {
  /** Timestamp when event was recorded (monotonic ms) */
  ts: number;
  op: string;
  kind: ProfilingEventKind;
  profileId?: string;
  phase?: string;
  ms?: number;
  /** Count (e.g., chunks, tokens) */
  count?: number;
  bytes?: number;
  /** Numeric gauges (e.g., throughput, token counters) */
  gauges?: Record<string, number>;
  /** String tags (e.g., handlerType, sourceType, modelId) */
  tags?: Record<string, string>;
}

export interface ProfilerRuntimeOptions {
  mode?: "summary" | "verbose";
  includeServerBreakdown?: boolean;
  operationFilters?: string[];
}

export interface AggregatedStats {
  count: number;
  min: number;
  max: number;
  avg: number;
  sum: number;
  last: number;
}

export interface ProfilerExport {
  config: {
    enabled: boolean;
    mode: "summary" | "verbose";
    includeServerBreakdown: boolean;
    operationFilters: string[];
    maxRecentEvents: number;
  };
  /** Aggregated stats keyed by metric name. */
  aggregates: Record<string, AggregatedStats>;
  recentEvents?: ProfilingEvent[];
  exportedAt: number;
}
// [patch text kept verbatim] diff --git a/packages/sdk/schemas/common.ts b/packages/sdk/schemas/common.ts index e24e4066c8..5d22c9c029 100644 --- a/packages/sdk/schemas/common.ts +++ b/packages/sdk/schemas/common.ts @@ -1,4 +1,5 @@ import { z } from "zod"; +import { perCallProfilingSchema } from "./profiling"; import { pingRequestSchema, pingResponseSchema } from "./ping"; import { completionStreamRequestSchema, @@ -120,6 +121,7 @@ export const responseSchema = z.discriminatedUnion("type", [ export const rpcOptionsSchema = z.object({ timeout: z.number().min(100).optional(), forceNewConnection: z.boolean().optional(), + profiling: perCallProfilingSchema.optional(), }); export type Request = z.infer; diff --git a/packages/sdk/schemas/index.ts b/packages/sdk/schemas/index.ts index 053674ad67..6970117df6 100644 --- a/packages/sdk/schemas/index.ts +++ b/packages/sdk/schemas/index.ts @@ -38,6 +38,22 @@ export { type DeviceConfigDefaults, type DevicePattern, } from "./sdk-config"; +export { + PROFILING_KEY, + DELEGATION_BREAKDOWN_KEY, + profilerModeSchema, + serverBreakdownSchema, + delegationBreakdownSchema, + profilingRequestMetaSchema, + profilingResponseMetaSchema, + perCallProfilingSchema, + type ProfilerMode, + type ServerBreakdown, + type DelegationBreakdown, + type ProfilingRequestMeta, + type ProfilingResponseMeta, + type PerCallProfiling, +} from "./profiling"; export {
runtimeContextSchema, type RuntimeContext } from "./runtime-context"; export * from "./get-model-info"; export * from "./model-src-utils"; diff --git a/packages/sdk/schemas/llamacpp-config.ts b/packages/sdk/schemas/llamacpp-config.ts index 694a1dd8cb..fde7e52071 100644 --- a/packages/sdk/schemas/llamacpp-config.ts +++ b/packages/sdk/schemas/llamacpp-config.ts @@ -71,7 +71,9 @@ export const embedConfigBaseSchema = z.object({ attention: z.enum(["causal", "non-causal"]).optional(), embdNormalize: z.number().int().optional(), flashAttention: z.enum(["on", "off", "auto"]).optional(), - mainGpu: z.union([z.number().int().min(0), z.enum(["integrated", "dedicated"])]).optional(), + mainGpu: z + .union([z.number().int().min(0), z.enum(["integrated", "dedicated"])]) + .optional(), verbosity: verbositySchema.optional(), }); diff --git a/packages/sdk/schemas/load-model.ts b/packages/sdk/schemas/load-model.ts index 93fd0ca7ed..f63369b84c 100644 --- a/packages/sdk/schemas/load-model.ts +++ b/packages/sdk/schemas/load-model.ts @@ -329,14 +329,13 @@ export const loadOcrModelRequestSchema = commonModelConfigSchema .strict(); // Custom plugin catch-all: accepts any modelType string EXCEPT built-ins -export const loadCustomPluginModelRequestSchema = commonModelConfigSchema.extend( - { +export const loadCustomPluginModelRequestSchema = + commonModelConfigSchema.extend({ modelType: z.string().refine((val) => !builtInModelTypes.has(val), { message: "Built-in model types must use their specific schema", }), modelConfig: z.record(z.string(), z.unknown()).optional(), - }, -); + }); // Union of all load model request types (using z.union since each modelType accepts multiple values) export const loadModelSrcRequestSchema = z diff --git a/packages/sdk/schemas/profiling.ts b/packages/sdk/schemas/profiling.ts new file mode 100644 index 0000000000..dcc2ff4220 --- /dev/null +++ b/packages/sdk/schemas/profiling.ts @@ -0,0 +1,65 @@ +import { z } from "zod"; + +/** Internal envelope key for 
profiling metadata in RPC payloads */ +export const PROFILING_KEY = "__profiling"; + +/** + * Symbol key for attaching delegation breakdown to response objects. + */ +export const DELEGATION_BREAKDOWN_KEY = Symbol.for("@qvac/sdk:delegation-breakdown"); + +export const profilerModeSchema = z.enum(["summary", "verbose"]); + +/** + * Server-side timing breakdown (server → client). + * Attached to profiling response when includeServerBreakdown is enabled. + */ +export const serverBreakdownSchema = z.object({ + requestJsonParseMs: z.number().optional(), + requestZodValidationMs: z.number().optional(), + handlerExecutionMs: z.number().optional(), + responseZodValidationMs: z.number().optional(), + responseStringifyMs: z.number().optional(), + totalServerMs: z.number().optional(), +}); + +/** + * Delegation timing breakdown (consumer server → client). + * Captures timing for server-to-provider delegation hops. + * Note: Only injected for unary requests; streaming delegation + * records server-side but does not inject into response. 
+ */ +export const delegationBreakdownSchema = z.object({ + profileId: z.string().optional(), + connectionMs: z.number().optional(), + requestStringifyMs: z.number().optional(), + serverWaitMs: z.number().optional(), + responseJsonParseMs: z.number().optional(), + totalDelegationMs: z.number().optional(), +}); + +export const profilingRequestMetaSchema = z.object({ + enabled: z.boolean().optional(), + id: z.string().optional(), + includeServer: z.boolean().optional(), + mode: profilerModeSchema.optional(), +}); + +export const profilingResponseMetaSchema = z.object({ + id: z.string(), + server: serverBreakdownSchema.optional(), + delegation: delegationBreakdownSchema.optional(), +}); + +export const perCallProfilingSchema = z.object({ + enabled: z.boolean().optional(), + includeServerBreakdown: z.boolean().optional(), + mode: profilerModeSchema.optional(), +}); + +export type ProfilingResponseMeta = z.infer; +export type ProfilerMode = z.infer; +export type ProfilingRequestMeta = z.infer; +export type ServerBreakdown = z.infer; +export type DelegationBreakdown = z.infer; +export type PerCallProfiling = z.infer; diff --git a/packages/sdk/server/bare/delegate-rpc-client.ts b/packages/sdk/server/bare/delegate-rpc-client.ts index db22ab9b6f..17c7b3143d 100644 --- a/packages/sdk/server/bare/delegate-rpc-client.ts +++ b/packages/sdk/server/bare/delegate-rpc-client.ts @@ -6,6 +6,11 @@ import { withTimeout } from "@/utils/withTimeout"; import type { RPCOptions } from "@/schemas"; import { DelegateConnectionFailedError } from "@/utils/errors-server"; import { getServerLogger } from "@/logging"; +import { nowMs } from "@/profiling"; +import { + cacheDelegationConnectionTime, + clearPeerConnectionTracking, +} from "@/server/rpc/profiling/delegation-profiler"; const logger = getServerLogger(); @@ -53,12 +58,14 @@ function ensureConnectionHandler(): void { logger.debug(`Connection closed for peer: ${peerPubkey}`); activeRPCs.delete(peerPubkey); 
activeConnections.delete(peerPubkey); + clearPeerConnectionTracking(peerPubkey); }); conn.on("error", (err) => { logger.error(`Connection error for peer ${peerPubkey}:`, err); activeRPCs.delete(peerPubkey); activeConnections.delete(peerPubkey); + clearPeerConnectionTracking(peerPubkey); }); }); } @@ -81,6 +88,7 @@ async function closeConnection(publicKey: string): Promise { activeConnections.delete(publicKey); activeRPCs.delete(publicKey); + clearPeerConnectionTracking(publicKey); } } @@ -97,6 +105,7 @@ async function ensureRPCConnection( } const swarm = getSwarm(); + const connectionStart = nowMs(); // Track the listener so we can clean it up on timeout let onConnection: (conn: Connection) => void = () => {}; @@ -167,7 +176,12 @@ async function ensureRPCConnection( }); }); - return await withTimeout(connectionPromise, timeout); + const rpc = await withTimeout(connectionPromise, timeout); + + const connectionDuration = nowMs() - connectionStart; + cacheDelegationConnectionTime(publicKey, connectionDuration); + + return rpc; } catch (error: unknown) { // Clean up the per-request connection listener swarm.removeListener("connection", onConnection); @@ -225,4 +239,5 @@ export function cleanupStaleConnection(publicKey: string): void { conn.destroy(); activeConnections.delete(publicKey); } + clearPeerConnectionTracking(publicKey); } diff --git a/packages/sdk/server/bare/ops/transcribe.ts b/packages/sdk/server/bare/ops/transcribe.ts index bb294adff2..1f5279bde4 100644 --- a/packages/sdk/server/bare/ops/transcribe.ts +++ b/packages/sdk/server/bare/ops/transcribe.ts @@ -37,10 +37,7 @@ async function applyPrompt( prompt: string | undefined, engineType: string, ): Promise { - if ( - engineType !== ModelType.whispercppTranscription || - !prompt - ) { + if (engineType !== ModelType.whispercppTranscription || !prompt) { return null; } diff --git a/packages/sdk/server/error-handlers.ts b/packages/sdk/server/error-handlers.ts index 0b624ca0d0..009c4a8211 100644 --- 
a/packages/sdk/server/error-handlers.ts +++ b/packages/sdk/server/error-handlers.ts @@ -1,15 +1,27 @@ import type RPC from "bare-rpc"; import { createErrorResponse, responseSchema } from "@/schemas"; import { getServerLogger } from "@/logging"; +import { type ServerProfiler } from "./rpc/profiling"; const logger = getServerLogger(); -export function sendErrorResponse(req: RPC.IncomingRequest, error: unknown) { - try { - const errorResponse = createErrorResponse(error); +function buildErrorResponseData( + error: unknown, + profiler?: ServerProfiler, +): string { + const errorResponse = createErrorResponse(error); + const validated = responseSchema.parse(errorResponse); + const json = JSON.stringify(validated); + return profiler ? profiler.serializeError(json) : json; +} - const responseData = JSON.stringify(responseSchema.parse(errorResponse)); - req.reply(responseData, "utf-8"); +export function sendErrorResponse( + req: RPC.IncomingRequest, + error: unknown, + profiler?: ServerProfiler, +) { + try { + req.reply(buildErrorResponseData(error, profiler), "utf-8"); } catch (responseError) { logger.error("Failed to create error response:", responseError); const fallbackError = createErrorResponse( @@ -18,15 +30,14 @@ export function sendErrorResponse(req: RPC.IncomingRequest, error: unknown) { req.reply(JSON.stringify(fallbackError), "utf-8"); } } + export function sendStreamErrorResponse( stream: ReturnType, error: unknown, + profiler?: ServerProfiler, ) { try { - const errorResponse = createErrorResponse(error); - - const responseData = JSON.stringify(responseSchema.parse(errorResponse)); - stream.write(responseData + "\n", "utf-8"); + stream.write(buildErrorResponseData(error, profiler) + "\n", "utf-8"); stream.end(); } catch (responseError) { logger.error("Failed to create stream error response:", responseError); diff --git a/packages/sdk/server/rpc/delegate-transport.ts b/packages/sdk/server/rpc/delegate-transport.ts index 6e74eab124..88ab5ebbcf 100644 --- 
a/packages/sdk/server/rpc/delegate-transport.ts +++ b/packages/sdk/server/rpc/delegate-transport.ts @@ -9,13 +9,43 @@ import type RPC from "bare-rpc"; import { requestSchema, responseSchema, + PROFILING_KEY, + DELEGATION_BREAKDOWN_KEY, type Request, type Response, type RPCOptions, + type ProfilingRequestMeta, + type DelegationBreakdown, } from "@/schemas"; +import { + nowMs, + extractProfilingMeta, + recordFailure, + generateId, +} from "@/profiling"; import { withTimeout, withTimeoutStream } from "@/utils/withTimeout"; import { getServerLogger } from "@/logging"; import { DelegateProviderError } from "@/utils/errors-server"; +import { + shouldProfileDelegation, + createDelegationTimings, + createDelegationStreamTimings, + recordDelegationEvents, + recordDelegationStreamEvents, + flushServerConnectionEvent, + consumeBreakdownConnectionTime, + type DelegationTimings, + type DelegationStreamTimings, +} from "./profiling/delegation-profiler"; +import type { DelegatedHandlerOptions } from "./profiling"; + +export interface DelegateOptions extends RPCOptions, DelegatedHandlerOptions { + peerKey?: string; +} + +export type ResponseWithDelegation = Response & { + [DELEGATION_BREAKDOWN_KEY]?: DelegationBreakdown; +}; const logger = getServerLogger(); @@ -38,14 +68,34 @@ function checkAndThrowError(response: Response): void { export async function send( request: T, rpc: RPC, - options?: RPCOptions, + options?: DelegateOptions, +): Promise { + const { profilingMeta } = options ?? 
{}; + const shouldProfile = shouldProfileDelegation(request.type, profilingMeta); + + if (!shouldProfile) { + return sendBase(request, rpc, options, profilingMeta); + } + return sendProfiled(request, rpc, options, profilingMeta); +} + +async function sendBase( + request: T, + rpc: RPC, + options?: DelegateOptions, + profilingMeta?: ProfilingRequestMeta, ): Promise { const parsedRequest = requestSchema.parse(request); const req = rpc.request(getNextCommandId()); logger.debug("[delegate-transport] Sending:", { type: request.type }); - const payload = JSON.stringify(parsedRequest); + // Propagate per-call disable signal to delegated provider + const finalRequest = + profilingMeta?.enabled === false + ? { ...parsedRequest, [PROFILING_KEY]: { enabled: false } } + : parsedRequest; + const payload = JSON.stringify(finalRequest); req.send(payload, "utf-8"); const response = await withTimeout(req.reply("utf-8"), options?.timeout); @@ -60,17 +110,118 @@ export async function send( return resPayload; } +async function sendProfiled( + request: T, + rpc: RPC, + options?: DelegateOptions, + profilingMeta?: ProfilingRequestMeta, +): Promise { + const profileId = profilingMeta?.id ?? generateId(); + const includeServerBreakdown = profilingMeta?.includeServer ?? false; + const timings: DelegationTimings = createDelegationTimings( + profileId, + request.type, + ); + + try { + if (options?.peerKey) { + flushServerConnectionEvent(options.peerKey); + } + const connectionMs = options?.peerKey + ? 
consumeBreakdownConnectionTime(options.peerKey) + : undefined; + + const parsedRequest = requestSchema.parse(request); + const req = rpc.request(getNextCommandId()); + + logger.debug("[delegate-transport] Sending (profiled):", { + type: request.type, + }); + + const stringifyStart = nowMs(); + const profilingEnvelope: Record = { id: profileId }; + if (includeServerBreakdown) { + profilingEnvelope["includeServer"] = true; + } + const requestWithProfiling = { + ...parsedRequest, + [PROFILING_KEY]: profilingEnvelope, + }; + const payload = JSON.stringify(requestWithProfiling); + timings.requestStringifyMs = nowMs() - stringifyStart; + + timings.sendStart = nowMs(); + req.send(payload, "utf-8"); + + const response = await withTimeout(req.reply("utf-8"), options?.timeout); + timings.firstResponseAt = nowMs(); + + const parseStart = nowMs(); + const rawPayload = JSON.parse(response?.toString() || "{}") as unknown; + timings.responseJsonParseMs = nowMs() - parseStart; + + const resPayload = responseSchema.parse( + rawPayload, + ) as ResponseWithDelegation; + logger.debug("[delegate-transport] Response (profiled):", { + type: resPayload.type, + }); + + checkAndThrowError(resPayload); + + const serverMeta = extractProfilingMeta(rawPayload); + const delegationBreakdown = recordDelegationEvents( + timings, + serverMeta, + connectionMs, + ); + resPayload[DELEGATION_BREAKDOWN_KEY] = delegationBreakdown; + + return resPayload; + } catch (error) { + const base = { + ts: nowMs(), + op: timings.requestType, + kind: "delegation" as const, + profileId: timings.profileId, + }; + recordFailure(base, timings.requestStart, error); + throw error; + } +} + export async function* stream( request: T, rpc: RPC, - options: RPCOptions = {}, + options: DelegateOptions = {}, +): AsyncGenerator { + const { profilingMeta } = options; + const shouldProfile = shouldProfileDelegation(request.type, profilingMeta); + + if (!shouldProfile) { + yield* streamBase(request, rpc, options, profilingMeta); + 
return; + } + yield* streamProfiled(request, rpc, options, profilingMeta); +} + +async function* streamBase( + request: T, + rpc: RPC, + options: DelegateOptions = {}, + profilingMeta?: ProfilingRequestMeta, ): AsyncGenerator { const parsedRequest = requestSchema.parse(request); const req = rpc.request(getNextCommandId()); logger.debug("[delegate-transport] Streaming:", { type: request.type }); - req.send(JSON.stringify(parsedRequest), "utf-8"); + // Propagate per-call disable signal to delegated provider + const finalRequest = + profilingMeta?.enabled === false + ? { ...parsedRequest, [PROFILING_KEY]: { enabled: false } } + : parsedRequest; + req.send(JSON.stringify(finalRequest), "utf-8"); const responseStream = req.createResponseStream({ encoding: "utf-8" }); let buffer = ""; @@ -96,11 +247,92 @@ export async function* stream( for (const line of lines) { if (line.trim()) { const response = responseSchema.parse(JSON.parse(line)); - checkAndThrowError(response); - yield response; } } } } + +async function* streamProfiled( + request: T, + rpc: RPC, + options: DelegateOptions = {}, + profilingMeta?: ProfilingRequestMeta, +): AsyncGenerator { + const profileId = profilingMeta?.id ?? 
generateId(); + const timings: DelegationStreamTimings = createDelegationStreamTimings( + profileId, + request.type, + ); + + try { + if (options.peerKey) { + flushServerConnectionEvent(options.peerKey); + } + + const parsedRequest = requestSchema.parse(request); + const req = rpc.request(getNextCommandId()); + + logger.debug("[delegate-transport] Streaming (profiled):", { + type: request.type, + }); + + const stringifyStart = nowMs(); + const requestWithProfiling = { + ...parsedRequest, + [PROFILING_KEY]: { id: profileId }, + }; + const payload = JSON.stringify(requestWithProfiling); + timings.requestStringifyMs = nowMs() - stringifyStart; + + timings.sendStart = nowMs(); + req.send(payload, "utf-8"); + + const responseStream = req.createResponseStream({ encoding: "utf-8" }); + let buffer = ""; + + async function* processStream(): AsyncGenerator { + for await (const chunk of responseStream as AsyncIterable) { + yield chunk; + } + } + + const streamWithTimeout = withTimeoutStream( + processStream(), + options?.timeout, + ); + + for await (const chunk of streamWithTimeout) { + buffer += chunk.toString(); + + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + + for (const line of lines) { + if (line.trim()) { + const response = responseSchema.parse(JSON.parse(line)); + checkAndThrowError(response); + + timings.chunkCount++; + if (timings.firstChunkAt === undefined) { + timings.firstChunkAt = nowMs(); + } + timings.lastChunkAt = nowMs(); + + yield response; + } + } + } + recordDelegationStreamEvents(timings); + } catch (error) { + const base = { + ts: nowMs(), + op: timings.requestType, + kind: "delegation" as const, + profileId: timings.profileId, + }; + recordFailure(base, timings.requestStart, error); + throw error; + } +} diff --git a/packages/sdk/server/rpc/handle-request.ts b/packages/sdk/server/rpc/handle-request.ts index dc567a0832..2736fe29d3 100644 --- a/packages/sdk/server/rpc/handle-request.ts +++ b/packages/sdk/server/rpc/handle-request.ts 
@@ -1,9 +1,12 @@ import { requestSchema, normalizeModelType, + PROFILING_KEY, type CanonicalModelType, type Request, + type ProfilingRequestMeta, } from "@/schemas"; +import { nowMs } from "@/profiling"; import { resolveModelConfig } from "@/server/bare/registry/model-config-registry"; import type RPC from "bare-rpc"; import { sendErrorResponse } from "@/server/error-handlers"; @@ -17,14 +20,23 @@ import { handleInitConfig, isInitConfigMessage, } from "./handler-utils"; +import { createServerProfiler, type ServerProfiler } from "./profiling"; export async function handleRequest(req: RPC.IncomingRequest): Promise { + let profiler: ServerProfiler | undefined; + let validationStart = 0; + try { const rawData = req.data?.toString(); if (!rawData) { throw new RPCNoDataReceivedError(); } + + // Timing runs unconditionally since we can't know if client + // requested profiling until after parsing. + const parseStart = nowMs(); const jsonData: unknown = JSON.parse(rawData); + const jsonParseMs = nowMs() - parseStart; // Handle internal config initialization (bypasses schema) if (isInitConfigMessage(jsonData)) { @@ -32,18 +44,61 @@ export async function handleRequest(req: RPC.IncomingRequest): Promise { return; } - const processedData = applyDeviceDefaultsToRequest(jsonData); + const { data: cleanData, profilingMeta } = extractProfilingMeta(jsonData); + + profiler = createServerProfiler(profilingMeta); + profiler.markRequestParsed(jsonParseMs); + + validationStart = nowMs(); + const processedData = applyDeviceDefaultsToRequest(cleanData); const request: Request = requestSchema.parse(processedData); - const entry = registry[request.type]; + attachProfilingMetaToRequest(request, profilingMeta); + profiler.markRequestValidated(nowMs() - validationStart); + validationStart = 0; + const entry = registry[request.type]; if (!entry) { throw new RPCUnknownRequestTypeError(request.type); } - await executeHandler(req, request, entry); + await executeHandler(req, request, entry, 
profiler); } catch (error) { - sendErrorResponse(req, error); + if (profiler && validationStart > 0) { + profiler.markRequestValidated(nowMs() - validationStart); + } + sendErrorResponse(req, error, profiler); + } +} + +function attachProfilingMetaToRequest( + request: Request, + profilingMeta?: ProfilingRequestMeta, +): void { + if (!profilingMeta) return; + + Object.defineProperty(request as Record, PROFILING_KEY, { + value: profilingMeta, + enumerable: false, + configurable: true, + writable: false, + }); +} + +function extractProfilingMeta(data: unknown): { + data: unknown; + profilingMeta: ProfilingRequestMeta | undefined; +} { + if (!data || typeof data !== "object" || !(PROFILING_KEY in data)) { + return { data, profilingMeta: undefined }; } + + const obj = data as Record; + const { [PROFILING_KEY]: meta, ...rest } = obj; + + return { + data: rest, + profilingMeta: meta as ProfilingRequestMeta | undefined, + }; } /** diff --git a/packages/sdk/server/rpc/handler-utils.ts b/packages/sdk/server/rpc/handler-utils.ts index 62615823fa..30245420ec 100644 --- a/packages/sdk/server/rpc/handler-utils.ts +++ b/packages/sdk/server/rpc/handler-utils.ts @@ -1,9 +1,10 @@ import { - responseSchema, type QvacConfig, type Request, type Response, type RuntimeContext, + type ProfilingRequestMeta, + PROFILING_KEY, } from "@/schemas"; import type RPC from "bare-rpc"; import { @@ -12,16 +13,27 @@ import { } from "@/server/error-handlers"; import { setSDKConfig } from "@/server/bare/registry/config-registry"; import { setRuntimeContext } from "@/server/bare/registry/runtime-context-registry"; +import { type ServerProfiler } from "./profiling"; -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type ReplyHandler = (request: any) => Promise | Response; -// eslint-disable-next-line @typescript-eslint/no-explicit-any -type StreamHandler = (request: any) => AsyncGenerator; -type ProgressHandler = ( - // eslint-disable-next-line @typescript-eslint/no-explicit-any +function 
getProfilingMetaFromRequest( + request: Request, +): ProfilingRequestMeta | undefined { + if (PROFILING_KEY in request) { + return (request as Record)[ + PROFILING_KEY + ] as ProfilingRequestMeta; + } + return undefined; +} + +/* eslint-disable @typescript-eslint/no-explicit-any */ +type ReplyHandler = ( request: any, - onProgress?: (update: Response) => void, -) => Promise; + ...args: any[] +) => Promise | Response; +type StreamHandler = (request: any, ...args: any[]) => AsyncGenerator; +type ProgressHandler = (request: any, ...args: any[]) => Promise; +/* eslint-enable @typescript-eslint/no-explicit-any */ export type HandlerEntry = { type: "reply" | "stream"; @@ -31,23 +43,30 @@ export type HandlerEntry = { supportsProgress?: boolean | ((request: Request) => boolean); }; -function writeToStream( - stream: ReturnType, - response: Response, -) { - stream.write(JSON.stringify(responseSchema.parse(response)) + "\n", "utf-8"); -} - async function executeReplyHandler( req: RPC.IncomingRequest, request: Request, handler: ReplyHandler, + profiler: ServerProfiler, + isDelegated: boolean, ) { + profiler.startHandler(); try { - const response = await handler(request); - req.reply(JSON.stringify(responseSchema.parse(response)), "utf-8"); + let response: Response; + if (isDelegated) { + const profilingMeta = getProfilingMetaFromRequest(request); + response = await handler( + request, + profilingMeta ? 
{ profilingMeta } : undefined, + ); + } else { + response = await handler(request); + } + profiler.endHandler(); + req.reply(profiler.serialize(response, true), "utf-8"); } catch (error) { - sendErrorResponse(req, error); + profiler.endHandler(); + sendErrorResponse(req, error, profiler); } } @@ -55,18 +74,32 @@ async function executeStreamHandler( req: RPC.IncomingRequest, request: Request, handler: StreamHandler, + profiler: ServerProfiler, + isDelegated: boolean, ) { const stream = req.createResponseStream(); + profiler.startHandler(); + try { - for await (const response of handler(request)) { - stream.write( - JSON.stringify(responseSchema.parse(response)) + "\n", - "utf-8", + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let generator: AsyncGenerator; + if (isDelegated) { + const profilingMeta = getProfilingMetaFromRequest(request); + generator = handler( + request, + profilingMeta ? { profilingMeta } : undefined, ); + } else { + generator = handler(request); + } + for await (const response of generator) { + stream.write(profiler.serialize(response, false) + "\n", "utf-8"); } + profiler.endHandler(); stream.end(); } catch (error) { - sendStreamErrorResponse(stream, error); + profiler.endHandler(); + sendStreamErrorResponse(stream, error, profiler); } } @@ -74,16 +107,37 @@ async function executeProgressHandler( req: RPC.IncomingRequest, request: Request, handler: ProgressHandler, + profiler: ServerProfiler, + isDelegated: boolean, ) { const stream = req.createResponseStream(); + profiler.startHandler(); + + const progressCallback = (update: Response) => { + stream.write(profiler.serialize(update, false) + "\n", "utf-8"); + }; + try { - const response = await handler(request, (update) => - writeToStream(stream, update), - ); - writeToStream(stream, response); + let response: Response; + if (isDelegated) { + const profilingMeta = getProfilingMetaFromRequest(request); + const options: { + progressCallback: typeof progressCallback; + 
profilingMeta?: ProfilingRequestMeta; + } = { progressCallback }; + if (profilingMeta) { + options.profilingMeta = profilingMeta; + } + response = await handler(request, options); + } else { + response = await handler(request, progressCallback); + } + profiler.endHandler(); + stream.write(profiler.serialize(response, true) + "\n", "utf-8"); stream.end(); } catch (error) { - sendStreamErrorResponse(stream, error); + profiler.endHandler(); + sendStreamErrorResponse(stream, error, profiler); } } @@ -92,11 +146,12 @@ export async function executeHandler( req: RPC.IncomingRequest, request: Request, entry: HandlerEntry, + profiler: ServerProfiler, ) { - const handler = + const isDelegated = !!( entry.delegatedHandler && entry.isDelegated?.(request) - ? entry.delegatedHandler - : entry.handler; + ); + const handler = isDelegated ? entry.delegatedHandler! : entry.handler; const wantsProgress = "withProgress" in request && @@ -105,16 +160,30 @@ export async function executeHandler( ? entry.supportsProgress(request) : entry.supportsProgress); - try { - if (entry.type === "stream") { - await executeStreamHandler(req, request, handler as StreamHandler); - } else if (wantsProgress) { - await executeProgressHandler(req, request, handler as ProgressHandler); - } else { - await executeReplyHandler(req, request, handler as ReplyHandler); - } - } catch (error) { - sendErrorResponse(req, error); + if (entry.type === "stream") { + await executeStreamHandler( + req, + request, + handler as StreamHandler, + profiler, + isDelegated, + ); + } else if (wantsProgress) { + await executeProgressHandler( + req, + request, + handler as ProgressHandler, + profiler, + isDelegated, + ); + } else { + await executeReplyHandler( + req, + request, + handler as ReplyHandler, + profiler, + isDelegated, + ); } } diff --git a/packages/sdk/server/rpc/handlers/completion-stream-delegated.ts b/packages/sdk/server/rpc/handlers/completion-stream-delegated.ts index ebd340aeb8..7b936f25a2 100644 --- 
a/packages/sdk/server/rpc/handlers/completion-stream-delegated.ts +++ b/packages/sdk/server/rpc/handlers/completion-stream-delegated.ts @@ -2,17 +2,23 @@ import type { CompletionStreamRequest, CompletionStreamResponse, } from "@/schemas"; +import type { DelegatedHandlerOptions } from "@/server/rpc/profiling"; import { getModelEntry } from "@/server/bare/registry/model-registry"; import { getRPC } from "@/server/bare/delegate-rpc-client"; -import { stream } from "@/server/rpc/delegate-transport"; +import { stream, type DelegateOptions } from "@/server/rpc/delegate-transport"; import { ModelIsDelegatedError } from "@/utils/errors-server"; import { getServerLogger } from "@/logging"; const logger = getServerLogger(); +export type HandleCompletionStreamDelegatedOptions = DelegatedHandlerOptions; + export async function* handleCompletionStreamDelegated( request: CompletionStreamRequest, + options?: HandleCompletionStreamDelegatedOptions, ): AsyncGenerator { + const { profilingMeta } = options ?? 
{}; + // Get delegation info from model registry const entry = getModelEntry(request.modelId); @@ -30,8 +36,17 @@ export async function* handleCompletionStreamDelegated( // Create RPC instance for this HyperSwarm peer const rpc = await getRPC(topic, providerPublicKey, { timeout }); + // Build delegate options with profiling metadata + const delegateOpts: DelegateOptions = { peerKey: providerPublicKey }; + if (profilingMeta) { + delegateOpts.profilingMeta = profilingMeta; + } + if (timeout) { + delegateOpts.timeout = timeout; + } + // Use the regular stream function with the HyperSwarm RPC instance - const responseStream = stream(request, rpc, { timeout }); + const responseStream = stream(request, rpc, delegateOpts); // Yield each response from the stream for await (const response of responseStream) { diff --git a/packages/sdk/server/rpc/handlers/load-model-delegated.ts b/packages/sdk/server/rpc/handlers/load-model-delegated.ts index 302d2cd89c..b016c5e22b 100644 --- a/packages/sdk/server/rpc/handlers/load-model-delegated.ts +++ b/packages/sdk/server/rpc/handlers/load-model-delegated.ts @@ -3,9 +3,15 @@ import type { LoadModelResponse, ModelProgressUpdate, } from "@/schemas"; -import { modelInputToSrcSchema } from "@/schemas"; +import { DELEGATION_BREAKDOWN_KEY, modelInputToSrcSchema } from "@/schemas"; +import type { DelegatedHandlerOptions } from "@/server/rpc/profiling"; +import type { ResponseWithDelegation } from "@/server/rpc/delegate-transport"; import { registerModel } from "@/server/bare/registry/model-registry"; -import { send, stream } from "@/server/rpc/delegate-transport"; +import { + send, + stream, + type DelegateOptions, +} from "@/server/rpc/delegate-transport"; import { getRPC, cleanupStaleConnection, @@ -19,10 +25,15 @@ import { getServerLogger } from "@/logging"; const logger = getServerLogger(); +export interface HandleLoadModelDelegatedOptions extends DelegatedHandlerOptions { + progressCallback?: (update: ModelProgressUpdate) => void; +} + 
export async function handleLoadModelDelegated( request: LoadModelSrcRequest, - progressCallback?: (update: ModelProgressUpdate) => void, + options?: HandleLoadModelDelegatedOptions, ): Promise { + const { progressCallback, profilingMeta } = options ?? {}; if (!request.delegate) { throw new ModelLoadFailedError( "Delegate information is required for delegated load model", @@ -54,15 +65,21 @@ export async function handleLoadModelDelegated( const { delegate: _, ...providerRequest } = request; let finalResponse: LoadModelResponse | undefined; + let delegationBreakdown: ResponseWithDelegation[typeof DELEGATION_BREAKDOWN_KEY]; + + // Build delegate options with profiling metadata + const delegateOpts: DelegateOptions = { peerKey: providerPublicKey }; + if (profilingMeta) { + delegateOpts.profilingMeta = profilingMeta; + } + if (timeout) { + delegateOpts.timeout = timeout; + } if (request.withProgress) { // Use streaming for progress updates logger.debug("📊 Using streaming mode for loadModel with progress"); - const responseStream = stream( - providerRequest, - rpc, - timeout ? { timeout } : {}, - ); + const responseStream = stream(providerRequest, rpc, delegateOpts); for await (const response of responseStream) { if (response.type === "modelProgress") { @@ -82,11 +99,11 @@ export async function handleLoadModelDelegated( } else { // Use simple send for non-progress requests logger.debug("📤 Using simple send mode for loadModel"); - finalResponse = (await send( - providerRequest, - rpc, - timeout ? 
{ timeout } : {}, - )) as LoadModelResponse; + const providerResponse = await send(providerRequest, rpc, delegateOpts); + finalResponse = providerResponse as LoadModelResponse; + delegationBreakdown = (providerResponse as ResponseWithDelegation)[ + DELEGATION_BREAKDOWN_KEY + ]; } if (!finalResponse || !finalResponse.success) { @@ -121,11 +138,18 @@ export async function handleLoadModelDelegated( `✅ Delegated model registered: ${modelId} -> provider: ${providerPublicKey}`, ); - return { + const result: LoadModelResponse = { type: "loadModel", success: true, modelId, }; + + if (delegationBreakdown) { + (result as ResponseWithDelegation)[DELEGATION_BREAKDOWN_KEY] = + delegationBreakdown; + } + + return result; } catch (error) { logger.error("Error in delegated load model:", error); diff --git a/packages/sdk/server/rpc/handlers/load-model/handler.ts b/packages/sdk/server/rpc/handlers/load-model/handler.ts index d10a91e60b..94cbb2d598 100644 --- a/packages/sdk/server/rpc/handlers/load-model/handler.ts +++ b/packages/sdk/server/rpc/handlers/load-model/handler.ts @@ -75,7 +75,10 @@ export async function handleLoadModel( const parseResult = plugin.loadConfigSchema.safeParse(resolvedModelConfig); if (!parseResult.success) { const details = parseResult.error.issues - .map((i: { path: unknown[]; message: string }) => `${String(i.path.join("."))}: ${i.message}`) + .map( + (i: { path: unknown[]; message: string }) => + `${String(i.path.join("."))}: ${i.message}`, + ) .join(", "); throw new PluginLoadConfigValidationFailedError( canonicalModelType, diff --git a/packages/sdk/server/rpc/handlers/load-model/registry.ts b/packages/sdk/server/rpc/handlers/load-model/registry.ts index 462e42a557..d62fef9887 100644 --- a/packages/sdk/server/rpc/handlers/load-model/registry.ts +++ b/packages/sdk/server/rpc/handlers/load-model/registry.ts @@ -127,7 +127,9 @@ async function downloadSingleFileFromRegistry( if (blobBinding) { logger.info(`📥 Downloading blob directly: ${modelFileName}`); - 
const result = await client.downloadBlob(blobBinding, { timeout: REGISTRY_STREAM_TIMEOUT_MS }); + const result = await client.downloadBlob(blobBinding, { + timeout: REGISTRY_STREAM_TIMEOUT_MS, + }); if (!("stream" in result.artifact)) { throw new RegistryDownloadFailedError( `No stream returned for blob ${modelFileName}`, @@ -281,7 +283,14 @@ async function downloadShardedFilesFromRegistry( throw new DownloadCancelledError(); } - type ShardEntry = { filename: string; size: number; checksum: string; path: string; source: string; blobBinding?: QVACBlobBinding }; + type ShardEntry = { + filename: string; + size: number; + checksum: string; + path: string; + source: string; + blobBinding?: QVACBlobBinding; + }; let shards: ShardEntry[]; if (localShardMetadata?.length) { @@ -771,7 +780,8 @@ export async function downloadModelFromRegistry( logger.info(`✅ ONNX model and data file both cached`); if (progressCallback) { - const total = (modelMetadata?.expectedSize || 0) + companionDataFile.expectedSize; + const total = + (modelMetadata?.expectedSize || 0) + companionDataFile.expectedSize; progressCallback({ type: "modelProgress", downloaded: total, @@ -792,7 +802,9 @@ export async function downloadModelFromRegistry( return mainCached; } - const mainBlobBinding = modelMetadata ? buildBlobBinding(modelMetadata) : undefined; + const mainBlobBinding = modelMetadata + ? buildBlobBinding(modelMetadata) + : undefined; const dataBlobBinding = companionDataFile.blobCoreKey ? buildBlobBinding(companionDataFile) : undefined; @@ -847,7 +859,9 @@ export async function downloadModelFromRegistry( return cachedPath; } - const blobBinding = modelMetadata ? buildBlobBinding(modelMetadata) : undefined; + const blobBinding = modelMetadata + ? 
buildBlobBinding(modelMetadata) + : undefined; return createManagedDownload( downloadKey, diff --git a/packages/sdk/server/rpc/handlers/plugin-dispatch.ts b/packages/sdk/server/rpc/handlers/plugin-dispatch.ts index 98fa56df4b..57f3c4d8f2 100644 --- a/packages/sdk/server/rpc/handlers/plugin-dispatch.ts +++ b/packages/sdk/server/rpc/handlers/plugin-dispatch.ts @@ -1,5 +1,9 @@ import { getModelEntry } from "@/server/bare/registry/model-registry"; import { getPlugin } from "@/server/plugins"; +import { + profileReplyHandler, + profileStreamHandler, +} from "@/server/rpc/profiling"; import { ModelNotFoundError, ModelIsDelegatedError, @@ -64,17 +68,23 @@ export async function dispatchPluginReply( handlerName: string, request: TRequest, ): Promise { - const { result, streaming } = resolvePluginHandler( - modelId, - handlerName, - request, - ); + return profileReplyHandler({ op: handlerName, request }, async () => { + const { result, streaming } = resolvePluginHandler( + modelId, + handlerName, + request, + ); - if (streaming) { - throw new PluginHandlerTypeMismatchError(handlerName, "reply", "streaming"); - } + if (streaming) { + throw new PluginHandlerTypeMismatchError( + handlerName, + "reply", + "streaming", + ); + } - return result as Promise; + return result as Promise; + }); } /** @@ -88,15 +98,21 @@ export async function* dispatchPluginStream( handlerName: string, request: TRequest, ): AsyncGenerator { - const { result, streaming } = resolvePluginHandler( - modelId, - handlerName, - request, - ); + yield* profileStreamHandler({ op: handlerName, request }, async function* () { + const { result, streaming } = resolvePluginHandler( + modelId, + handlerName, + request, + ); - if (!streaming) { - throw new PluginHandlerTypeMismatchError(handlerName, "streaming", "reply"); - } + if (!streaming) { + throw new PluginHandlerTypeMismatchError( + handlerName, + "streaming", + "reply", + ); + } - yield* result as AsyncGenerator; + yield* result as AsyncGenerator; + }); } diff 
--git a/packages/sdk/server/rpc/handlers/plugin-invoke.ts b/packages/sdk/server/rpc/handlers/plugin-invoke.ts index c91bf98838..21ec92cc4a 100644 --- a/packages/sdk/server/rpc/handlers/plugin-invoke.ts +++ b/packages/sdk/server/rpc/handlers/plugin-invoke.ts @@ -6,6 +6,10 @@ import type { } from "@/schemas/plugin"; import { getModelEntry } from "@/server/bare/registry/model-registry"; import { getPlugin, getPluginHandler } from "@/server/plugins"; +import { + profileReplyHandler, + profileStreamHandler, +} from "@/server/rpc/profiling"; import { PluginNotFoundError, PluginHandlerNotFoundError, @@ -50,69 +54,32 @@ function resolvePluginHandler(modelId: string, handlerName: string) { export async function handlePluginInvoke( request: PluginInvokeRequest, ): Promise { - const { modelId, handler: handlerName, params } = request; - - logger.debug(`[pluginInvoke] modelId=${modelId} handler=${handlerName}`); - - const { handlerDef } = resolvePluginHandler(modelId, handlerName); - - if (handlerDef.streaming) { - throw new PluginHandlerTypeMismatchError(handlerName, "reply", "streaming"); - } - - const parseResult = handlerDef.requestSchema.safeParse(params); - if (!parseResult.success) { - const details = parseResult.error.issues - .map((i) => `${String(i.path.join("."))}: ${i.message}`) - .join(", "); - throw new PluginRequestValidationFailedError(handlerName, details); - } + return profileReplyHandler({ op: "pluginInvoke", request }, async () => { + const { modelId, handler: handlerName, params } = request; - const result = await handlerDef.handler(parseResult.data); + logger.debug(`[pluginInvoke] modelId=${modelId} handler=${handlerName}`); - const responseParseResult = handlerDef.responseSchema.safeParse(result); - if (!responseParseResult.success) { - const details = responseParseResult.error.issues - .map((i) => `${String(i.path.join("."))}: ${i.message}`) - .join(", "); - throw new PluginResponseValidationFailedError(handlerName, details); - } + const { handlerDef } 
= resolvePluginHandler(modelId, handlerName); - return { - type: "pluginInvoke", - result: responseParseResult.data, - }; -} - -export async function* handlePluginInvokeStream( - request: PluginInvokeStreamRequest, -): AsyncGenerator { - const { modelId, handler: handlerName, params } = request; - - logger.debug( - `[pluginInvokeStream] modelId=${modelId} handler=${handlerName}`, - ); - - const { handlerDef } = resolvePluginHandler(modelId, handlerName); - - if (!handlerDef.streaming) { - throw new PluginHandlerTypeMismatchError(handlerName, "streaming", "reply"); - } + if (handlerDef.streaming) { + throw new PluginHandlerTypeMismatchError( + handlerName, + "reply", + "streaming", + ); + } - const parseResult = handlerDef.requestSchema.safeParse(params); - if (!parseResult.success) { - const details = parseResult.error.issues - .map((i) => `${String(i.path.join("."))}: ${i.message}`) - .join(", "); - throw new PluginRequestValidationFailedError(handlerName, details); - } + const parseResult = handlerDef.requestSchema.safeParse(params); + if (!parseResult.success) { + const details = parseResult.error.issues + .map((i) => `${String(i.path.join("."))}: ${i.message}`) + .join(", "); + throw new PluginRequestValidationFailedError(handlerName, details); + } - const generator = handlerDef.handler( - parseResult.data, - ) as AsyncGenerator; + const result = await handlerDef.handler(parseResult.data); - for await (const chunk of generator) { - const responseParseResult = handlerDef.responseSchema.safeParse(chunk); + const responseParseResult = handlerDef.responseSchema.safeParse(result); if (!responseParseResult.success) { const details = responseParseResult.error.issues .map((i) => `${String(i.path.join("."))}: ${i.message}`) @@ -120,16 +87,68 @@ export async function* handlePluginInvokeStream( throw new PluginResponseValidationFailedError(handlerName, details); } - yield { - type: "pluginInvokeStream", + return { + type: "pluginInvoke" as const, result: 
responseParseResult.data, - done: false, }; - } + }); +} - yield { - type: "pluginInvokeStream", - result: null, - done: true, - }; +export async function* handlePluginInvokeStream( + request: PluginInvokeStreamRequest, +): AsyncGenerator { + yield* profileStreamHandler( + { op: "pluginInvokeStream", request }, + async function* () { + const { modelId, handler: handlerName, params } = request; + + logger.debug( + `[pluginInvokeStream] modelId=${modelId} handler=${handlerName}`, + ); + + const { handlerDef } = resolvePluginHandler(modelId, handlerName); + + if (!handlerDef.streaming) { + throw new PluginHandlerTypeMismatchError( + handlerName, + "streaming", + "reply", + ); + } + + const parseResult = handlerDef.requestSchema.safeParse(params); + if (!parseResult.success) { + const details = parseResult.error.issues + .map((i) => `${String(i.path.join("."))}: ${i.message}`) + .join(", "); + throw new PluginRequestValidationFailedError(handlerName, details); + } + + const generator = handlerDef.handler( + parseResult.data, + ) as AsyncGenerator; + + for await (const chunk of generator) { + const responseParseResult = handlerDef.responseSchema.safeParse(chunk); + if (!responseParseResult.success) { + const details = responseParseResult.error.issues + .map((i) => `${String(i.path.join("."))}: ${i.message}`) + .join(", "); + throw new PluginResponseValidationFailedError(handlerName, details); + } + + yield { + type: "pluginInvokeStream" as const, + result: responseParseResult.data, + done: false, + }; + } + + yield { + type: "pluginInvokeStream" as const, + result: null, + done: true, + }; + }, + ); } diff --git a/packages/sdk/server/rpc/handlers/rag.ts b/packages/sdk/server/rpc/handlers/rag.ts index 0c83d95557..c1b0046ed6 100644 --- a/packages/sdk/server/rpc/handlers/rag.ts +++ b/packages/sdk/server/rpc/handlers/rag.ts @@ -13,6 +13,10 @@ import { registerRagOperation, unregisterRagOperation, } from "@/server/bare/rag-hyperdb"; +import { + profileReplyHandler, + 
registerOperationMetrics, +} from "@/server/rpc/profiling"; type ProgressOperation = "ingest" | "saveEmbeddings" | "reindex"; @@ -21,6 +25,26 @@ interface HandlerOptions { signal?: AbortSignal; } +registerOperationMetrics< + { operation?: string; workspace?: string }, + { processed?: number; results?: unknown[] } +>({ + op: "rag", + kind: "handler", + getTags: (req) => { + const tags: Record = {}; + if (req.operation) tags["operation"] = req.operation; + if (req.workspace) tags["workspace"] = req.workspace; + return tags; + }, + fromResult: (res) => { + const gauges: Record = {}; + if (res.processed !== undefined) gauges["processed"] = res.processed; + if (res.results !== undefined) gauges["resultsCount"] = res.results.length; + return Object.keys(gauges).length > 0 ? gauges : undefined; + }, +}); + function createHandlerOptions( operation: ProgressOperation, workspace: string, @@ -58,6 +82,15 @@ function omitOnProgress>( export async function handleRag( request: RagRequest, onProgress?: (update: RagProgressUpdate) => void, +): Promise { + return profileReplyHandler({ op: "rag", request }, async () => + handleRagInternal(request, onProgress), + ); +} + +async function handleRagInternal( + request: RagRequest, + onProgress?: (update: RagProgressUpdate) => void, ): Promise { switch (request.operation) { case "chunk": { diff --git a/packages/sdk/server/rpc/profiling/context.ts b/packages/sdk/server/rpc/profiling/context.ts new file mode 100644 index 0000000000..0d2d1d9546 --- /dev/null +++ b/packages/sdk/server/rpc/profiling/context.ts @@ -0,0 +1,69 @@ +import { + PROFILING_KEY, + type ProfilingRequestMeta, + type ServerBreakdown, + type DelegationBreakdown, +} from "@/schemas"; +import { nowMs } from "@/profiling"; + +export interface ServerProfilingContext { + meta: ProfilingRequestMeta; + requestStart: number; + jsonParseMs?: number; + zodValidationMs?: number; + handlerExecutionMs?: number; + responseZodValidationMs?: number; + responseStringifyMs?: number; +} + 
+export function createProfilingContext( + meta: ProfilingRequestMeta, +): ServerProfilingContext { + return { meta, requestStart: nowMs() }; +} + +function buildServerBreakdown(ctx: ServerProfilingContext): ServerBreakdown { + return { + requestJsonParseMs: ctx.jsonParseMs, + requestZodValidationMs: ctx.zodValidationMs, + handlerExecutionMs: ctx.handlerExecutionMs, + responseZodValidationMs: ctx.responseZodValidationMs, + responseStringifyMs: ctx.responseStringifyMs, + totalServerMs: nowMs() - ctx.requestStart, + }; +} + +export interface ProfilingInjectionOptions { + ctx?: ServerProfilingContext; + delegation?: DelegationBreakdown; +} + +export function injectProfilingIntoString( + jsonString: string, + options: ProfilingInjectionOptions, +): string { + const { ctx, delegation } = options; + const includeServer = ctx?.meta.includeServer ?? false; + + // Nothing to inject or invalid JSON + if ((!includeServer && !delegation) || !jsonString.endsWith("}")) { + return jsonString; + } + const id = ctx?.meta.id ?? delegation?.profileId ?? 
""; + const profilingMeta: Record = { id }; + + if (includeServer && ctx) { + profilingMeta["server"] = buildServerBreakdown(ctx); + } + + if (delegation) { + const { profileId: _unused, ...delegationWithoutId } = delegation; + void _unused; + profilingMeta["delegation"] = delegationWithoutId; + } + + return ( + jsonString.slice(0, -1) + + `,"${PROFILING_KEY}":${JSON.stringify(profilingMeta)}}` + ); +} diff --git a/packages/sdk/server/rpc/profiling/delegation-profiler.ts b/packages/sdk/server/rpc/profiling/delegation-profiler.ts new file mode 100644 index 0000000000..6d040e0f57 --- /dev/null +++ b/packages/sdk/server/rpc/profiling/delegation-profiler.ts @@ -0,0 +1,226 @@ +import { + nowMs, + record, + shouldProfile, + recordPhase, + recordServerBreakdownPhases, + type BaseTimings, + type BaseEvent, +} from "@/profiling"; +import type { + ProfilingRequestMeta, + ProfilingResponseMeta, + DelegationBreakdown, +} from "@/schemas"; + +export interface DelegatedHandlerOptions { + profilingMeta?: ProfilingRequestMeta; +} + +/** + * Per-peer connection tracking for delegation. + * Separate tracking for server-side event recording vs breakdown injection. + * + * Flow: + * 1. Connection happens → cacheDelegationConnectionTime stores ms + * 2. First profiled call (stream or unary) → flushServerConnectionEvent records server event + * 3. 
First unary call → consumeBreakdownConnectionTime returns ms for breakdown injection + */ +const pendingConnectionTimes = new Map(); +const serverConnectionRecorded = new Map(); +const breakdownConnectionMs = new Map(); +const breakdownConnectionInjected = new Map(); + +interface BaseDelegationTimings extends BaseTimings { + requestStringifyMs?: number; + sendStart?: number; +} + +export interface DelegationTimings extends BaseDelegationTimings { + firstResponseAt?: number; + responseJsonParseMs?: number; +} + +export interface DelegationStreamTimings extends BaseDelegationTimings { + firstChunkAt?: number; + lastChunkAt?: number; + chunkCount: number; +} + +export function shouldProfileDelegation( + op: string, + incomingMeta?: ProfilingRequestMeta, +): boolean { + if (incomingMeta?.enabled === false) { + return false; + } + if (incomingMeta?.enabled === true) { + return true; + } + return shouldProfile(op); +} + +export function createDelegationTimings( + profileId: string, + requestType: string, +): DelegationTimings { + return { profileId, requestType, requestStart: nowMs() }; +} + +export function createDelegationStreamTimings( + profileId: string, + requestType: string, +): DelegationStreamTimings { + return { profileId, requestType, requestStart: nowMs(), chunkCount: 0 }; +} + +export function cacheDelegationConnectionTime( + peerKey: string, + durationMs: number, +): void { + if (serverConnectionRecorded.get(peerKey)) return; + if (pendingConnectionTimes.has(peerKey)) return; + pendingConnectionTimes.set(peerKey, durationMs); +} + +export function flushServerConnectionEvent(peerKey: string): void { + if (serverConnectionRecorded.get(peerKey)) return; + + const ms = pendingConnectionTimes.get(peerKey); + if (ms === undefined) return; + + pendingConnectionTimes.delete(peerKey); + serverConnectionRecorded.set(peerKey, true); + + if (!breakdownConnectionInjected.get(peerKey)) { + breakdownConnectionMs.set(peerKey, ms); + } + + record({ + ts: nowMs(), + op: 
"delegation", + kind: "delegation", + phase: "connection", + ms, + tags: { peer: peerKey.slice(0, 16) }, + }); +} + +export function consumeBreakdownConnectionTime( + peerKey: string, +): number | undefined { + if (breakdownConnectionInjected.get(peerKey)) return undefined; + + const ms = breakdownConnectionMs.get(peerKey); + if (ms === undefined) return undefined; + + breakdownConnectionMs.delete(peerKey); + breakdownConnectionInjected.set(peerKey, true); + + return ms; +} + +export function buildDelegationBreakdown( + timings: DelegationTimings, + connectionMs?: number, +): DelegationBreakdown { + const now = nowMs(); + const breakdown: DelegationBreakdown = { + profileId: timings.profileId, + }; + + if (connectionMs !== undefined) { + breakdown.connectionMs = connectionMs; + } + if (timings.requestStringifyMs !== undefined) { + breakdown.requestStringifyMs = timings.requestStringifyMs; + } + if ( + timings.sendStart !== undefined && + timings.firstResponseAt !== undefined + ) { + breakdown.serverWaitMs = timings.firstResponseAt - timings.sendStart; + } + if (timings.responseJsonParseMs !== undefined) { + breakdown.responseJsonParseMs = timings.responseJsonParseMs; + } + breakdown.totalDelegationMs = now - timings.requestStart; + + return breakdown; +} + +export function recordDelegationEvents( + timings: DelegationTimings, + serverMeta?: ProfilingResponseMeta, + connectionMs?: number, +): DelegationBreakdown { + const now = nowMs(); + const base: BaseEvent = { + ts: now, + op: timings.requestType, + kind: "delegation", + profileId: timings.profileId, + }; + + const breakdown = buildDelegationBreakdown(timings, connectionMs); + + recordPhase(base, "request.stringify", breakdown.requestStringifyMs); + recordPhase(base, "serverWait", breakdown.serverWaitMs); + recordPhase(base, "response.jsonParse", breakdown.responseJsonParseMs); + recordPhase(base, "totalDelegationTime", breakdown.totalDelegationMs); + + if (serverMeta?.server) { + 
recordServerBreakdownPhases(base, serverMeta.server, "delegated"); + } + + return breakdown; +} + +export function recordDelegationStreamEvents( + timings: DelegationStreamTimings, + serverMeta?: ProfilingResponseMeta, +): void { + const now = nowMs(); + const totalTime = now - timings.requestStart; + const base: BaseEvent = { + ts: now, + op: timings.requestType, + kind: "delegation", + profileId: timings.profileId, + }; + + recordPhase(base, "request.stringify", timings.requestStringifyMs); + + if (timings.sendStart !== undefined && timings.firstChunkAt !== undefined) { + recordPhase(base, "ttfb", timings.firstChunkAt - timings.sendStart); + } + if (timings.firstChunkAt !== undefined && timings.lastChunkAt !== undefined) { + recordPhase( + base, + "streamDuration", + timings.lastChunkAt - timings.firstChunkAt, + ); + } + + recordPhase(base, "totalDelegationTime", totalTime, { + count: timings.chunkCount, + }); + + if (serverMeta?.server) { + recordServerBreakdownPhases(base, serverMeta.server, "delegated"); + } +} + +export function clearPeerConnectionTracking(peerKey: string): void { + pendingConnectionTimes.delete(peerKey); + serverConnectionRecorded.delete(peerKey); + breakdownConnectionMs.delete(peerKey); + breakdownConnectionInjected.delete(peerKey); +} + +export function resetDelegationConnectionTracking(): void { + pendingConnectionTimes.clear(); + serverConnectionRecorded.clear(); + breakdownConnectionMs.clear(); + breakdownConnectionInjected.clear(); +} diff --git a/packages/sdk/server/rpc/profiling/index.ts b/packages/sdk/server/rpc/profiling/index.ts new file mode 100644 index 0000000000..36c067dcf3 --- /dev/null +++ b/packages/sdk/server/rpc/profiling/index.ts @@ -0,0 +1,25 @@ +export { createServerProfiler, type ServerProfiler } from "./profiler"; +export { + profileReplyHandler, + profileStreamHandler, +} from "./operation-wrappers"; +export { + registerOperationMetrics, + buildOperationEvent, + type OperationMetricsConfig, +} from 
"./operation-metrics"; +export { + shouldProfileDelegation, + createDelegationTimings, + createDelegationStreamTimings, + recordDelegationEvents, + recordDelegationStreamEvents, + cacheDelegationConnectionTime, + flushServerConnectionEvent, + consumeBreakdownConnectionTime, + clearPeerConnectionTracking, + resetDelegationConnectionTracking, + type DelegationTimings, + type DelegationStreamTimings, + type DelegatedHandlerOptions, +} from "./delegation-profiler"; diff --git a/packages/sdk/server/rpc/profiling/operation-metrics.ts b/packages/sdk/server/rpc/profiling/operation-metrics.ts new file mode 100644 index 0000000000..ab802c8542 --- /dev/null +++ b/packages/sdk/server/rpc/profiling/operation-metrics.ts @@ -0,0 +1,183 @@ +/** + * Declarative extraction of operation-level metrics from request/response data. + * Used by operation wrappers to capture handler-specific profiling events. + */ + +import type { CompletionStats, TranslationStats, OCRStats } from "@/schemas"; +import type { ProfilingEvent, ProfilingEventKind } from "@/profiling/types"; + +export type MetricExtractor = ( + data: T, +) => Record | undefined; + +export interface OperationMetricsConfig< + TRequest = unknown, + TResponse = unknown, +> { + op: string; + kind: ProfilingEventKind; + fromRequest?: MetricExtractor; + fromFinalChunk?: MetricExtractor; + fromResult?: MetricExtractor; + getTags?: (request: TRequest) => Record; +} + +const metricsRegistry = new Map(); + +export function registerOperationMetrics( + config: OperationMetricsConfig, +): void { + metricsRegistry.set(config.op, config as OperationMetricsConfig); +} + +export function buildOperationEvent( + op: string, + profileId: string, + ts: number, + executionMs: number, + request?: unknown, + finalResponse?: unknown, + ttfb?: number, +): ProfilingEvent | undefined { + const config = metricsRegistry.get(op); + if (!config) { + return { + ts, + op, + kind: "handler", + profileId, + ms: executionMs, + }; + } + + const gauges: Record = {}; 
+ + if (ttfb !== undefined) { + gauges["ttfb"] = ttfb; + } + + if (config.fromRequest && request) { + const extracted = config.fromRequest(request); + if (extracted) Object.assign(gauges, extracted); + } + + if (config.fromFinalChunk && finalResponse) { + const extracted = config.fromFinalChunk(finalResponse); + if (extracted) Object.assign(gauges, extracted); + } + + if (config.fromResult && finalResponse) { + const extracted = config.fromResult(finalResponse); + if (extracted) Object.assign(gauges, extracted); + } + + const tags = config.getTags?.(request as never); + const hasGauges = Object.keys(gauges).length > 0; + const hasTags = tags && Object.keys(tags).length > 0; + + const event: ProfilingEvent = { + ts, + op: config.op, + kind: config.kind, + profileId, + ms: executionMs, + }; + + if (hasGauges) { + event.gauges = gauges; + } + if (hasTags) { + event.tags = tags; + } + + return event; +} + +registerOperationMetrics<{ modelId?: string }, { stats?: CompletionStats }>({ + op: "completionStream", + kind: "handler", + getTags: (req) => (req.modelId ? { modelId: req.modelId } : {}), + fromFinalChunk: (res) => { + if (!res.stats) return undefined; + const gauges: Record = {}; + if (res.stats.timeToFirstToken !== undefined) + gauges["timeToFirstToken"] = res.stats.timeToFirstToken; + if (res.stats.tokensPerSecond !== undefined) + gauges["tokensPerSecond"] = res.stats.tokensPerSecond; + if (res.stats.cacheTokens !== undefined) + gauges["cacheTokens"] = res.stats.cacheTokens; + return Object.keys(gauges).length > 0 ? gauges : undefined; + }, +}); + +registerOperationMetrics<{ modelId?: string }, { stats?: TranslationStats }>({ + op: "translate", + kind: "handler", + getTags: (req) => (req.modelId ? 
{ modelId: req.modelId } : {}), + fromFinalChunk: (res) => { + if (!res.stats) return undefined; + const gauges: Record = {}; + if (res.stats.processedTokens !== undefined) + gauges["processedTokens"] = res.stats.processedTokens; + if (res.stats.processingTime !== undefined) + gauges["processingTime"] = res.stats.processingTime; + return Object.keys(gauges).length > 0 ? gauges : undefined; + }, +}); + +registerOperationMetrics<{ modelId?: string }, unknown>({ + op: "transcribeStream", + kind: "handler", + getTags: (req) => (req.modelId ? { modelId: req.modelId } : {}), +}); + +registerOperationMetrics<{ modelId?: string }, unknown>({ + op: "textToSpeech", + kind: "handler", + getTags: (req) => (req.modelId ? { modelId: req.modelId } : {}), +}); + +registerOperationMetrics<{ modelId?: string }, unknown>({ + op: "embed", + kind: "handler", + getTags: (req) => (req.modelId ? { modelId: req.modelId } : {}), +}); + +registerOperationMetrics<{ modelId?: string }, { stats?: OCRStats }>({ + op: "ocrStream", + kind: "handler", + getTags: (req) => (req.modelId ? { modelId: req.modelId } : {}), + fromFinalChunk: (res) => { + if (!res.stats) return undefined; + const gauges: Record = {}; + if (res.stats.detectionTime !== undefined) + gauges["detectionTime"] = res.stats.detectionTime; + if (res.stats.recognitionTime !== undefined) + gauges["recognitionTime"] = res.stats.recognitionTime; + if (res.stats.totalTime !== undefined) + gauges["totalTime"] = res.stats.totalTime; + return Object.keys(gauges).length > 0 ? 
gauges : undefined; + }, +}); + +registerOperationMetrics<{ modelId?: string; handler?: string }, unknown>({ + op: "pluginInvoke", + kind: "handler", + getTags: (req) => { + const tags: Record = {}; + if (req.modelId) tags["modelId"] = req.modelId; + if (req.handler) tags["handler"] = req.handler; + return tags; + }, +}); + +registerOperationMetrics<{ modelId?: string; handler?: string }, unknown>({ + op: "pluginInvokeStream", + kind: "handler", + getTags: (req) => { + const tags: Record = {}; + if (req.modelId) tags["modelId"] = req.modelId; + if (req.handler) tags["handler"] = req.handler; + return tags; + }, +}); diff --git a/packages/sdk/server/rpc/profiling/operation-wrappers.ts b/packages/sdk/server/rpc/profiling/operation-wrappers.ts new file mode 100644 index 0000000000..f36fa6df79 --- /dev/null +++ b/packages/sdk/server/rpc/profiling/operation-wrappers.ts @@ -0,0 +1,191 @@ +/** + * Generic wrappers for profiling handler execution. + * Used to wrap dispatch/invoke functions with timing capture. 
+ */ + +import { + PROFILING_KEY, + type PerCallProfiling, + type ProfilingRequestMeta, +} from "@/schemas"; +import { nowMs, generateProfileId } from "@/profiling/clock"; +import { record, shouldProfile } from "@/profiling/controller"; +import { buildOperationEvent } from "./operation-metrics"; + +export interface ProfiledReplyOptions { + op: string; + request: TRequest; + perCall?: PerCallProfiling; +} + +export interface ProfiledStreamOptions { + op: string; + request: TRequest; + perCall?: PerCallProfiling; +} + +interface RecordOperationEventParams { + options: ProfiledReplyOptions | ProfiledStreamOptions; + profileId: string; + startTs: number; + executionMs: number; + finalResponse?: TResponse | undefined; + ttfb?: number | undefined; + count?: number | undefined; + errored?: boolean | undefined; +} + +function getRequestProfilingMeta( + request: unknown, +): ProfilingRequestMeta | undefined { + if (!request || typeof request !== "object") { + return undefined; + } + + const meta = (request as Record)[PROFILING_KEY]; + if (!meta || typeof meta !== "object") { + return undefined; + } + + return meta as ProfilingRequestMeta; +} + +function resolvePerCallProfiling( + options: ProfiledReplyOptions | ProfiledStreamOptions, +): PerCallProfiling | undefined { + if (options.perCall) { + return options.perCall; + } + + const meta = getRequestProfilingMeta(options.request); + if (!meta) { + return undefined; + } + + if (meta.enabled === false) { + return { enabled: false }; + } + + return { + enabled: true, + includeServerBreakdown: meta.includeServer, + mode: meta.mode, + }; +} + +function recordOperationEvent( + params: RecordOperationEventParams, +): void { + const event = buildOperationEvent( + params.options.op, + params.profileId, + params.startTs, + params.executionMs, + params.options.request, + params.finalResponse, + params.ttfb, + ); + + if (!event) return; + + if (params.errored) { + event.tags = { ...event.tags, error: "true" }; + } + + if (params.count 
!== undefined && params.count > 0) { + event.count = params.count; + } + + record(event); +} + +export async function profileReplyHandler( + options: ProfiledReplyOptions, + handler: () => Promise, +): Promise { + const perCall = resolvePerCallProfiling(options); + if (!shouldProfile(options.op, perCall)) { + return handler(); + } + + const profileId = generateProfileId(); + const startTs = nowMs(); + + try { + const result = await handler(); + const executionMs = nowMs() - startTs; + recordOperationEvent({ + options, + profileId, + startTs, + executionMs, + finalResponse: result, + }); + + return result; + } catch (error) { + const executionMs = nowMs() - startTs; + recordOperationEvent({ + options, + profileId, + startTs, + executionMs, + errored: true, + }); + + throw error; + } +} + +export async function* profileStreamHandler( + options: ProfiledStreamOptions, + handler: () => AsyncGenerator, +): AsyncGenerator { + const perCall = resolvePerCallProfiling(options); + if (!shouldProfile(options.op, perCall)) { + yield* handler(); + return; + } + + const profileId = generateProfileId(); + const startTs = nowMs(); + let ttfb: number | undefined; + let lastChunk: TResponse | undefined; + let chunkCount = 0; + + try { + for await (const chunk of handler()) { + if (ttfb === undefined) { + ttfb = nowMs() - startTs; + } + chunkCount++; + lastChunk = chunk; + yield chunk; + } + + const executionMs = nowMs() - startTs; + recordOperationEvent({ + options, + profileId, + startTs, + executionMs, + finalResponse: lastChunk, + ttfb, + count: chunkCount, + }); + } catch (error) { + const executionMs = nowMs() - startTs; + recordOperationEvent({ + options, + profileId, + startTs, + executionMs, + finalResponse: lastChunk, + ttfb, + count: chunkCount, + errored: true, + }); + + throw error; + } +} diff --git a/packages/sdk/server/rpc/profiling/profiler.ts b/packages/sdk/server/rpc/profiling/profiler.ts new file mode 100644 index 0000000000..3d87c06ee9 --- /dev/null +++ 
b/packages/sdk/server/rpc/profiling/profiler.ts @@ -0,0 +1,99 @@ +import { nowMs } from "@/profiling"; +import { + responseSchema, + DELEGATION_BREAKDOWN_KEY, + type Response, + type ProfilingRequestMeta, +} from "@/schemas"; +import { + createProfilingContext, + injectProfilingIntoString, + type ServerProfilingContext, +} from "./context"; +import type { ResponseWithDelegation } from "../delegate-transport"; + +export type ServerProfiler = { + markRequestParsed: (ms: number) => void; + markRequestValidated: (ms: number) => void; + startHandler: () => void; + endHandler: () => void; + serialize: (response: Response, final?: boolean) => string; + serializeError: (json: string) => string; + getContext: () => ServerProfilingContext | undefined; +}; + +const noopProfiler: ServerProfiler = { + markRequestParsed: () => {}, + markRequestValidated: () => {}, + startHandler: () => {}, + endHandler: () => {}, + serialize: (response) => { + const delegation = (response as ResponseWithDelegation)[ + DELEGATION_BREAKDOWN_KEY + ]; + const json = JSON.stringify(responseSchema.parse(response)); + if (delegation) { + return injectProfilingIntoString(json, { delegation }); + } + return json; + }, + serializeError: (json) => json, + getContext: () => undefined, +}; + +function createActiveProfiler(meta: ProfilingRequestMeta): ServerProfiler { + const ctx = createProfilingContext(meta); + let handlerStart = 0; + let handlerEnded = false; + + return { + markRequestParsed: (ms) => { + ctx.jsonParseMs = ms; + }, + markRequestValidated: (ms) => { + ctx.zodValidationMs = ms; + }, + startHandler: () => { + handlerStart = nowMs(); + handlerEnded = false; + }, + endHandler: () => { + if (handlerEnded) return; + handlerEnded = true; + ctx.handlerExecutionMs = nowMs() - handlerStart; + }, + serialize: (response, final = true) => { + const delegation = (response as ResponseWithDelegation)[ + DELEGATION_BREAKDOWN_KEY + ]; + + const zodStart = nowMs(); + const validated = 
responseSchema.parse(response); + ctx.responseZodValidationMs = + (ctx.responseZodValidationMs ?? 0) + (nowMs() - zodStart); + + const stringifyStart = nowMs(); + const json = JSON.stringify(validated); + ctx.responseStringifyMs = + (ctx.responseStringifyMs ?? 0) + (nowMs() - stringifyStart); + + const injectionOpts = delegation ? { ctx, delegation } : { ctx }; + return final ? injectProfilingIntoString(json, injectionOpts) : json; + }, + serializeError: (json) => injectProfilingIntoString(json, { ctx }), + getContext: () => ctx, + }; +} + +export function createServerProfiler( + meta?: ProfilingRequestMeta, +): ServerProfiler { + if ( + meta?.includeServer && + typeof meta.id === "string" && + meta.id.length > 0 + ) { + return createActiveProfiler(meta); + } + return noopProfiler; +} diff --git a/packages/sdk/test/unit/load-model-schema.test.ts b/packages/sdk/test/unit/load-model-schema.test.ts index d3c35a8c31..de9bcc2242 100644 --- a/packages/sdk/test/unit/load-model-schema.test.ts +++ b/packages/sdk/test/unit/load-model-schema.test.ts @@ -55,6 +55,9 @@ test("loadModelRequestSchema: custom plugin allows unknown modelConfig keys", (t const result = loadModelSrcRequestSchema.safeParse(customPluginRequest); t.is(result.success, true); if (result.success) { - t.is((result.data.modelConfig as Record)?.customOption1, "value1"); + t.is( + (result.data.modelConfig as Record)?.customOption1, + "value1", + ); } }); diff --git a/packages/sdk/test/unit/model-types.test.ts b/packages/sdk/test/unit/model-types.test.ts index c981d72c5c..13138cde9f 100644 --- a/packages/sdk/test/unit/model-types.test.ts +++ b/packages/sdk/test/unit/model-types.test.ts @@ -69,10 +69,7 @@ test("normalizeModelType passes through canonical values unchanged", (t) => { ); t.is(normalizeModelType("llamacpp-embedding"), "llamacpp-embedding"); t.is(normalizeModelType("nmtcpp-translation"), "nmtcpp-translation"); - t.is( - normalizeModelType("parakeet-transcription"), - "parakeet-transcription", - ); + 
t.is(normalizeModelType("parakeet-transcription"), "parakeet-transcription"); t.is(normalizeModelType("onnx-tts"), "onnx-tts"); t.is(normalizeModelType("onnx-ocr"), "onnx-ocr"); }); diff --git a/packages/sdk/test/unit/profiler.test.ts b/packages/sdk/test/unit/profiler.test.ts new file mode 100644 index 0000000000..6ad97e6fd9 --- /dev/null +++ b/packages/sdk/test/unit/profiler.test.ts @@ -0,0 +1,244 @@ +import test from "brittle"; +import { + enable, + disable, + isEnabled, + shouldProfile, + shouldIncludeServerBreakdown, + record, + getAggregates, + getRecentEvents, +} from "../../profiling/controller"; +import { clearAggregator } from "../../profiling/aggregator"; +import { + exportJSON, + exportTable, + exportSummary, +} from "../../profiling/exporters"; +import type { ProfilingEvent } from "../../profiling/types"; + +function reset() { + disable(); + clearAggregator(); +} + +function testEvent(op: string, phase: string, ms: number): ProfilingEvent { + return { ts: Date.now(), op, kind: "rpc", phase, ms }; +} + +// ============================================================================= +// Core Contract +// ============================================================================= + +test("profiler: enable/disable toggles isEnabled", (t: any) => { + reset(); + t.is(isEnabled(), false, "disabled by default"); + + enable(); + t.is(isEnabled(), true, "enabled after enable()"); + + disable(); + t.is(isEnabled(), false, "disabled after disable()"); +}); + +test("profiler: enable() resets aggregates and events", (t: any) => { + reset(); + enable({ mode: "verbose" }); + record(testEvent("test", "a", 100)); + record(testEvent("test", "b", 200)); + + const before = getAggregates(); + t.ok(Object.keys(before).length > 0, "has aggregates before re-enable"); + + enable({ mode: "verbose" }); + + const after = getAggregates(); + t.is(Object.keys(after).length, 0, "aggregates cleared after enable()"); + + const events = getRecentEvents(); + t.is(events.length, 0, 
"events cleared after enable()"); +}); + +test("profiler: disable() preserves aggregates but exportJSON omits events", (t: any) => { + reset(); + enable({ mode: "verbose" }); + record(testEvent("test", "x", 50)); + disable(); + + const aggregates = getAggregates(); + t.ok( + Object.keys(aggregates).length > 0, + "aggregates preserved after disable", + ); + + const json = exportJSON(); + t.ok( + Object.keys(json.aggregates).length > 0, + "exportJSON includes aggregates", + ); + t.is( + json.recentEvents, + undefined, + "exportJSON omits recentEvents after disable", + ); +}); + +test("profiler: verbose mode buffers events", (t: any) => { + reset(); + enable({ mode: "verbose" }); + record(testEvent("test", "v", 10)); + + const events = getRecentEvents(); + t.ok(events.length > 0, "events buffered in verbose mode"); +}); + +test("profiler: summary mode does not buffer events", (t: any) => { + reset(); + enable({ mode: "summary" }); + record(testEvent("test", "s", 10)); + + const events = getRecentEvents(); + t.is(events.length, 0, "no events buffered in summary mode"); + + const aggregates = getAggregates(); + t.ok(Object.keys(aggregates).length > 0, "aggregates still recorded"); +}); + +test("profiler: exportJSON omits recentEvents in summary mode", (t: any) => { + reset(); + enable({ mode: "summary" }); + record(testEvent("test", "e", 10)); + + const json = exportJSON(); + t.is(json.recentEvents, undefined, "recentEvents omitted in summary"); + t.ok(Object.keys(json.aggregates).length > 0, "aggregates present"); +}); + +test("profiler: exportJSON includes recentEvents in verbose mode", (t: any) => { + reset(); + enable({ mode: "verbose" }); + record(testEvent("test", "e", 10)); + + const json = exportJSON(); + t.ok(Array.isArray(json.recentEvents), "recentEvents included in verbose"); + t.ok(json.recentEvents!.length > 0, "recentEvents has entries"); +}); + +test("profiler: exportTable returns string", (t: any) => { + reset(); + enable(); + record(testEvent("test", 
"t", 10)); + + const table = exportTable(); + t.is(typeof table, "string"); + t.ok(table.includes("test.t"), "table contains metric key"); +}); + +test("profiler: exportSummary returns string", (t: any) => { + reset(); + enable(); + record(testEvent("test", "s", 10)); + + const summary = exportSummary(); + t.is(typeof summary, "string"); + t.ok(summary.includes("PROFILER SUMMARY"), "summary has header"); +}); + +// ============================================================================= +// Precedence +// ============================================================================= + +test("profiler: shouldProfile per-call > runtime > default", (t: any) => { + reset(); + + // Default: disabled + t.is(shouldProfile("test"), false, "disabled by default"); + + // Runtime enable + enable(); + t.is(shouldProfile("test"), true, "runtime enable takes effect"); + + // Per-call disable overrides runtime + t.is( + shouldProfile("test", { enabled: false }), + false, + "per-call disable wins", + ); + + // Per-call enable when runtime disabled + disable(); + t.is(shouldProfile("test", { enabled: true }), true, "per-call enable wins"); +}); + +test("profiler: shouldIncludeServerBreakdown per-call > runtime > default", (t: any) => { + reset(); + + // Default: false + t.is(shouldIncludeServerBreakdown(), false, "default false"); + + // Runtime enable with includeServerBreakdown + enable({ includeServerBreakdown: true }); + t.is( + shouldIncludeServerBreakdown(), + true, + "runtime includeServer takes effect", + ); + + // Per-call override + t.is( + shouldIncludeServerBreakdown({ includeServerBreakdown: false }), + false, + "per-call override wins", + ); + + // Per-call enable when runtime disabled + disable(); + enable({ includeServerBreakdown: false }); + t.is( + shouldIncludeServerBreakdown({ includeServerBreakdown: true }), + true, + "per-call enable wins", + ); +}); + +// ============================================================================= +// Gating 
(wrapper-level disabled path) +// ============================================================================= + +test("profiler: no aggregates when globally disabled and no per-call override", (t: any) => { + reset(); + // Simulate what wrappers do: check shouldProfile before recording + if (shouldProfile("gated")) { + record(testEvent("gated", "test", 100)); + } + + const aggregates = getAggregates(); + t.is(Object.keys(aggregates).length, 0, "nothing recorded when gated"); +}); + +test("profiler: per-call enabled:true records even when globally disabled", (t: any) => { + reset(); + if (shouldProfile("percall", { enabled: true })) { + record(testEvent("percall", "enabled", 50)); + } + + const aggregates = getAggregates(); + t.ok( + "percall.enabled" in aggregates, + "per-call enable bypasses global disable", + ); +}); + +test("profiler: per-call enabled:false suppresses when globally enabled", (t: any) => { + reset(); + enable(); + if (shouldProfile("suppressed", { enabled: false })) { + record(testEvent("suppressed", "test", 50)); + } + + const aggregates = getAggregates(); + /* absent(value, message) fails if the key was recorded; the previous not(value, message) compared the boolean with the message string and could never fail */ t.absent( + "suppressed.test" in aggregates, + "per-call disable suppresses recording", + ); +}); diff --git a/packages/sdk/utils/global-singleton.ts b/packages/sdk/utils/global-singleton.ts new file mode 100644 index 0000000000..5eb34a9c9c --- /dev/null +++ b/packages/sdk/utils/global-singleton.ts @@ -0,0 +1,11 @@ +/** Returns the value stored on globalThis under `key`; when that slot is undefined, calls `create()`, stores the result under `key`, and returns it. */ export function getGlobalSingleton<T>(key: symbol, create: () => T): T { + const global = globalThis as { [key: symbol]: unknown }; + const existing = global[key]; + if (existing !== undefined) { + return existing as T; + } + + const value = create(); + global[key] = value as unknown; + return value; +}