diff --git a/proxy/metrics_monitor.go b/proxy/metrics_monitor.go index 81030826..df666dd9 100644 --- a/proxy/metrics_monitor.go +++ b/proxy/metrics_monitor.go @@ -220,7 +220,7 @@ func (mp *metricsMonitor) wrapHandler( tm := TokenMetrics{ Timestamp: time.Now(), Model: modelID, - DurationMs: int(time.Since(recorder.StartTime()).Milliseconds()), + DurationMs: int(recorder.Timings().totalDuration().Milliseconds()), } body := recorder.body.Bytes() @@ -241,7 +241,7 @@ func (mp *metricsMonitor) wrapHandler( } } if strings.Contains(recorder.Header().Get("Content-Type"), "text/event-stream") { - if parsed, err := processStreamingResponse(modelID, recorder.StartTime(), body); err != nil { + if parsed, err := processStreamingResponse(modelID, request.URL.Path, recorder.Timings(), body); err != nil { mp.logger.Warnf("error processing streaming response: %v, path=%s, recording minimal metrics", err, request.URL.Path) } else { tm = parsed @@ -249,19 +249,10 @@ func (mp *metricsMonitor) wrapHandler( } else { if gjson.ValidBytes(body) { parsed := gjson.ParseBytes(body) - usage := parsed.Get("usage") - timings := parsed.Get("timings") - - // extract timings for infill - response is an array, timings are in the last element - // see #463 - if strings.HasPrefix(request.URL.Path, "/infill") { - if arr := parsed.Array(); len(arr) > 0 { - timings = arr[len(arr)-1].Get("timings") - } - } + usage, timings := findMetricsPayload(parsed, request.URL.Path) if usage.Exists() || timings.Exists() { - if parsedMetrics, err := parseMetrics(modelID, recorder.StartTime(), usage, timings); err != nil { + if parsedMetrics, err := parseMetrics(modelID, recorder.Timings(), usage, timings, false); err != nil { mp.logger.Warnf("error parsing metrics: %v, path=%s, recording minimal metrics", err, request.URL.Path) } else { tm = parsedMetrics @@ -307,7 +298,7 @@ func (mp *metricsMonitor) wrapHandler( return nil } -func processStreamingResponse(modelID string, start time.Time, body []byte) (TokenMetrics, error) { +func processStreamingResponse(modelID, reqPath string, timingInfo responseTimingInfo, body []byte) (TokenMetrics, error) { // Iterate **backwards** through the body looking for the data payload with // usage data. This avoids allocating a slice of all lines via bytes.Split. @@ -347,11 +338,10 @@ func processStreamingResponse(modelID string, start time.Time, body []byte) (Tok if gjson.ValidBytes(data) { parsed := gjson.ParseBytes(data) - usage := parsed.Get("usage") - timings := parsed.Get("timings") + usage, timings := findMetricsPayload(parsed, reqPath) if usage.Exists() || timings.Exists() { - return parseMetrics(modelID, start, usage, timings) + return parseMetrics(modelID, timingInfo, usage, timings, true) } } } @@ -359,7 +349,40 @@ func processStreamingResponse(modelID string, start time.Time, body []byte) (Tok return TokenMetrics{}, fmt.Errorf("no valid JSON data found in stream") } -func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) (TokenMetrics, error) { +func findMetricsPayload(parsed gjson.Result, reqPath string) (gjson.Result, gjson.Result) { + candidates := []gjson.Result{parsed} + + if data := parsed.Get("data"); data.Exists() { + candidates = append(candidates, data) + } + if response := parsed.Get("response"); response.Exists() { + candidates = append(candidates, response) + } + if response := parsed.Get("data.response"); response.Exists() { + candidates = append(candidates, response) + } + + for _, candidate := range candidates { + usage := candidate.Get("usage") + timings := candidate.Get("timings") + + // extract timings for infill - response is an array, timings are in the last element + // see #463 + if strings.HasPrefix(reqPath, "/infill") { + if arr := candidate.Array(); len(arr) > 0 { + timings = arr[len(arr)-1].Get("timings") + } + } + + if usage.Exists() || timings.Exists() { + return usage, timings + } + } + + return gjson.Result{}, gjson.Result{} +} + +func parseMetrics(modelID string, timingInfo responseTimingInfo, usage, timings gjson.Result, allowFallback bool) (TokenMetrics, error) { // default values cachedTokens := -1 // unknown or missing data outputTokens := 0 @@ -368,7 +391,7 @@ func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) // timings data tokensPerSecond := -1.0 promptPerSecond := -1.0 - durationMs := int(time.Since(start).Milliseconds()) + durationMs := int(timingInfo.totalDuration().Milliseconds()) if usage.Exists() { if pt := usage.Get("prompt_tokens"); pt.Exists() { @@ -402,6 +425,10 @@ func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) if cachedValue := timings.Get("cache_n"); cachedValue.Exists() { cachedTokens = int(cachedValue.Int()) } + } else if allowFallback { + if generationDuration := timingInfo.generationDuration(); generationDuration > 0 && outputTokens > 1 { + tokensPerSecond = float64(outputTokens-1) / generationDuration.Seconds() + } } return TokenMetrics{ @@ -439,9 +466,11 @@ func decompressBody(body []byte, encoding string) ([]byte, error) { // while also capturing it in a buffer for later processing type responseBodyCopier struct { gin.ResponseWriter - body *bytes.Buffer - tee io.Writer - start time.Time + body *bytes.Buffer + tee io.Writer + requestStart time.Time + firstWrite time.Time + lastWrite time.Time } func newBodyCopier(w gin.ResponseWriter) *responseBodyCopier { @@ -450,13 +479,16 @@ func newBodyCopier(w gin.ResponseWriter) *responseBodyCopier { ResponseWriter: w, body: bodyBuffer, tee: io.MultiWriter(w, bodyBuffer), + requestStart: time.Now(), } } func (w *responseBodyCopier) Write(b []byte) (int, error) { - if w.start.IsZero() { - w.start = time.Now() + now := time.Now() + if w.firstWrite.IsZero() { + w.firstWrite = now } + w.lastWrite = now // Single write operation that writes to both the response and buffer return w.tee.Write(b) @@ -471,7 +503,38 @@ func (w *responseBodyCopier) Header() http.Header { } func (w *responseBodyCopier) StartTime() time.Time { - return w.start + return w.firstWrite +} + +type responseTimingInfo struct { + requestStart time.Time + firstWrite time.Time + lastWrite time.Time +} + +func (w *responseBodyCopier) Timings() responseTimingInfo { + return responseTimingInfo{ + requestStart: w.requestStart, + firstWrite: w.firstWrite, + lastWrite: w.lastWrite, + } +} + +func (t responseTimingInfo) totalDuration() time.Duration { + if !t.requestStart.IsZero() { + if !t.lastWrite.IsZero() { + return t.lastWrite.Sub(t.requestStart) + } + return time.Since(t.requestStart) + } + return 0 +} + +func (t responseTimingInfo) generationDuration() time.Duration { + if t.firstWrite.IsZero() || t.lastWrite.IsZero() { + return 0 + } + return t.lastWrite.Sub(t.firstWrite) } // sensitiveHeaders lists headers that should be redacted in captures diff --git a/proxy/metrics_monitor_test.go b/proxy/metrics_monitor_test.go index 32b6846a..7902fa3f 100644 --- a/proxy/metrics_monitor_test.go +++ b/proxy/metrics_monitor_test.go @@ -198,6 +198,74 @@ func TestMetricsMonitor_WrapHandler(t *testing.T) { assert.Equal(t, 50, metrics[0].OutputTokens) }) + t.Run("successful responses request with input and output usage data", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + responseBody := `{ + "object": "response", + "output": [{ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Hello"}] + }], + "usage": { + "input_tokens": 120, + "output_tokens": 45 + } + }` + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(responseBody)) + time.Sleep(10 * time.Millisecond) + return nil + } + + req := httptest.NewRequest("POST", "/v1/responses", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, "test-model", metrics[0].Model) + assert.Equal(t, 120, metrics[0].InputTokens) + assert.Equal(t, 45, metrics[0].OutputTokens) + assert.Equal(t, -1.0, metrics[0].PromptPerSecond) + assert.Equal(t, -1.0, metrics[0].TokensPerSecond) + }) + + t.Run("chunked non-streaming responses request does not estimate generation speed", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"object":"response","usage":{"input_tokens":120,`)) + time.Sleep(15 * time.Millisecond) + _, _ = w.Write([]byte(`"output_tokens":45,"total_tokens":165},`)) + time.Sleep(15 * time.Millisecond) + _, _ = w.Write([]byte(`"output":[{"type":"message","role":"assistant","content":[{"type":"output_text","text":"Hello"}]}]}`)) + return nil + } + + req := httptest.NewRequest("POST", "/v1/responses", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, 120, metrics[0].InputTokens) + assert.Equal(t, 45, metrics[0].OutputTokens) + assert.Equal(t, -1.0, metrics[0].TokensPerSecond) + }) + t.Run("successful request with timings data", func(t *testing.T) { mm := newMetricsMonitor(testLogger, 10, 0) @@ -679,6 +747,95 @@ data: [DONE] assert.Equal(t, 50, metrics[0].OutputTokens) }) + t.Run("finds metrics in OpenAI Responses completion event", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + responseBody := `data: {"event":"response.created","data":{"type":"response.created","response":{"id":"resp_123","object":"response","status":"in_progress"}}} + +data: {"event":"response.output_text.delta","data":{"type":"response.output_text.delta","item_id":"msg_123","delta":"Hello"}} + +data: {"event":"response.completed","data":{"type":"response.completed","response":{"id":"resp_123","object":"response","status":"completed","usage":{"input_tokens":80,"output_tokens":25,"total_tokens":105}}}} + +data: [DONE] + +` + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + w.Write([]byte(responseBody)) + return nil + } + + req := httptest.NewRequest("POST", "/v1/responses", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, 80, metrics[0].InputTokens) + assert.Equal(t, 25, metrics[0].OutputTokens) + }) + + t.Run("estimates prompt and generation speed for streamed Responses events", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + time.Sleep(15 * time.Millisecond) + _, _ = w.Write([]byte("data: {\"event\":\"response.output_text.delta\",\"data\":{\"type\":\"response.output_text.delta\",\"item_id\":\"msg_123\",\"delta\":\"Hello\"}}\n\n")) + time.Sleep(20 * time.Millisecond) + _, _ = w.Write([]byte("data: {\"event\":\"response.completed\",\"data\":{\"type\":\"response.completed\",\"response\":{\"id\":\"resp_123\",\"object\":\"response\",\"status\":\"completed\",\"usage\":{\"input_tokens\":80,\"output_tokens\":25,\"total_tokens\":105}}}}\n\n")) + _, _ = w.Write([]byte("data: [DONE]\n\n")) + return nil + } + + req := httptest.NewRequest("POST", "/v1/responses", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, 80, metrics[0].InputTokens) + assert.Equal(t, 25, metrics[0].OutputTokens) + assert.Equal(t, -1.0, metrics[0].PromptPerSecond) + assert.Greater(t, metrics[0].TokensPerSecond, 0.0) + }) + + t.Run("single write fallback leaves generation speed unknown", func(t *testing.T) { + mm := newMetricsMonitor(testLogger, 10, 0) + + nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + time.Sleep(15 * time.Millisecond) + _, _ = w.Write([]byte("data: {\"event\":\"response.completed\",\"data\":{\"type\":\"response.completed\",\"response\":{\"id\":\"resp_123\",\"object\":\"response\",\"status\":\"completed\",\"usage\":{\"input_tokens\":8,\"output_tokens\":1,\"total_tokens\":9}}}}\n\n")) + _, _ = w.Write([]byte("data: [DONE]\n\n")) + return nil + } + + req := httptest.NewRequest("POST", "/v1/responses", nil) + rec := httptest.NewRecorder() + ginCtx, _ := gin.CreateTestContext(rec) + + err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler) + assert.NoError(t, err) + + metrics := mm.getMetrics() + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, 8, metrics[0].InputTokens) + assert.Equal(t, 1, metrics[0].OutputTokens) + assert.Equal(t, -1.0, metrics[0].PromptPerSecond) + assert.Equal(t, -1.0, metrics[0].TokensPerSecond) + }) + t.Run("handles streaming with no valid JSON records minimal metrics", func(t *testing.T) { mm := newMetricsMonitor(testLogger, 10, 0) diff --git a/ui-svelte/src/components/CaptureDialog.svelte b/ui-svelte/src/components/CaptureDialog.svelte index 088bbd8e..36659c0a 100644 --- a/ui-svelte/src/components/CaptureDialog.svelte +++ b/ui-svelte/src/components/CaptureDialog.svelte @@ -31,7 +31,7 @@ const reqCt = getContentType(capture.req_headers); const respCt = getContentType(capture.resp_headers); reqBodyTab = reqCt.includes("json") ? "pretty" : "raw"; - respBodyTab = respCt.includes("text/event-stream") + respBodyTab = isSSE && (renderedResponse.reasoning || renderedResponse.content) ? "chat" : respCt.includes("json") ? "pretty" @@ -89,13 +89,80 @@ return `data:${mimeType};base64,${body}`; } - interface SSEChat { + interface RenderedResponse { reasoning: string; content: string; } - function parseSSEChat(text: string): SSEChat { - const result: SSEChat = { reasoning: "", content: "" }; + function appendResponseOutput( + result: RenderedResponse, + response: unknown, + ): RenderedResponse { + const next = { ...result }; + const record = response as + | { + output?: Array<{ + type?: string; + content?: Array<{ type?: string; text?: string }>; + }>; + choices?: Array<{ + message?: { + content?: string | Array<{ type?: string; text?: string }>; + reasoning_content?: string; + }; + }>; + output_text?: string; + } + | undefined; + + if (!record) return next; + + if (typeof record.output_text === "string") { + next.content += record.output_text; + } + + for (const choice of record.choices || []) { + const message = choice.message; + if (!message) continue; + if (typeof message.reasoning_content === "string") { + next.reasoning += message.reasoning_content; + } + if (typeof message.content === "string") { + next.content += message.content; + } else { + for (const part of message.content || []) { + if (part?.type === "text" && typeof part.text === "string") { + next.content += part.text; + } + } + } + } + + for (const item of record.output || []) { + if (item?.type === "reasoning") { + for (const part of item.content || []) { + if (typeof part?.text === "string") { + next.reasoning += part.text; + } + } + } + if (item?.type === "message") { + for (const part of item.content || []) { + if (part?.type === "output_text" && typeof part.text === "string") { + next.content += part.text; + } + } + } + } + + return next; + } + + function parseSSEChat(text: string): RenderedResponse { + const result: RenderedResponse = { reasoning: "", content: "" }; + const seenReasoningDeltaItems = new Set(); + const seenContentDeltaItems = new Set(); + for (const line of text.split("\n")) { const trimmed = line.trim(); if (!trimmed || !trimmed.startsWith("data: ")) continue; @@ -106,6 +173,42 @@ const delta = parsed.choices?.[0]?.delta; if (delta?.content) result.content += delta.content; if (delta?.reasoning_content) result.reasoning += delta.reasoning_content; + + const eventType = parsed.event || parsed.type; + if ( + eventType === "response.reasoning_text.delta" && + typeof parsed.data?.delta === "string" + ) { + result.reasoning += parsed.data.delta; + if (typeof parsed.data?.item_id === "string") { + seenReasoningDeltaItems.add(parsed.data.item_id); + } + } + if ( + eventType === "response.output_text.delta" && + typeof parsed.data?.delta === "string" + ) { + result.content += parsed.data.delta; + if (typeof parsed.data?.item_id === "string") { + seenContentDeltaItems.add(parsed.data.item_id); + } + } + if ( + eventType === "response.output_text.done" && + typeof parsed.data?.text === "string" && + typeof parsed.data?.item_id === "string" && + !seenContentDeltaItems.has(parsed.data.item_id) + ) { + result.content += parsed.data.text; + } + if ( + eventType === "response.completed" && + parsed.data?.response && + !result.reasoning && + !result.content + ) { + return appendResponseOutput(result, parsed.data.response); + } } catch { // skip unparseable lines } @@ -131,8 +234,8 @@ function getCopyText(): string { if (respBodyTab === "chat") { let text = ""; - if (sseChat.reasoning) text += sseChat.reasoning + "\n\n"; - text += sseChat.content; + if (renderedResponse.reasoning) text += renderedResponse.reasoning + "\n\n"; + text += renderedResponse.content; return text; } return displayedResponseBody; @@ -177,14 +280,16 @@ return formatJson(responseBodyRaw); }); - let sseChat = $derived.by(() => { - if (!isSSE || !responseBodyRaw) - return { reasoning: "", content: "" } as SSEChat; - return parseSSEChat(responseBodyRaw); + let renderedResponse = $derived.by(() => { + if (!responseBodyRaw) return { reasoning: "", content: "" } as RenderedResponse; + if (isSSE) return parseSSEChat(responseBodyRaw); + return { reasoning: "", content: "" } as RenderedResponse; }); let displayedResponseBody = $derived.by(() => { - if (respBodyTab === "pretty") return responseBodyPretty; + if (respBodyTab === "pretty") { + return responseBodyPretty; + } return responseBodyRaw; }); @@ -333,7 +438,7 @@ {:else if isSSE || isResponseText}
- {#if isSSE} + {#if isSSE && (renderedResponse.reasoning || renderedResponse.content)}