Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion proxy/metrics_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ func (mp *metricsMonitor) wrapHandler(
return nil
}
}

if strings.Contains(recorder.Header().Get("Content-Type"), "text/event-stream") {
if parsed, err := processStreamingResponse(modelID, recorder.StartTime(), body); err != nil {
mp.logger.Warnf("error processing streaming response: %v, path=%s, recording minimal metrics", err, request.URL.Path)
Expand All @@ -253,6 +252,14 @@ func (mp *metricsMonitor) wrapHandler(
usage := parsed.Get("usage")
timings := parsed.Get("timings")

// extract timings for infill - response is an array, timings are in the last element
// see #463
if strings.HasPrefix(request.URL.Path, "/infill") {
if arr := parsed.Array(); len(arr) > 0 {
timings = arr[len(arr)-1].Get("timings")
}
}

if usage.Exists() || timings.Exists() {
if parsedMetrics, err := parseMetrics(modelID, recorder.StartTime(), usage, timings); err != nil {
mp.logger.Warnf("error parsing metrics: %v, path=%s, recording minimal metrics", err, request.URL.Path)
Expand Down
69 changes: 69 additions & 0 deletions proxy/metrics_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,75 @@ data: [DONE]
assert.Equal(t, 0, metrics[0].InputTokens)
assert.Equal(t, 0, metrics[0].OutputTokens)
})

t.Run("infill request extracts timings from last array element", func(t *testing.T) {
	mm := newMetricsMonitor(testLogger, 10, 0)

	// The infill response body is a JSON array of chunks; only the last
	// element carries the "timings" object the assertions below rely on.
	responseBody := `[
		{"content": "first chunk"},
		{"content": "second chunk"},
		{"content": "final", "timings": {
			"prompt_n": 150,
			"predicted_n": 75,
			"prompt_per_second": 200.5,
			"predicted_per_second": 35.5,
			"prompt_ms": 600.0,
			"predicted_ms": 1800.0,
			"cache_n": 30
		}}
	]`

	// Stand-in upstream handler: replies 200 with the JSON array above as a
	// plain (non event-stream) JSON response.
	nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		// Propagate a failed write so the NoError assertion below reports it
		// instead of the test continuing with a truncated body.
		_, err := w.Write([]byte(responseBody))
		return err
	}

	req := httptest.NewRequest("POST", "/infill", nil)
	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)

	err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler)
	assert.NoError(t, err)

	metrics := mm.getMetrics()
	// assert.* does not stop the test; bail out on a length mismatch so the
	// metrics[0] accesses below fail cleanly rather than panic on an empty slice.
	if !assert.Len(t, metrics, 1) {
		return
	}

	got := metrics[0]
	assert.Equal(t, "test-model", got.Model)
	assert.Equal(t, 150, got.InputTokens)
	assert.Equal(t, 75, got.OutputTokens)
	assert.Equal(t, 30, got.CachedTokens)
	assert.Equal(t, 200.5, got.PromptPerSecond)
	assert.Equal(t, 35.5, got.TokensPerSecond)
	assert.Equal(t, 2400, got.DurationMs) // 600 + 1800
})

t.Run("infill request with empty array records minimal metrics", func(t *testing.T) {
	mm := newMetricsMonitor(testLogger, 10, 0)

	// An empty array has no last element to read timings from; the test
	// expects one metrics entry to be recorded anyway, with zero token counts.
	responseBody := `[]`

	// Stand-in upstream handler: replies 200 with the empty JSON array.
	nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		// Propagate a failed write so the NoError assertion below reports it.
		_, err := w.Write([]byte(responseBody))
		return err
	}

	req := httptest.NewRequest("POST", "/infill", nil)
	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)

	err := mm.wrapHandler("test-model", ginCtx.Writer, req, nextHandler)
	assert.NoError(t, err)

	metrics := mm.getMetrics()
	// assert.* does not stop the test; bail out on a length mismatch so the
	// metrics[0] accesses below fail cleanly rather than panic on an empty slice.
	if !assert.Len(t, metrics, 1) {
		return
	}

	got := metrics[0]
	assert.Equal(t, "test-model", got.Model)
	assert.Equal(t, 0, got.InputTokens)
	assert.Equal(t, 0, got.OutputTokens)
})
}

func TestMetricsMonitor_ResponseBodyCopier(t *testing.T) {
Expand Down
Loading