diff --git a/examples/personal-finance/agents/personal-finance/evals/promptfooconfig.yaml b/examples/personal-finance/agents/personal-finance/evals/promptfooconfig.yaml index 031d78713..9597d97a5 100644 --- a/examples/personal-finance/agents/personal-finance/evals/promptfooconfig.yaml +++ b/examples/personal-finance/agents/personal-finance/evals/promptfooconfig.yaml @@ -75,3 +75,35 @@ tests: Two dates five days apart fall in different UK fiscal years. The agent should not claim both belong to the same year. weight: 0.7 + + # ─── retrieval — agent grounds answer in saved memory ─────────────────── + # Exercises the `tool_use` SSE event surfaced by the gateway: the provider + # populates `metadata.toolCalls` (every tool call observed in the turn) and + # `metadata.retrievedContext` (joined snippet text from search_memory). For + # personal-finance, a prompt that references the workspace's saved facts + # should trigger at least one `search_memory` call. + # + # The metadata is on the provider response (returned from `LobuProvider`). + # promptfoo surfaces it to JS assertions via `context.providerResponse`. + + - description: retrieval — recent activity question triggers search_memory + vars: + query: "What dividends have I logged in this tax year so far?" + assert: + # The agent should consult its memory; even an empty workspace should + # produce a search_memory call before answering. + - type: javascript + value: | + const meta = context.providerResponse?.metadata ?? {}; + const calls = Array.isArray(meta.toolCalls) ? meta.toolCalls : []; + return calls.some((c) => + c && (c.name === 'search_memory' || c.name === 'lobu_search_memory') + ); + weight: 0.5 + - type: llm-rubric + value: | + Response either lists dividends grounded in workspace data, or + (if the workspace is empty) cleanly tells the user no dividends are + logged yet and offers to record one. Does not invent dividend + amounts or sources. 
+ weight: 0.5 diff --git a/packages/agent-worker/src/__tests__/tool-use-events.test.ts b/packages/agent-worker/src/__tests__/tool-use-events.test.ts new file mode 100644 index 000000000..e6cab4574 --- /dev/null +++ b/packages/agent-worker/src/__tests__/tool-use-events.test.ts @@ -0,0 +1,108 @@ +import { describe, expect, test } from "bun:test"; +import { buildToolUseEventPayload } from "../openclaw/tool-use-events"; + +// pi-agent's `tool_execution_end` event omits `args` (those live on the +// matching `tool_execution_start`); worker.ts re-attaches them from a Map +// before calling buildToolUseEventPayload. Tests mirror that — pass args +// explicitly to validate the merged shape. +const baseEvent = { + toolCallId: "call_1", + toolName: "search_memory", + args: { query: "rent due dates" }, + result: { + matches: [{ id: 1, name: "Acme Ltd" }], + content: [ + { + id: 42, + title: "Rent", + text_content: "Tenant Acme pays £1,200 on the 1st of each month", + author_name: null, + source_url: null, + platform: "manual", + occurred_at: null, + entity_ids: [1], + }, + { + id: 43, + title: "Lease renewal", + text_content: "Lease renewed through Dec 2027", + author_name: null, + source_url: null, + platform: "manual", + occurred_at: null, + entity_ids: [1], + }, + ], + metadata: { total_matches: 1, page_size: 5 }, + }, + isError: false, +}; + +describe("buildToolUseEventPayload", () => { + test("includes name, input, toolCallId for any tool", () => { + const payload = buildToolUseEventPayload({ + toolCallId: "abc", + toolName: "bash", + args: { command: "ls" }, + result: "ok", + isError: false, + }); + expect(payload.name).toBe("bash"); + expect(payload.toolCallId).toBe("abc"); + expect(payload.input).toEqual({ command: "ls" }); + expect(payload.isError).toBe(false); + // Non-retrieval tools get no result_summary unless they error. 
+    expect(payload.result_summary).toBeUndefined();
+  });
+
+  test("extracts event_ids + snippets for search_memory", () => {
+    const payload = buildToolUseEventPayload(baseEvent);
+    expect(payload.result_summary?.event_ids).toEqual([42, 43]);
+    expect(payload.result_summary?.snippets).toEqual([
+      {
+        id: 42,
+        text: "Tenant Acme pays £1,200 on the 1st of each month",
+      },
+      { id: 43, text: "Lease renewed through Dec 2027" },
+    ]);
+  });
+
+  test("treats lobu_search_memory the same as search_memory", () => {
+    const payload = buildToolUseEventPayload({
+      ...baseEvent,
+      toolName: "lobu_search_memory",
+    });
+    expect(payload.result_summary?.event_ids).toEqual([42, 43]);
+  });
+
+  test("handles MCP CallToolResult wrapping", () => {
+    const payload = buildToolUseEventPayload({
+      ...baseEvent,
+      result: {
+        content: [{ type: "text", text: JSON.stringify(baseEvent.result) }],
+        isError: false,
+      },
+    });
+    expect(payload.result_summary?.event_ids).toEqual([42, 43]);
+  });
+
+  test("returns no result_summary when search_memory has no content", () => {
+    const payload = buildToolUseEventPayload({
+      ...baseEvent,
+      result: { matches: [], metadata: { total_matches: 0, page_size: 0 } },
+    });
+    expect(payload.result_summary).toBeUndefined();
+  });
+
+  test("propagates error message when isError", () => {
+    const payload = buildToolUseEventPayload({
+      toolCallId: "x",
+      toolName: "search_memory",
+      args: {},
+      result: { message: "authentication required" },
+      isError: true,
+    });
+    expect(payload.isError).toBe(true);
+    expect(payload.result_summary?.error).toBe("authentication required");
+  });
+});
diff --git a/packages/agent-worker/src/openclaw/tool-use-events.ts b/packages/agent-worker/src/openclaw/tool-use-events.ts
new file mode 100644
index 000000000..86e52a174
--- /dev/null
+++ b/packages/agent-worker/src/openclaw/tool-use-events.ts
@@ -0,0 +1,176 @@
+/**
+ * Helpers for surfacing tool-call traces from worker → gateway → SSE clients.
+ *
+ * Workers emit a `tool_use` custom event per `tool_execution_end` so any SSE
+ * subscriber (the @lobu/promptfoo-provider, the Mac menubar, the CLI eval) can
+ * inspect tool calls without having to scrape worker logs. The shape mirrors
+ * Anthropic's tool-use block (name + input) plus a `result_summary` field for
+ * structured, tool-specific metadata that's small enough to ship over SSE.
+ *
+ * For retrieval tools (`search_memory` / `lobu_search_memory`) we extract the
+ * matched event IDs and the snippet text content so promptfoo RAG assertions
+ * (`context-recall`, `context-faithfulness`, custom `javascript`) can join the
+ * agent's answer back to the retrieved evidence.
+ */
+
+export interface ToolUseEventPayload {
+  toolCallId: string;
+  name: string;
+  input: unknown;
+  isError: boolean;
+  result_summary?: ToolUseResultSummary;
+}
+
+export interface ToolUseResultSummary {
+  /** Event IDs the tool returned (search_memory etc). */
+  event_ids?: number[];
+  /** Inline snippet text keyed by event id — populated by retrieval tools so
+   * provider clients can compute `retrievedContext` without a round-trip. */
+  snippets?: Array<{ id: number; text: string }>;
+  /** Tools may also include a short error string. */
+  error?: string;
+}
+
+const SEARCH_MEMORY_TOOL_NAMES = new Set([
+  "search_memory",
+  "lobu_search_memory",
+]);
+
+/**
+ * Build the SSE payload for a `tool_execution_end` pi-agent event.
+ *
+ * Always returns a record — even on parse failure — so the SSE event reliably
+ * fires for every tool call. `result_summary` is best-effort: tool-specific
+ * shape parsing is wrapped in try/catch and a parse failure just leaves the
+ * summary off, never throws.
+ *
+ * @param event - `tool_execution_end` data plus the start event's `args`.
+ * @returns Payload whose `result_summary` is set only when an error message
+ *   (error results) or event ids / snippets (retrieval tools) were found.
+ */
+export function buildToolUseEventPayload(event: {
+  toolCallId: string;
+  toolName: string;
+  args?: unknown;
+  result: unknown;
+  isError: boolean;
+}): ToolUseEventPayload {
+  const payload: ToolUseEventPayload = {
+    toolCallId: event.toolCallId,
+    name: event.toolName,
+    input: event.args ?? null,
+    isError: event.isError === true,
+  };
+
+  // `result` is tool-controlled and has no enforced shape. Guard the whole
+  // summary step so an unexpected value can never break the SSE emit.
+  try {
+    // Branch on the normalised flag so `isError` and `result_summary.error`
+    // can never disagree.
+    if (payload.isError) {
+      const message = extractErrorMessage(event.result);
+      if (message) payload.result_summary = { error: message };
+    } else if (SEARCH_MEMORY_TOOL_NAMES.has(event.toolName)) {
+      const summary = summarizeSearchMemoryResult(event.result);
+      if (summary) payload.result_summary = summary;
+    }
+  } catch {
+    // Best-effort: ship the payload without a summary.
+  }
+
+  return payload;
+}
+
+/**
+ * Reduce a `search_memory` result to the ids and snippet text of its `content`
+ * entries. Returns null when there is nothing worth attaching.
+ */
+function summarizeSearchMemoryResult(
+  raw: unknown
+): ToolUseResultSummary | null {
+  const result = extractSearchMemoryBody(raw);
+  if (!result || typeof result !== "object") return null;
+
+  const summary: ToolUseResultSummary = {};
+  const contentArr = (result as { content?: unknown }).content;
+  if (Array.isArray(contentArr) && contentArr.length > 0) {
+    const ids: number[] = [];
+    const snippets: Array<{ id: number; text: string }> = [];
+    for (const snippet of contentArr) {
+      if (!snippet || typeof snippet !== "object") continue;
+      const idVal = (snippet as { id?: unknown }).id;
+      const textVal = (snippet as { text_content?: unknown }).text_content;
+      // Entries without a numeric id cannot be joined back to an event.
+      if (typeof idVal !== "number") continue;
+      ids.push(idVal);
+      if (typeof textVal === "string" && textVal.length > 0) {
+        snippets.push({ id: idVal, text: textVal });
+      }
+    }
+    if (ids.length > 0) summary.event_ids = ids;
+    if (snippets.length > 0) summary.snippets = snippets;
+  }
+
+  return Object.keys(summary).length > 0 ? summary : null;
+}
+
+/**
+ * Text of every `{ type: "text", text }` part in an MCP-style `content` array.
+ * Returns an empty array when `raw` has no such parts.
+ */
+function mcpTextParts(raw: unknown): string[] {
+  if (!raw || typeof raw !== "object") return [];
+  const content = (raw as { content?: unknown }).content;
+  if (!Array.isArray(content)) return [];
+  const texts: string[] = [];
+  for (const part of content) {
+    if (!part || typeof part !== "object") continue;
+    const { type, text } = part as { type?: unknown; text?: unknown };
+    if (type === "text" && typeof text === "string") texts.push(text);
+  }
+  return texts;
+}
+
+/**
+ * MCP tool results from the gateway proxy land here as
+ *   { content: [{ type: 'text', text: '<JSON string>' }], isError: false }
+ * but in-process tools sometimes pass the raw object straight through. Handle
+ * both shapes.
+ */
+function extractSearchMemoryBody(raw: unknown): unknown {
+  if (!raw || typeof raw !== "object") return null;
+
+  // MCP CallToolResult shape — a text part holds the JSON-stringified payload.
+  const texts = mcpTextParts(raw);
+  if (texts.length > 0) {
+    for (const text of texts) {
+      try {
+        return JSON.parse(text);
+      } catch {
+        // Not JSON (e.g. a plain-text preamble) — try the next text part.
+      }
+    }
+    // Only plain-text parts — nothing to summarise.
+    return null;
+  }
+
+  // Already the search_memory body.
+  return raw;
+}
+
+/**
+ * Pull a human-readable message out of an error result: a bare string, a
+ * `message` / `error` string property, or the text parts of an MCP
+ * CallToolResult (`{ content: [{ type: 'text', text }], isError: true }`).
+ */
+function extractErrorMessage(result: unknown): string | null {
+  if (typeof result === "string") return result;
+  if (!result || typeof result !== "object") return null;
+  const msg = (result as { message?: unknown }).message;
+  if (typeof msg === "string") return msg;
+  const err = (result as { error?: unknown }).error;
+  if (typeof err === "string") return err;
+  // MCP error results carry their message as text content parts.
+  const text = mcpTextParts(result).join("\n").trim();
+  return text.length > 0 ? text : null;
+}
diff --git a/packages/agent-worker/src/openclaw/worker.ts b/packages/agent-worker/src/openclaw/worker.ts
index ba8d171f4..a728c0ed9 100644
--- a/packages/agent-worker/src/openclaw/worker.ts
+++ b/packages/agent-worker/src/openclaw/worker.ts
@@ -68,6 +68,7 @@ import {
 } from "./plugin-loader";
 import { OpenClawProgressProcessor } from "./processor";
 import { getOpenClawSessionContext } from "./session-context";
