Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/sdk/client/api/completion-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ export function completion(params: CompletionParams): {

const responses: AsyncGenerator<unknown> = streamRpc(
request,
undefined,
params.rpcOptions,
);

Expand Down
7 changes: 5 additions & 2 deletions packages/sdk/client/api/download-asset.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { send, stream } from "@/client/rpc/rpc-client";
import {
type DownloadAssetOptions as BaseDownloadAssetOptions,
type RPCOptions,
downloadAssetOptionsToRequestSchema,
} from "@/schemas";
import {
Expand All @@ -22,6 +23,7 @@ export type DownloadAssetOptions = BaseDownloadAssetOptions;
* - assetSrc: The location from which the asset is downloaded (local path, remote URL, or Hyperdrive URL)
* - seed: Optional boolean for hyperdrive seeding
* - onProgress: Optional callback for download progress
* @param rpcOptions - Optional RPC options including per-call profiling configuration
*
* @returns Promise that resolves to the asset ID (either the provided assetSrc or a generated ID)
*
Expand All @@ -48,12 +50,13 @@ export type DownloadAssetOptions = BaseDownloadAssetOptions;
*/
export async function downloadAsset(
options: DownloadAssetOptions,
rpcOptions?: RPCOptions,
): Promise<string> {
const request = downloadAssetOptionsToRequestSchema.parse(options);

if (options.onProgress) {
// Use streaming for progress updates
for await (const response of stream(request)) {
for await (const response of stream(request, rpcOptions)) {
if (response.type === "modelProgress") {
options.onProgress(response);
} else if (response.type === "downloadAsset") {
Expand All @@ -67,7 +70,7 @@ export async function downloadAsset(
throw new StreamEndedError();
} else {
// Use regular send for simple downloading
const response = await send(request);
const response = await send(request, rpcOptions);
if (response.type !== "downloadAsset") {
throw new InvalidResponseError("downloadAsset");
}
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/client/api/embed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export async function embed(
...params,
};

const response = await send(request, undefined, options);
const response = await send(request, options);
if (response.type !== "embed") {
throw new InvalidResponseError("embed");
}
Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/client/api/invoke-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export async function invokePlugin<TResponse = unknown, TParams = unknown>(
params: options.params,
};

const response = await send(request, undefined, rpcOptions);
const response = await send(request, rpcOptions);

if (response.type !== "pluginInvoke") {
throw new InvalidResponseError("pluginInvoke");
Expand All @@ -53,7 +53,7 @@ export async function* invokePluginStream<
params: options.params,
};

for await (const chunk of stream(request, undefined, rpcOptions)) {
for await (const chunk of stream(request, rpcOptions)) {
const response = chunk as PluginInvokeStreamResponse;
if (response.type !== "pluginInvokeStream") {
throw new InvalidResponseError("pluginInvokeStream");
Expand Down
18 changes: 14 additions & 4 deletions packages/sdk/client/api/load-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { startLoggingStreamForModel } from "@/client/logging-stream-registry";
import {
type LoadModelOptions,
type ReloadConfigOptions,
type RPCOptions,
loadModelOptionsToRequestSchema,
reloadConfigOptionsToRequestSchema,
isModelTypeAlias,
Expand Down Expand Up @@ -32,6 +33,7 @@ const logger = getClientLogger();
* - modelConfig: Model-specific configuration options (companion sources, model parameters, etc.)
* - onProgress: Callback for download progress updates
* - logger: Logger instance for model operation logs
* @param rpcOptions - Optional RPC options including per-call profiling configuration
*
* @returns Promise that resolves to the model ID (either the provided modelSrc or a generated ID)
*
Expand Down Expand Up @@ -107,7 +109,10 @@ const logger = getClientLogger();
* });
* ```
*/
export function loadModel(options: LoadModelOptions): Promise<string>;
export function loadModel(
options: LoadModelOptions,
rpcOptions?: RPCOptions,
): Promise<string>;

/**
* Hot-reloads configuration on an already loaded model.
Expand All @@ -116,6 +121,7 @@ export function loadModel(options: LoadModelOptions): Promise<string>;
* - modelId: The ID of an existing loaded model
* - modelType: The type of model (must match the loaded model)
* - modelConfig: New configuration to apply
* @param rpcOptions - Optional RPC options including per-call profiling configuration
*
* @returns Promise that resolves to the model ID
*
Expand All @@ -139,10 +145,14 @@ export function loadModel(options: LoadModelOptions): Promise<string>;
* });
* ```
*/
export function loadModel(options: ReloadConfigOptions): Promise<string>;
export function loadModel(
options: ReloadConfigOptions,
rpcOptions?: RPCOptions,
): Promise<string>;

export async function loadModel(
options: LoadModelOptions | ReloadConfigOptions,
rpcOptions?: RPCOptions,
): Promise<string> {
const isReloadConfig = "modelId" in options && !("modelSrc" in options);

Expand All @@ -162,7 +172,7 @@ export async function loadModel(

if (onProgress) {
// Use streaming for progress updates
for await (const response of stream(request)) {
for await (const response of stream(request, rpcOptions)) {
if (response.type === "modelProgress") {
onProgress(response);
} else if (response.type === "loadModel") {
Expand Down Expand Up @@ -191,7 +201,7 @@ export async function loadModel(
}

// Use regular send for simple loading
const response = await send(request);
const response = await send(request, rpcOptions);
if (response.type !== "loadModel") {
throw new InvalidResponseError("loadModel");
}
Expand Down
24 changes: 12 additions & 12 deletions packages/sdk/client/api/rag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function ragChunk(
...params,
};

const response = await send(request, undefined, options);
const response = await send(request, options);

if (response.type !== "rag") {
throw new InvalidResponseError("rag");
Expand Down Expand Up @@ -139,7 +139,7 @@ export async function ragIngest(

if (onProgress) {
// Use streaming for progress updates
for await (const event of stream(request, undefined, options)) {
for await (const event of stream(request, options)) {
if (event.type === "rag:progress" && event.operation === "ingest") {
const progress: RagProgressUpdate = event;
onProgress(
Expand All @@ -165,7 +165,7 @@ export async function ragIngest(
throw new StreamEndedError();
}

const response = await send(request, undefined, options);
const response = await send(request, options);
if (response.type !== "rag") {
throw new InvalidResponseError("rag");
}
Expand Down Expand Up @@ -232,7 +232,7 @@ export async function ragSaveEmbeddings(
};

if (onProgress) {
for await (const event of stream(request, undefined, options)) {
for await (const event of stream(request, options)) {
if (
event.type === "rag:progress" &&
event.operation === "saveEmbeddings"
Expand All @@ -258,7 +258,7 @@ export async function ragSaveEmbeddings(
throw new StreamEndedError();
}

const response = await send(request, undefined, options);
const response = await send(request, options);

if (response.type !== "rag") {
throw new InvalidResponseError("rag");
Expand Down Expand Up @@ -314,7 +314,7 @@ export async function ragSearch(
n: params.n ?? 3,
};

const response = await send(request, undefined, options);
const response = await send(request, options);
if (response.type !== "rag") {
throw new InvalidResponseError("rag");
}
Expand Down Expand Up @@ -360,7 +360,7 @@ export async function ragDeleteEmbeddings(
...params,
};

const response = await send(request, undefined, options);
const response = await send(request, options);
if (response.type !== "rag") {
throw new InvalidResponseError("rag");
}
Expand Down Expand Up @@ -428,7 +428,7 @@ export async function ragReindex(
};

if (onProgress) {
for await (const event of stream(request, undefined, options)) {
for await (const event of stream(request, options)) {
if (event.type === "rag:progress" && event.operation === "reindex") {
const progress: RagProgressUpdate = event;
onProgress(
Expand All @@ -451,7 +451,7 @@ export async function ragReindex(
throw new StreamEndedError();
}

const response = await send(request, undefined, options);
const response = await send(request, options);

if (response.type !== "rag") {
throw new InvalidResponseError("rag");
Expand Down Expand Up @@ -494,7 +494,7 @@ export async function ragListWorkspaces(
operation: "listWorkspaces",
};

const response = await send(request, undefined, options);
const response = await send(request, options);

if (response.type !== "rag") {
throw new InvalidResponseError("rag");
Expand Down Expand Up @@ -544,7 +544,7 @@ export async function ragCloseWorkspace(
...params,
};

const response = await send(request, undefined, options);
const response = await send(request, options);

if (response.type !== "rag") {
throw new InvalidResponseError("rag");
Expand Down Expand Up @@ -584,7 +584,7 @@ export async function ragDeleteWorkspace(
...params,
};

const response = await send(request, undefined, options);
const response = await send(request, options);

if (response.type !== "rag") {
throw new InvalidResponseError("rag");
Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/client/api/text-to-speech.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export function textToSpeech(

if (params.stream) {
const bufferStream = (async function* () {
for await (const response of streamRpc(request, undefined, options)) {
for await (const response of streamRpc(request, options)) {
if (response.type === "textToSpeech") {
const streamResponse = ttsResponseSchema.parse(response);
if (streamResponse.buffer.length > 0) {
Expand All @@ -54,7 +54,7 @@ export function textToSpeech(

const bufferPromise = (async () => {
let buffer: number[] = [];
for await (const response of streamRpc(request, undefined, options)) {
for await (const response of streamRpc(request, options)) {
if (response.type === "textToSpeech") {
const streamResponse = ttsResponseSchema.parse(response);
buffer = buffer.concat(streamResponse.buffer);
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/client/api/transcribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export async function* transcribeStream(
...(params.prompt && { prompt: params.prompt }),
};

for await (const response of stream(request, undefined, options)) {
for await (const response of stream(request, options)) {
if (response.type === "transcribeStream") {
const streamResponse = transcribeStreamResponseSchema.parse(response);

Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/client/api/translate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export function translate(

if (params.stream) {
const tokenStream = (async function* () {
for await (const response of streamRpc(request, undefined, options)) {
for await (const response of streamRpc(request, options)) {
if (response.type === "translate") {
const streamResponse = translateResponseSchema.parse(response);
if (!streamResponse.done) {
Expand Down Expand Up @@ -118,7 +118,7 @@ export function translate(
const textPromise = (async () => {
let buffer = "";

for await (const response of streamRpc(request, undefined, options)) {
for await (const response of streamRpc(request, options)) {
if (response.type === "translate") {
const streamResponse = translateResponseSchema.parse(response);
buffer += streamResponse.token;
Expand Down
24 changes: 23 additions & 1 deletion packages/sdk/client/rpc/profiling/profiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,32 @@ export function recordClientEvents(
recordPhase(
base,
"clientOverhead",
totalClientTime - serverMeta.server.totalServerMs,
Math.max(0, totalClientTime - serverMeta.server.totalServerMs),
);
}
}

if (serverMeta?.delegation) {
recordDelegationBreakdownPhases(base, serverMeta.delegation);
}

if (serverMeta?.operation) {
recordOperationEvent(serverMeta.operation);
}
}

function recordOperationEvent(op: NonNullable<ProfilingResponseMeta["operation"]>): void {
const event: Parameters<typeof record>[0] = {
ts: nowMs(),
op: op.op,
kind: op.kind,
ms: op.ms,
};
if (op.profileId !== undefined) event.profileId = op.profileId;
if (op.gauges !== undefined) event.gauges = op.gauges;
if (op.tags !== undefined) event.tags = op.tags;
if (op.count !== undefined) event.count = op.count;
record(event);
}

export function recordClientStreamEvents(
Expand Down Expand Up @@ -205,6 +223,10 @@ export function recordClientStreamEvents(
if (serverMeta?.delegation) {
recordDelegationBreakdownPhases(base, serverMeta.delegation);
}

if (serverMeta?.operation) {
recordOperationEvent(serverMeta.operation);
}
}

export function resetConnectionTracking(): void {
Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/client/rpc/rpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type RPC from "bare-rpc";
import {
requestSchema,
responseSchema,
PROFILING_TRAILER_KEY,
type Request,
type Response,
type RPCOptions,
Expand Down Expand Up @@ -96,8 +97,8 @@ async function prepareRPCContext(

export async function send<T extends Request>(
request: T,
rpc?: RPC,
options?: RPCOptions,
rpc?: RPC,
): Promise<Response> {
const ctx = await prepareRPCContext(request.type, options?.profiling, rpc);

Expand Down Expand Up @@ -209,8 +210,8 @@ async function sendProfiled<T extends Request>(

export async function* stream<T extends Request>(
request: T,
rpc?: RPC,
options: RPCOptions = {},
rpc?: RPC,
): AsyncGenerator<Response> {
const ctx = await prepareRPCContext(request.type, options?.profiling, rpc);

Expand Down Expand Up @@ -337,6 +338,7 @@ async function* streamProfiled<T extends Request>(
profilingMeta = chunkMeta;
}

if (rawParsed[PROFILING_TRAILER_KEY] === true) continue;
const cleanPayload = stripProfilingMeta(rawParsed);
const response = responseSchema.parse(cleanPayload);

Expand Down
Loading
Loading