diff --git a/proxy/metrics_monitor.go b/proxy/metrics_monitor.go index 283c16623..64ae028ee 100644 --- a/proxy/metrics_monitor.go +++ b/proxy/metrics_monitor.go @@ -13,6 +13,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/mostlygeek/llama-swap/proxy/config" "github.com/klauspost/compress/zstd" "github.com/mostlygeek/llama-swap/event" "github.com/tidwall/gjson" @@ -95,6 +96,7 @@ func (e TokenMetricsEvent) Type() uint32 { // metricsMonitor parses llama-server output for token statistics type metricsMonitor struct { + config config.Config mu sync.RWMutex metrics []TokenMetrics maxMetrics int @@ -111,8 +113,9 @@ type metricsMonitor struct { // newMetricsMonitor creates a new metricsMonitor. captureBufferMB is the // capture buffer size in megabytes; 0 disables captures. -func newMetricsMonitor(logger *LogMonitor, maxMetrics int, captureBufferMB int) *metricsMonitor { +func newMetricsMonitor(cfg config.Config, logger *LogMonitor, maxMetrics int, captureBufferMB int) *metricsMonitor { return &metricsMonitor{ + config: cfg, logger: logger, maxMetrics: maxMetrics, enableCaptures: captureBufferMB > 0, @@ -130,6 +133,10 @@ func (mp *metricsMonitor) addMetrics(metric TokenMetrics) int { defer mp.mu.Unlock() metric.ID = mp.nextID + // Resolve modelID to display name (first alias or modelID itself) + if modelConfig, exists := mp.config.Models[metric.Model]; exists && len(modelConfig.Aliases) > 0 { + metric.Model = modelConfig.Aliases[0] + } mp.nextID++ mp.metrics = append(mp.metrics, metric) if len(mp.metrics) > mp.maxMetrics { @@ -271,6 +278,9 @@ func (mp *metricsMonitor) wrapHandler( request.Header.Set("Accept-Encoding", filterAcceptEncoding(ae)) } + // Capture wall clock time before proxying the request + requestStart := time.Now() + if err := next(modelID, recorder, request); err != nil { return err } @@ -287,7 +297,7 @@ func (mp *metricsMonitor) wrapHandler( tm := TokenMetrics{ Timestamp: time.Now(), Model: modelID, - DurationMs: int(time.Since(recorder.StartTime()).Milliseconds()), + DurationMs: int(time.Since(requestStart).Milliseconds()), } body := recorder.body.Bytes() @@ -308,7 +318,7 @@ func (mp *metricsMonitor) wrapHandler( } } if strings.Contains(recorder.Header().Get("Content-Type"), "text/event-stream") { - if parsed, err := processStreamingResponse(modelID, recorder.StartTime(), body); err != nil { + if parsed, err := processStreamingResponse(modelID, requestStart, body); err != nil { mp.logger.Warnf("error processing streaming response: %v, path=%s, recording minimal metrics", err, request.URL.Path) } else { tm = parsed @@ -328,7 +338,7 @@ func (mp *metricsMonitor) wrapHandler( } if usage.Exists() || timings.Exists() { - if parsedMetrics, err := parseMetrics(modelID, recorder.StartTime(), usage, timings); err != nil { + if parsedMetrics, err := parseMetrics(modelID, requestStart, usage, timings); err != nil { mp.logger.Warnf("error parsing metrics: %v, path=%s, recording minimal metrics", err, request.URL.Path) } else { tm = parsedMetrics @@ -481,6 +491,17 @@ func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) } } + // Fallback: estimate speeds from wall clock when timings unavailable (e.g., vLLM) + if !timings.Exists() && wallDurationMs > 0 { + durationSec := float64(wallDurationMs) / 1000.0 + if inputTokens > 0 { + promptPerSecond = float64(inputTokens) / durationSec + } + if outputTokens > 0 { + tokensPerSecond = float64(outputTokens) / durationSec + } + } + return TokenMetrics{ Timestamp: time.Now(), Model: modelID, diff --git a/proxy/metrics_monitor_test.go b/proxy/metrics_monitor_test.go index 48372d9e2..d908f3138 100644 --- a/proxy/metrics_monitor_test.go +++ b/proxy/metrics_monitor_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/mostlygeek/llama-swap/proxy/config" "github.com/mostlygeek/llama-swap/event" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" @@ -20,7 +21,7 @@ import ( func TestMetricsMonitor_AddMetrics(t *testing.T) { t.Run("adds metrics and assigns ID", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) metric := TokenMetrics{ Model: "test-model", @@ -39,7 +40,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("increments ID for each metric", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) for i := 0; i < 5; i++ { mm.addMetrics(TokenMetrics{Model: "model"}) @@ -53,7 +54,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("respects max metrics limit", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 3, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 3, 0) // Add 5 metrics for i := 0; i < 5; i++ { @@ -73,7 +74,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("emits TokenMetricsEvent", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) receivedEvent := make(chan TokenMetricsEvent, 1) cancel := event.On(func(e TokenMetricsEvent) { @@ -103,14 +104,14 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { func TestMetricsMonitor_GetMetrics(t *testing.T) { t.Run("returns empty slice when no metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) metrics := mm.getMetrics() assert.NotNil(t, metrics) assert.Equal(t, 0, len(metrics)) }) t.Run("returns copy of metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) mm.addMetrics(TokenMetrics{Model: "model1"}) mm.addMetrics(TokenMetrics{Model: "model2"}) @@ -130,7 +131,7 @@ func TestMetricsMonitor_GetMetrics(t *testing.T) { func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { t.Run("returns valid JSON for empty metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) jsonData, err := mm.getMetricsJSON() assert.NoError(t, err) assert.NotNil(t, jsonData) @@ -142,7 +143,7 @@ func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { }) t.Run("returns valid JSON with metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) mm.addMetrics(TokenMetrics{ Model: "model1", InputTokens: 100, @@ -170,7 +171,7 @@ func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { func TestMetricsMonitor_WrapHandler(t *testing.T) { t.Run("successful non-streaming request with usage data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{ "usage": { @@ -201,7 +202,7 @@ func TestMetricsMonitor_WrapHandler(t *testing.T) { }) t.Run("successful request with timings data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{ "timings": { @@ -241,7 +242,7 @@ func TestMetricsMonitor_WrapHandler(t *testing.T) { }) t.Run("streaming request with SSE format", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Note: SSE format requires proper line breaks - each data line followed by blank line responseBody := `data: {"choices":[{"text":"Hello"}]} @@ -277,7 +278,7 @@ data: [DONE] }) t.Run("non-OK status code does not record metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.WriteHeader(http.StatusBadRequest) @@ -297,7 +298,7 @@ data: [DONE] }) t.Run("empty response body records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.WriteHeader(http.StatusOK) @@ -319,7 +320,7 @@ data: [DONE] }) t.Run("invalid JSON records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.Header().Set("Content-Type", "application/json") @@ -343,7 +344,7 @@ data: [DONE] }) t.Run("next handler error is propagated", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) expectedErr := assert.AnError nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { @@ -362,7 +363,7 @@ data: [DONE] }) t.Run("response without usage or timings records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"result": "ok"}` @@ -388,7 +389,7 @@ data: [DONE] }) t.Run("infill request extracts timings from last array element", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Infill response is an array with timings in the last element responseBody := `[ @@ -431,7 +432,7 @@ data: [DONE] }) t.Run("infill request with empty array records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `[]` @@ -508,7 +509,7 @@ func TestMetricsMonitor_ResponseBodyCopier(t *testing.T) { func TestMetricsMonitor_Concurrent(t *testing.T) { t.Run("concurrent addMetrics is safe", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 1000, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 1000, 0) var wg sync.WaitGroup numGoroutines := 10 @@ -535,7 +536,7 @@ func TestMetricsMonitor_Concurrent(t *testing.T) { }) t.Run("concurrent reads and writes are safe", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 100, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 100, 0) done := make(chan bool) @@ -594,7 +595,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { }) t.Run("prefers timings over usage data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Timings should take precedence over usage responseBody := `{ @@ -634,7 +635,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { }) t.Run("handles missing cache_n in timings", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{ "timings": { @@ -669,7 +670,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { func TestMetricsMonitor_StreamingResponse(t *testing.T) { t.Run("finds metrics in last valid SSE data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Metrics should be found in the last data line before [DONE] responseBody := `data: {"choices":[{"text":"First"}]} @@ -703,7 +704,7 @@ data: [DONE] }) t.Run("handles streaming with no valid JSON records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `data: not json @@ -733,7 +734,7 @@ data: [DONE] }) t.Run("v1/responses format with nested response.usage", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // v1/responses SSE format: usage is nested under response.usage responseBody := "event: response.completed\n" + @@ -762,7 +763,7 @@ data: [DONE] }) t.Run("handles empty streaming response records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `` @@ -790,7 +791,7 @@ data: [DONE] // Benchmark tests func BenchmarkMetricsMonitor_AddMetrics(b *testing.B) { - mm := newMetricsMonitor(testLogger, 1000, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 1000, 0) metric := TokenMetrics{ Model: "test-model", @@ -811,7 +812,7 @@ func BenchmarkMetricsMonitor_AddMetrics(b *testing.B) { func BenchmarkMetricsMonitor_AddMetrics_SmallBuffer(b *testing.B) { // Test performance with a smaller buffer where wrapping occurs more frequently - mm := newMetricsMonitor(testLogger, 100, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 100, 0) metric := TokenMetrics{ Model: "test-model", @@ -832,7 +833,7 @@ func BenchmarkMetricsMonitor_AddMetrics_SmallBuffer(b *testing.B) { func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { t.Run("gzip encoded response", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` @@ -866,7 +867,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("deflate encoded response", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 200, "completion_tokens": 75}}` @@ -900,7 +901,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("invalid gzip data records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Invalid compressed data invalidData := []byte("this is not gzip data") @@ -928,7 +929,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("unknown encoding treated as uncompressed", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 300, "completion_tokens": 100}}` @@ -980,7 +981,7 @@ func TestReqRespCapture_CompressedSize(t *testing.T) { func TestMetricsMonitor_AddCapture(t *testing.T) { t.Run("does nothing when captures disabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) capture := ReqRespCapture{ ID: 0, @@ -993,7 +994,7 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { }) t.Run("adds capture when enabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) capture := ReqRespCapture{ ID: 0, @@ -1014,7 +1015,7 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { }) t.Run("evicts oldest when exceeding max size", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) // Each full ReqRespCapture with 80 bytes random data compresses to ~185 bytes. // 2 captures = ~370 bytes, 3 captures = ~555 bytes. Set limit so only 2 fit. mm.maxCaptureSize = 450 @@ -1041,7 +1042,7 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { }) t.Run("skips capture larger than max size", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) mm.maxCaptureSize = 100 // Use random data that doesn't compress well to create an oversized capture @@ -1056,13 +1057,13 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { func TestMetricsMonitor_GetCaptureByID(t *testing.T) { t.Run("returns nil for non-existent ID", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) assert.Nil(t, mm.getCaptureByID(999, false)) }) t.Run("returns decompressed capture by ID", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) capture := ReqRespCapture{ ID: 42, @@ -1083,7 +1084,7 @@ func TestMetricsMonitor_GetCaptureByID(t *testing.T) { }) t.Run("returns compressed bytes when decompress=false", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) capture := ReqRespCapture{ ID: 42, @@ -1145,7 +1146,7 @@ func TestRedactHeaders(t *testing.T) { func TestMetricsMonitor_WrapHandler_Capture(t *testing.T) { t.Run("captures request and response when enabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) requestBody := `{"model": "test", "prompt": "hello"}` responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` @@ -1190,7 +1191,7 @@ func TestMetricsMonitor_WrapHandler_Capture(t *testing.T) { }) t.Run("does not capture when disabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) requestBody := `{"model": "test"}` responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` diff --git a/proxy/proxymanager.go b/proxy/proxymanager.go index ee1d34849..f16aa5a80 100644 --- a/proxy/proxymanager.go +++ b/proxy/proxymanager.go @@ -190,7 +190,7 @@ func New(proxyConfig config.Config) *ProxyManager { muxLogger: muxLogger, upstreamLogger: upstreamLogger, - metricsMonitor: newMetricsMonitor(proxyLogger, maxMetrics, proxyConfig.CaptureBuffer), + metricsMonitor: newMetricsMonitor(proxyConfig, proxyLogger, maxMetrics, proxyConfig.CaptureBuffer), processGroups: make(map[string]*ProcessGroup),