+import { buildToolUseEventPayload } from "./tool-use-events";
 import {
   buildToolPolicy,
   enforceBashCommandPolicy,
@@ -1442,6 +1443,15 @@ Use it when the user references past discussions or you need context.`);
       }
     };
 
+    // Track tool-call input args across tool_execution_start → _end. pi-agent
+    // only includes `args` on the start event; the end event carries
+    // `toolCallId`, `toolName`, `result`, `isError`.
The worker emits one
+    // SSE `tool_use` per finished call, so it needs to remember the input.
+    const pendingToolArgs = new Map<string, unknown>();
+    // Tool-use SSE emits are awaited at agent_end so the `complete` event
+    // can't race ahead of late tool_use events on slow networks.
+    const inFlightToolUse: Set<Promise<unknown>> = new Set();
+
     session.subscribe((event) => {
       if (suppressProgressOutput) {
         if (event.type === "agent_end") {
@@ -1459,9 +1469,54 @@ Use it when the user references past discussions or you need context.`);
         }
       }
 
+      // Capture the input args at tool start so we can attach them when the
+      // matching end event fires.
+      if (event.type === "tool_execution_start") {
+        pendingToolArgs.set(event.toolCallId, event.args);
+      }
+
+      // Surface tool-use traces to SSE clients (promptfoo provider, CLI eval,
+      // any client subscribed via `event: tool_use`). Worker emits one record
+      // per tool call at `tool_execution_end` so the result is included.
+      if (event.type === "tool_execution_end") {
+        const args = pendingToolArgs.get(event.toolCallId);
+        pendingToolArgs.delete(event.toolCallId);
+        const payload = buildToolUseEventPayload({
+          toolCallId: event.toolCallId,
+          toolName: event.toolName,
+          args,
+          result: event.result,
+          isError: event.isError,
+        });
+        const promise = onProgress({
+          type: "custom_event",
+          data: {
+            name: "tool_use",
+            payload: payload as unknown as Record<string, unknown>,
+          },
+          timestamp: Date.now(),
+        }).catch((err) => {
+          logger.warn(
+            `Failed to emit tool_use custom event for ${event.toolName}:`,
+            err
+          );
+        });
+        inFlightToolUse.add(promise);
+        void promise.finally(() => inFlightToolUse.delete(promise));
+      }
+
       if (event.type === "agent_end") {
         flushDelta()
-          .then(() => resolveTurnDone?.())
+          .then(async () => {
+            // Wait for any pending tool_use emits so clients don't see
+            // `complete` arrive before all tool_use records (the provider
+            // returns on `complete`, and a slow tool_use POST mid-flight
+            // would otherwise be lost).
+ if (inFlightToolUse.size > 0) { + await Promise.allSettled(Array.from(inFlightToolUse)); + } + resolveTurnDone?.(); + }) .catch((err) => { logger.error("Failed to flush final delta:", err); resolveTurnDone?.(); diff --git a/packages/agent-worker/src/shared/tool-implementations.ts b/packages/agent-worker/src/shared/tool-implementations.ts index de773be1e..2b55c0d2c 100644 --- a/packages/agent-worker/src/shared/tool-implementations.ts +++ b/packages/agent-worker/src/shared/tool-implementations.ts @@ -909,6 +909,16 @@ export async function getChannelHistory( // MCP Tools (route to MCP proxy /mcp/{mcpId}/tools/{toolName}) // ============================================================================ +/** + * Retrieval tools that we ask the upstream MCP server to return as JSON instead + * of formatted markdown so the worker can include structured `result_summary` + * (event IDs, snippet text) in the `tool_use` SSE event for RAG assertions. + */ +const TOOLS_REQUESTING_JSON_FORMAT = new Set([ + "search_memory", + "lobu_search_memory", +]); + export async function callMcpTool( gw: GatewayParams, mcpId: string, @@ -917,15 +927,23 @@ export async function callMcpTool( ): Promise { return withErrorHandling(`${mcpId}/${toolName}`, async () => { let response: Response; + const wantsJson = TOOLS_REQUESTING_JSON_FORMAT.has(toolName); try { + const headers: Record = { + Authorization: `Bearer ${gw.workerToken}`, + "Content-Type": "application/json", + }; + // Retrieval tools (`search_memory`) opt into JSON-encoded results so the + // worker → SSE `tool_use` event can carry structured `result_summary` + // (event ids + snippet text) to clients like @lobu/promptfoo-provider for + // RAG assertions. Other tools keep the formatted-markdown output the + // agent has been seeing. External MCP servers ignore the header. 
+ if (wantsJson) headers["x-mcp-format"] = "json"; response = await fetch( `${gw.gatewayUrl}/mcp/${mcpId}/tools/${toolName}`, { method: "POST", - headers: { - Authorization: `Bearer ${gw.workerToken}`, - "Content-Type": "application/json", - }, + headers, body: JSON.stringify(args), // Third-party MCP server on the other side — give it a generous // budget but never wait forever. diff --git a/packages/promptfoo-provider/HANDOFF.md b/packages/promptfoo-provider/HANDOFF.md index 22829dba2..3eee06c6f 100644 --- a/packages/promptfoo-provider/HANDOFF.md +++ b/packages/promptfoo-provider/HANDOFF.md @@ -20,14 +20,12 @@ This PR (`feat/promptfoo-evals`) ships the `@lobu/promptfoo-provider` workspace ## Should-fix (unblocks the demo use-cases this PR was originally for) -4. **Gateway SSE protocol needs a `tool_use` event type.** - - Current protocol (from `packages/server/src/gateway/api/response-renderer.ts` + `unified-thread-consumer.ts`): broadcasts only `output` / `complete` / `error` / `status` / `ephemeral` / `question` / `link-button` / `tool-approval` / `suggestion`. No tool-call trace surfaces to clients. - - Impact on this PR: `LobuProvider.metadata.toolCalls` and `metadata.retrievedContext` are always absent. promptfoo RAG assertions (`context-recall`, `context-faithfulness`, `answer-relevance`) and any custom assertion that inspects retrieved evidence are non-functional. - - Sketch: - - Worker emits a structured tool-use record per Claude tool-use block (or aggregated at `complete`) through the existing worker → gateway message bus. - - Gateway broadcasts `tool_use` SSE events with `{ name, input, result_summary?, messageId }`. - - For `search_memory` specifically, include the returned event IDs so the provider can fetch event payloads for `retrievedContext`. - - Once shipped, `LobuProvider` populates the fields with no provider-side config change. +4. 
**Gateway SSE protocol needs a `tool_use` event type.** — **DONE (PR: feat(gateway): tool_use SSE events for client-side trace inspection)** + - Worker (`packages/agent-worker/src/openclaw/worker.ts` + `tool-use-events.ts`) emits one `tool_use` custom event per `tool_execution_end` pi-agent event. + - Gateway broadcasts it as an SSE event named `tool_use` (via the existing customEvent path in `unified-thread-consumer.ts`) — additive, doesn't touch `output` / `complete` / `error`. + - Payload mirrors Anthropic tool-use blocks: `{ toolCallId, name, input, isError, result_summary?, messageId, timestamp }`. For `search_memory` / `lobu_search_memory`, `result_summary` includes `event_ids` and inline `snippets[]` (id + text) so clients can compute `retrievedContext` without a round-trip. + - `LobuProvider.collectResponse` consumes the new event, accumulates `metadata.toolCalls`, and joins retrieval-tool snippet text into `metadata.retrievedContext`. promptfoo RAG assertions (`context-recall`, `context-faithfulness`, custom `javascript` over `toolCalls`) now work end-to-end. + - Example: `examples/personal-finance/agents/personal-finance/evals/promptfooconfig.yaml` ships a retrieval test that checks the agent called `search_memory` (or `lobu_search_memory`) during the turn. 
## Background context for the follow-up agent diff --git a/packages/promptfoo-provider/README.md b/packages/promptfoo-provider/README.md index 4c68557bc..0c38c36cc 100644 --- a/packages/promptfoo-provider/README.md +++ b/packages/promptfoo-provider/README.md @@ -52,30 +52,38 @@ promptfoo view ```ts { - output: string // final assistant text from the agent + output: string // final assistant text from the agent tokenUsage: { prompt, completion, total } metadata: { agent: string - thread: string // fresh per call by default - traceId?: string // W3C trace id from `traceparent` header - toolCalls?: unknown[] // see "Known limitations" below - retrievedContext?: string // see "Known limitations" below + thread: string // fresh per call by default + traceId?: string // W3C trace id from `traceparent` header + toolCalls?: LobuToolCall[] // every tool call observed during the turn + retrievedContext?: string // joined snippet text from retrieval tools } } ``` -## Known limitations +`toolCalls` mirrors Anthropic's tool-use blocks (`{ name, input, isError?, result_summary? }`) and is populated from the gateway's `tool_use` SSE event. For retrieval tools (`search_memory` / `lobu_search_memory`) the `result_summary` includes the matched event IDs plus the snippet text content, and the provider joins those texts into `metadata.retrievedContext` so promptfoo's RAG assertions can use it directly: -**`metadata.toolCalls` / `metadata.retrievedContext` are not yet populated.** - -The gateway's SSE protocol currently exposes only `output` / `complete` / `error` events to clients. Tool calls (e.g., the agent invoking `search_memory` and receiving event IDs) happen inside the worker but aren't surfaced over SSE. Until that changes: - -- promptfoo's RAG-specific assertions that rely on `contextTransform: 'metadata.retrievedContext'` — `context-recall`, `context-faithfulness`, `answer-relevance` — won't have useful context to work with. 
-- Custom `javascript` assertions inspecting `metadata.toolCalls` will see `undefined`. - -Workable assertions today: `contains`, `regex`, `equals`, `is-json`, `similar`, `levenshtein`, `llm-rubric`, `factuality`, `cost`, `latency`. These cover answer-quality and behavioral checks. +```yaml +# RAG assertion — promptfoo's `contextTransform` reads from the provider +# response's `metadata` field. +- type: context-recall + contextTransform: 'metadata.retrievedContext' + threshold: 0.5 + value: "the expected fact the agent should have grounded its answer in" + +# Verify a specific tool was called. JS assertions receive the full provider +# response on `context.providerResponse`. +- type: javascript + value: | + const meta = context.providerResponse?.metadata ?? {}; + const calls = Array.isArray(meta.toolCalls) ? meta.toolCalls : []; + return calls.some((c) => c.name === 'search_memory'); +``` -When the gateway adds a `tool_use` SSE event type, this provider will start populating `metadata.toolCalls` and (for `search_memory` specifically) `metadata.retrievedContext`. No promptfoo config change required. +For non-retrieval tools the provider still records the call (name + input) so `javascript` assertions can verify that, e.g., the agent did or didn't call a destructive tool. 
## License diff --git a/packages/promptfoo-provider/src/__tests__/provider.test.ts b/packages/promptfoo-provider/src/__tests__/provider.test.ts new file mode 100644 index 000000000..62da3f6dd --- /dev/null +++ b/packages/promptfoo-provider/src/__tests__/provider.test.ts @@ -0,0 +1,161 @@ +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import { LobuProvider } from "../provider"; + +function sseEvent(name: string, data: unknown): string { + return `event: ${name}\ndata: ${JSON.stringify(data)}\n\n`; +} + +function streamFromChunks(chunks: string[]): ReadableStream { + const encoder = new TextEncoder(); + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) controller.enqueue(encoder.encode(chunk)); + controller.close(); + }, + }); +} + +function createFetchStub(events: string[]) { + return mock(async (input: string | URL | Request, init?: RequestInit) => { + const url = typeof input === "string" ? input : input.toString(); + if (url.endsWith("/lobu/api/v1/agents") && init?.method === "POST") { + return new Response( + JSON.stringify({ agentId: "agent-x", token: "session-token" }), + { status: 200, headers: { "content-type": "application/json" } } + ); + } + if (url.includes("/messages")) { + return new Response( + JSON.stringify({ + messageId: "msg-1", + traceparent: "00-traceabcdef-span0-01", + }), + { status: 200, headers: { "content-type": "application/json" } } + ); + } + if (url.endsWith("/events")) { + return new Response(streamFromChunks(events), { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + } + if (init?.method === "DELETE") { + return new Response(null, { status: 204 }); + } + return new Response("not found", { status: 404 }); + }); +} + +const originalFetch = globalThis.fetch; + +describe("LobuProvider tool_use SSE handling", () => { + beforeEach(() => { + process.env.LOBU_TOKEN = "dummy"; + process.env.LOBU_AGENT = "test-agent"; + }); + + afterEach(() => { + 
globalThis.fetch = originalFetch; + }); + + test("populates metadata.toolCalls and metadata.retrievedContext from search_memory tool_use events", async () => { + const events = [ + sseEvent("connected", { agentId: "agent-x" }), + sseEvent("output", { + content: "Looking at your records...", + messageId: "msg-1", + }), + sseEvent("tool_use", { + toolCallId: "tc-1", + name: "search_memory", + input: { query: "rent due" }, + isError: false, + result_summary: { + event_ids: [42, 43], + snippets: [ + { id: 42, text: "Tenant Acme pays £1,200 on the 1st" }, + { id: 43, text: "Lease ends Dec 2027" }, + ], + }, + messageId: "msg-1", + }), + sseEvent("output", { + content: " Acme pays £1,200 monthly.", + messageId: "msg-1", + }), + sseEvent("complete", { + messageId: "msg-1", + usage: { input_tokens: 10, output_tokens: 8 }, + }), + ]; + globalThis.fetch = createFetchStub(events) as any; + + const provider = new LobuProvider({ + config: { agent: "test-agent", gateway: "http://gateway", token: "tok" }, + }); + const result = await provider.callApi("when is rent due?"); + + expect(result.output).toBe( + "Looking at your records... Acme pays £1,200 monthly." 
+ ); + expect(result.metadata.toolCalls).toBeDefined(); + expect(result.metadata.toolCalls).toHaveLength(1); + expect(result.metadata.toolCalls![0]!.name).toBe("search_memory"); + expect(result.metadata.toolCalls![0]!.result_summary?.event_ids).toEqual([ + 42, 43, + ]); + expect(result.metadata.retrievedContext).toBe( + "Tenant Acme pays £1,200 on the 1st\n\nLease ends Dec 2027" + ); + expect(result.metadata.traceId).toBe("traceabcdef"); + expect(result.tokenUsage?.total).toBe(18); + }); + + test("ignores tool_use events for other messageIds", async () => { + const events = [ + sseEvent("tool_use", { + toolCallId: "tc-other", + name: "search_memory", + input: { query: "stale" }, + result_summary: { + event_ids: [1], + snippets: [{ id: 1, text: "stale" }], + }, + messageId: "different-message", + }), + sseEvent("output", { content: "hi", messageId: "msg-1" }), + sseEvent("complete", { messageId: "msg-1" }), + ]; + globalThis.fetch = createFetchStub(events) as any; + + const provider = new LobuProvider({ + config: { agent: "test-agent", gateway: "http://gateway", token: "tok" }, + }); + const result = await provider.callApi("hi"); + expect(result.metadata.toolCalls).toBeUndefined(); + expect(result.metadata.retrievedContext).toBeUndefined(); + }); + + test("captures non-retrieval tool calls in metadata.toolCalls without retrievedContext", async () => { + const events = [ + sseEvent("tool_use", { + toolCallId: "tc-bash", + name: "bash", + input: { command: "ls" }, + messageId: "msg-1", + }), + sseEvent("output", { content: "done", messageId: "msg-1" }), + sseEvent("complete", { messageId: "msg-1" }), + ]; + globalThis.fetch = createFetchStub(events) as any; + + const provider = new LobuProvider({ + config: { agent: "test-agent", gateway: "http://gateway", token: "tok" }, + }); + const result = await provider.callApi("ls"); + expect(result.metadata.toolCalls).toEqual([ + { toolCallId: "tc-bash", name: "bash", input: { command: "ls" } }, + ]); + 
expect(result.metadata.retrievedContext).toBeUndefined(); + }); +}); diff --git a/packages/promptfoo-provider/src/provider.ts b/packages/promptfoo-provider/src/provider.ts index e7c18bb93..2eef24af5 100644 --- a/packages/promptfoo-provider/src/provider.ts +++ b/packages/promptfoo-provider/src/provider.ts @@ -17,6 +17,24 @@ export interface LobuProviderConfig { thread?: string; } +/** + * Structured tool-call trace surfaced by the gateway's `tool_use` SSE event. + * Mirrors Anthropic's tool-use block plus an optional `result_summary` field + * that retrieval tools (`search_memory`) populate so client code can compute + * `retrievedContext` without a round-trip back to the server. + */ +export interface LobuToolCall { + toolCallId?: string; + name: string; + input: unknown; + isError?: boolean; + result_summary?: { + event_ids?: number[]; + snippets?: Array<{ id: number; text: string }>; + error?: string; + }; +} + export interface LobuProviderResponse { output: string; tokenUsage?: { @@ -30,11 +48,14 @@ export interface LobuProviderResponse { agent: string; traceId?: string; thread: string; + /** Tool calls observed during the turn, in order. */ + toolCalls?: LobuToolCall[]; /** - * Placeholder for retrieved-event traces. Populated once the gateway - * exposes tool_use SSE events — see TODO in README. + * Concatenated text content of events returned by retrieval tools + * (currently `search_memory`). Useful as `contextTransform: + * 'metadata.retrievedContext'` for promptfoo's `context-recall` / + * `context-faithfulness` assertions. 
*/ - toolCalls?: unknown[]; retrievedContext?: string; }; } @@ -54,8 +75,12 @@ interface CollectedResponse { totalTokens?: number; }; traceId?: string; + toolCalls?: LobuToolCall[]; + retrievedContext?: string; } +const RETRIEVAL_TOOL_NAMES = new Set(["search_memory", "lobu_search_memory"]); + /** * promptfoo custom provider that drives a Lobu agent end-to-end via the * gateway's public Agent API: @@ -66,9 +91,10 @@ interface CollectedResponse { * DELETE {gateway}/lobu/api/v1/agents/ → cleanup * * One fresh thread per `callApi` invocation by default so promptfoo's repeat / - * scenario semantics see a clean slate. Tool-call traces are not yet captured - * because the gateway SSE protocol doesn't expose them (output/complete/error - * only); see provider.metadata.toolCalls for the placeholder shape. + * scenario semantics see a clean slate. Tool-call traces are surfaced via the + * gateway's `tool_use` SSE event and populated on `metadata.toolCalls`. For + * retrieval tools (`search_memory`) the joined snippet text is also exposed as + * `metadata.retrievedContext` for promptfoo's RAG assertions. */ export class LobuProvider { private readonly agent: string; @@ -149,10 +175,12 @@ export class LobuProvider { agent: this.agent, thread, traceId: response.traceId, - // toolCalls + retrievedContext intentionally absent until the gateway - // SSE protocol surfaces tool_use events. Assertions that depend on - // these (context-recall, context-faithfulness, custom turn-overlap) - // should gate on their presence. + ...(response.toolCalls && response.toolCalls.length > 0 + ? { toolCalls: response.toolCalls } + : {}), + ...(response.retrievedContext + ? 
{ retrievedContext: response.retrievedContext }
+            : {}),
         },
       };
     } finally {
@@ -250,6 +278,9 @@ export class LobuProvider {
       let buffer = "";
       let currentEvent = "";
       let text = "";
+      const toolCalls: LobuToolCall[] = [];
+      const retrievedSnippets: Array<{ id: number; text: string }> = [];
+      const seenSnippetIds = new Set<number>();
 
       const matchesTarget = (eventMessageId: unknown): boolean => {
         if (!messageId) return true;
@@ -258,6 +289,23 @@ export class LobuProvider {
         );
       };
 
+      const finalize = (
+        extra: Partial<CollectedResponse> = {}
+      ): CollectedResponse => {
+        const result: CollectedResponse = {
+          text,
+          latencyMs: Date.now() - start,
+          ...extra,
+        };
+        if (toolCalls.length > 0) result.toolCalls = toolCalls;
+        if (retrievedSnippets.length > 0) {
+          result.retrievedContext = retrievedSnippets
+            .map((s) => s.text)
+            .join("\n\n");
+        }
+        return result;
+      };
+
       while (true) {
         const { done, value } = await reader.read();
         if (done) break;
@@ -282,12 +330,27 @@ export class LobuProvider {
                   text += data.content;
                 }
                 break;
+              case "tool_use": {
+                if (!matchesTarget(data.messageId)) break;
+                const call = normaliseToolUseEvent(data);
+                if (!call) break;
+                toolCalls.push(call);
+                if (
+                  RETRIEVAL_TOOL_NAMES.has(call.name) &&
+                  call.result_summary?.snippets
+                ) {
+                  for (const snippet of call.result_summary.snippets) {
+                    if (seenSnippetIds.has(snippet.id)) continue;
+                    seenSnippetIds.add(snippet.id);
+                    retrievedSnippets.push(snippet);
+                  }
+                }
+                break;
+              }
               case "complete": {
                 if (!matchesTarget(data.messageId)) break;
                 const usage = data.usage as Record<string, number> | undefined;
-                return {
-                  text,
-                  latencyMs: Date.now() - start,
+                return finalize({
                   tokens: usage
                     ? {
                         inputTokens: usage.input_tokens ?? usage.inputTokens,
@@ -297,15 +360,13 @@ export class LobuProvider {
                           (usage.output_tokens ?? usage.outputTokens ?? 0),
                       }
                     : undefined,
-                };
+                });
               }
               case "error":
                 if (!matchesTarget(data.messageId)) break;
-                return {
-                  text,
-                  latencyMs: Date.now() - start,
+                return finalize({
                   error: String(data.error ?? "Unknown error"),
-                };
+                });
             }
             currentEvent = "";
           } else if (line === "") {
@@ -314,7 +375,7 @@ export class LobuProvider {
         }
       }
 
-      return { text, latencyMs: Date.now() - start };
+      return finalize();
     } catch (err: unknown) {
       if (err instanceof Error && err.name === "AbortError") {
         return { text: "", latencyMs: Date.now() - start, error: "Timeout" };
@@ -364,3 +425,47 @@ function parseJSON(str: string): Record<string, unknown> | null {
     return null;
   }
 }
+
+/**
+ * Validate a raw `tool_use` SSE payload and narrow it to a `LobuToolCall`.
+ * Returns null when `name` is missing or not a string; malformed
+ * `result_summary` fields are dropped rather than rejecting the whole call.
+ */
+function normaliseToolUseEvent(
+  data: Record<string, unknown>
+): LobuToolCall | null {
+  const name = data.name;
+  if (typeof name !== "string" || !name) return null;
+  const call: LobuToolCall = {
+    name,
+    input: data.input ?? null,
+  };
+  if (typeof data.toolCallId === "string") call.toolCallId = data.toolCallId;
+  if (data.isError === true) call.isError = true;
+  const summary = data.result_summary;
+  if (summary && typeof summary === "object") {
+    const parsed: NonNullable<LobuToolCall["result_summary"]> = {};
+    const ids = (summary as { event_ids?: unknown }).event_ids;
+    if (Array.isArray(ids)) {
+      const numeric = ids.filter((id): id is number => typeof id === "number");
+      if (numeric.length > 0) parsed.event_ids = numeric;
+    }
+    const snippetsRaw = (summary as { snippets?: unknown }).snippets;
+    if (Array.isArray(snippetsRaw)) {
+      const snippets: Array<{ id: number; text: string }> = [];
+      for (const item of snippetsRaw) {
+        if (!item || typeof item !== "object") continue;
+        const id = (item as { id?: unknown }).id;
+        const text = (item as { text?: unknown }).text;
+        if (typeof id === "number" && typeof text === "string") {
+          snippets.push({ id, text });
+        }
+      }
+      if (snippets.length > 0) parsed.snippets = snippets;
+    }
+    const errorMsg = (summary as { error?: unknown }).error;
+    if (typeof errorMsg === "string") parsed.error = errorMsg;
+    if (Object.keys(parsed).length > 0) call.result_summary = parsed;
+  }
+  return call;
+}
diff --git a/packages/server/src/gateway/__tests__/unified-thread-consumer.test.ts 
b/packages/server/src/gateway/__tests__/unified-thread-consumer.test.ts index d203614f1..74077c199 100644 --- a/packages/server/src/gateway/__tests__/unified-thread-consumer.test.ts +++ b/packages/server/src/gateway/__tests__/unified-thread-consumer.test.ts @@ -46,6 +46,80 @@ function createConsumer(overrides?: { return { consumer, platformRegistry, renderer }; } +describe("UnifiedThreadResponseConsumer customEvent broadcast", () => { + test("broadcasts tool_use customEvent to conversation + cli session", async () => { + const renderer = { + handleCompletion: mock(async () => undefined), + handleError: mock(async () => undefined), + }; + const queue = { + start: mock(async () => undefined), + stop: mock(async () => undefined), + createQueue: mock(async () => undefined), + work: mock(async () => undefined), + }; + const broadcast = mock(() => undefined); + const sseManager = { broadcast }; + const platformRegistry = { + get: mock(() => ({ getResponseRenderer: () => renderer })), + }; + const consumer = new UnifiedThreadResponseConsumer( + queue as any, + platformRegistry as any, + sseManager as any + ) as any; + + const payload = { + messageId: "m1", + channelId: "api:1", + conversationId: "api:1", + userId: "u1", + teamId: "api", + platform: "api", + timestamp: 1000, + platformMetadata: { sessionId: "cli-session-1" }, + customEvent: { + name: "tool_use", + data: { + toolCallId: "tc-1", + name: "search_memory", + input: { query: "rent" }, + isError: false, + result_summary: { + event_ids: [42], + snippets: [{ id: 42, text: "Rent is due 1st" }], + }, + }, + }, + }; + + await consumer.handleThreadResponse({ id: "job-1", data: payload }); + + const broadcasts = broadcast.mock.calls; + const toolUseBroadcasts = broadcasts.filter( + (call: any[]) => call[1] === "tool_use" + ); + expect(toolUseBroadcasts.length).toBe(2); + const conversationBroadcast = toolUseBroadcasts.find( + (call: any[]) => call[0] === "api:1" + ); + const cliBroadcast = toolUseBroadcasts.find( + 
(call: any[]) => call[0] === "cli-session-1" + ); + expect(conversationBroadcast).toBeDefined(); + expect(cliBroadcast).toBeDefined(); + expect(conversationBroadcast?.[2]).toMatchObject({ + toolCallId: "tc-1", + name: "search_memory", + result_summary: { + event_ids: [42], + }, + messageId: "m1", + timestamp: 1000, + }); + }); +}); + describe("UnifiedThreadResponseConsumer Chat SDK ownership", () => { test("throws for Chat SDK connection responses not owned by this gateway", async () => { const chatResponseBridge = { diff --git a/packages/server/src/gateway/auth/mcp/proxy.ts b/packages/server/src/gateway/auth/mcp/proxy.ts index c5b8d483a..16fe085ab 100644 --- a/packages/server/src/gateway/auth/mcp/proxy.ts +++ b/packages/server/src/gateway/auth/mcp/proxy.ts @@ -740,6 +740,16 @@ export class McpProxy { id: 1, }); + // Forward the caller's `x-mcp-format` opt-in so internal MCPs (the + // embedded lobu-memory server) can return raw JSON instead of formatted + // markdown. The worker uses this for retrieval tools to surface + // structured `result_summary` (event ids + snippet text) through the + // `tool_use` SSE event. + const callerFormat = c.req.header("x-mcp-format"); + const extraHeaders = callerFormat + ? { "x-mcp-format": callerFormat } + : undefined; + let response = await this.sendUpstreamRequest( httpServer, agentId, @@ -747,7 +757,8 @@ export class McpProxy { "POST", jsonRpcBody, scopeKey, - auth.token + auth.token, + extraHeaders ); // Detect HTTP 401 + WWW-Authenticate → start MCP OAuth 2.1 auth-code flow. 
@@ -1255,7 +1266,8 @@ export class McpProxy { method: string, body?: string, scopeKey?: string, - directAuthToken?: string + directAuthToken?: string, + extraHeaders?: Record ): Promise { const sessionKey = this.buildSessionKey(agentId, mcpId, scopeKey); const sessionId = this.getSession(sessionKey); @@ -1280,6 +1292,11 @@ export class McpProxy { credentialToken, httpServer.internal === true ); + if (extraHeaders) { + for (const [key, value] of Object.entries(extraHeaders)) { + headers[key] = value; + } + } const response = await fetch(httpServer.upstreamUrl, { method,