diff --git a/event/default_test.go b/event/default_test.go index ded5a2e8b..2d845eed9 100644 --- a/event/default_test.go +++ b/event/default_test.go @@ -1,54 +1,54 @@ -// Copyright (c) Roman Atachiants and contributore. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for detaile. - -package event - -import ( - "sync" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" -) - -/* -cpu: 13th Gen Intel(R) Core(TM) i7-13700K -BenchmarkSubcribeConcurrent-24 1826686 606.3 ns/op 1648 B/op 5 allocs/op -*/ -func BenchmarkSubscribeConcurrent(b *testing.B) { - d := NewDispatcher() - b.ReportAllocs() - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - unsub := Subscribe(d, func(ev MyEvent1) {}) - unsub() - } - }) -} - -func TestDefaultPublish(t *testing.T) { - var wg sync.WaitGroup - - // Subscribe - var count int64 - defer On(func(ev MyEvent1) { - atomic.AddInt64(&count, 1) - wg.Done() - })() - - defer OnType(TypeEvent1, func(ev MyEvent1) { - atomic.AddInt64(&count, 1) - wg.Done() - })() - - // Publish - wg.Add(4) - Emit(MyEvent1{}) - Emit(MyEvent1{}) - - // Wait and check - wg.Wait() - assert.Equal(t, int64(4), count) -} +// Copyright (c) Roman Atachiants and contributore. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for detaile. + +package event + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" +) + +/* +cpu: 13th Gen Intel(R) Core(TM) i7-13700K +BenchmarkSubcribeConcurrent-24 1826686 606.3 ns/op 1648 B/op 5 allocs/op +*/ +func BenchmarkSubscribeConcurrent(b *testing.B) { + d := NewDispatcher() + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + unsub := Subscribe(d, func(ev MyEvent1) {}) + unsub() + } + }) +} + +func TestDefaultPublish(t *testing.T) { + var wg sync.WaitGroup + + // Subscribe + var count int64 + defer On(func(ev MyEvent1) { + atomic.AddInt64(&count, 1) + wg.Done() + })() + + defer OnType(TypeEvent1, func(ev MyEvent1) { + atomic.AddInt64(&count, 1) + wg.Done() + })() + + // Publish + wg.Add(4) + Emit(MyEvent1{}) + Emit(MyEvent1{}) + + // Wait and check + wg.Wait() + assert.Equal(t, int64(4), count) +} diff --git a/event/event_test.go b/event/event_test.go index 1868ef021..e10cd542a 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -1,324 +1,324 @@ -// Copyright (c) Roman Atachiants and contributore. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for detaile. - -package event - -import ( - "fmt" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestPublish(t *testing.T) { - d := NewDispatcher() - var wg sync.WaitGroup - - // Subscribe, must be received in order - var count int64 - defer Subscribe(d, func(ev MyEvent1) { - assert.Equal(t, int(atomic.AddInt64(&count, 1)), ev.Number) - wg.Done() - })() - - // Publish - wg.Add(3) - Publish(d, MyEvent1{Number: 1}) - Publish(d, MyEvent1{Number: 2}) - Publish(d, MyEvent1{Number: 3}) - - // Wait and check - wg.Wait() - assert.Equal(t, int64(3), count) -} - -func TestUnsubscribe(t *testing.T) { - d := NewDispatcher() - assert.Equal(t, 0, d.count(TypeEvent1)) - unsubscribe := Subscribe(d, func(ev MyEvent1) { - // Nothing - }) - - assert.Equal(t, 1, d.count(TypeEvent1)) - unsubscribe() - assert.Equal(t, 0, d.count(TypeEvent1)) -} - -func TestConcurrent(t *testing.T) { - const max = 1000000 - var count int64 - var wg sync.WaitGroup - wg.Add(1) - - d := NewDispatcher() - defer Subscribe(d, func(ev MyEvent1) { - if current := atomic.AddInt64(&count, 1); current == max { - wg.Done() - } - })() - - // Asynchronously publish - go func() { - for i := 0; i < max; i++ { - Publish(d, MyEvent1{}) - } - }() - - defer Subscribe(d, func(ev MyEvent1) { - // Subscriber that does nothing - })() - - wg.Wait() - assert.Equal(t, max, int(count)) -} - -func TestSubscribeDifferentType(t *testing.T) { - d := NewDispatcher() - assert.Panics(t, func() { - SubscribeTo(d, TypeEvent1, func(ev MyEvent1) {}) - SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) - }) -} - -func TestPublishDifferentType(t *testing.T) { - d := NewDispatcher() - assert.Panics(t, func() { - SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) - Publish(d, MyEvent1{}) - }) -} - -func TestCloseDispatcher(t *testing.T) { - d := NewDispatcher() - defer SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})() - - assert.NoError(t, d.Close()) - assert.Panics(t, func() { - SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) - }) -} - -func TestMatrix(t *testing.T) { - const amount = 1000 - for _, subs := range []int{1, 10, 100} { - for _, topics := range []int{1, 10} { - expected := subs * topics * amount - t.Run(fmt.Sprintf("%dx%d", topics, subs), func(t *testing.T) { - var count atomic.Int64 - var wg sync.WaitGroup - wg.Add(expected) - - d := NewDispatcher() - for i := 0; i < subs; i++ { - for id := 0; id < topics; id++ { - defer SubscribeTo(d, uint32(id), func(ev MyEvent3) { - count.Add(1) - wg.Done() - })() - } - } - - for n := 0; n < amount; n++ { - for id := 0; id < topics; id++ { - go Publish(d, MyEvent3{ID: id}) - } - } - - wg.Wait() - assert.Equal(t, expected, int(count.Load())) - }) - } - } -} - -func TestConcurrentSubscriptionRace(t *testing.T) { - // This test specifically targets the race condition that occurs when multiple - // goroutines try to subscribe to different event types simultaneously. - // Without the CAS loop, subscriptions could be lost due to registry corruption. - - const numGoroutines = 100 - const numEventTypes = 50 - - d := NewDispatcher() - defer d.Close() - - var wg sync.WaitGroup - var receivedCount int64 - var subscribedTypes sync.Map // Thread-safe map - - wg.Add(numGoroutines) - - // Start multiple goroutines that subscribe to different event types concurrently - for i := 0; i < numGoroutines; i++ { - go func(goroutineID int) { - defer wg.Done() - - // Each goroutine subscribes to a unique event type - eventType := uint32(goroutineID%numEventTypes + 1000) // Offset to avoid collision with other tests - - // Subscribe to the event type - SubscribeTo(d, eventType, func(ev MyEvent3) { - atomic.AddInt64(&receivedCount, 1) - }) - - // Record that this type was subscribed - subscribedTypes.Store(eventType, true) - }(i) - } - - // Wait for all subscriptions to complete - wg.Wait() - - // Count the number of unique event types subscribed - expectedTypes := 0 - subscribedTypes.Range(func(key, value interface{}) bool { - expectedTypes++ - return true - }) - - // Small delay to ensure all subscriptions are fully processed - time.Sleep(10 * time.Millisecond) - - // Publish events to each subscribed type - subscribedTypes.Range(func(key, value interface{}) bool { - eventType := key.(uint32) - Publish(d, MyEvent3{ID: int(eventType)}) - return true - }) - - // Wait for all events to be processed - time.Sleep(50 * time.Millisecond) - - // Verify that we received at least the expected number of events - // (there might be more if multiple goroutines subscribed to the same event type) - received := atomic.LoadInt64(&receivedCount) - assert.GreaterOrEqual(t, int(received), expectedTypes, - "Should have received at least %d events, got %d", expectedTypes, received) - - // Verify that we have the expected number of unique event types - assert.Equal(t, numEventTypes, expectedTypes, - "Should have exactly %d unique event types", numEventTypes) -} - -func TestConcurrentHandlerRegistration(t *testing.T) { - const numGoroutines = 100 - - // Test concurrent subscriptions to the same event type - t.Run("SameEventType", func(t *testing.T) { - d := NewDispatcher() - var handlerCount int64 - var wg sync.WaitGroup - - // Start multiple goroutines subscribing to the same event type (0x1) - for i := 0; i < numGoroutines; i++ { - wg.Add(1) - go func() { - defer wg.Done() - SubscribeTo(d, uint32(0x1), func(ev MyEvent1) { - atomic.AddInt64(&handlerCount, 1) - }) - }() - } - - wg.Wait() - - // Verify all handlers were registered by publishing an event - atomic.StoreInt64(&handlerCount, 0) - Publish(d, MyEvent1{}) - - // Small delay to ensure all handlers have executed - time.Sleep(10 * time.Millisecond) - - assert.Equal(t, int64(numGoroutines), atomic.LoadInt64(&handlerCount), - "Not all handlers were registered due to race condition") - }) - - // Test concurrent subscriptions to different event types - t.Run("DifferentEventTypes", func(t *testing.T) { - d := NewDispatcher() - var wg sync.WaitGroup - receivedEvents := make(map[uint32]*int64) - - // Create multiple event types and subscribe concurrently - for i := 0; i < numGoroutines; i++ { - eventType := uint32(100 + i) - counter := new(int64) - receivedEvents[eventType] = counter - - wg.Add(1) - go func(et uint32, cnt *int64) { - defer wg.Done() - SubscribeTo(d, et, func(ev MyEvent3) { - atomic.AddInt64(cnt, 1) - }) - }(eventType, counter) - } - - wg.Wait() - - // Publish events to all types - for eventType := uint32(100); eventType < uint32(100+numGoroutines); eventType++ { - Publish(d, MyEvent3{ID: int(eventType)}) - } - - // Small delay to ensure all handlers have executed - time.Sleep(10 * time.Millisecond) - - // Verify all event types received their events - for eventType, counter := range receivedEvents { - assert.Equal(t, int64(1), atomic.LoadInt64(counter), - "Event type %d did not receive its event", eventType) - } - }) -} - -func TestBackpressure(t *testing.T) { - d := NewDispatcher() - d.maxQueue = 10 - - var processedCount int64 - unsub := SubscribeTo(d, uint32(0x200), func(ev MyEvent3) { - atomic.AddInt64(&processedCount, 1) - }) - defer unsub() - - const eventsToPublish = 1000 - for i := 0; i < eventsToPublish; i++ { - Publish(d, MyEvent3{ID: 0x200}) - } - - time.Sleep(100 * time.Millisecond) - - // Verify all events were eventually processed - finalProcessed := atomic.LoadInt64(&processedCount) - assert.Equal(t, int64(eventsToPublish), finalProcessed) - t.Logf("Events processed: %d/%d", finalProcessed, eventsToPublish) -} - -// ------------------------------------- Test Events ------------------------------------- - -const ( - TypeEvent1 = 0x1 - TypeEvent2 = 0x2 -) - -type MyEvent1 struct { - Number int -} - -func (t MyEvent1) Type() uint32 { return TypeEvent1 } - -type MyEvent2 struct { - Text string -} - -func (t MyEvent2) Type() uint32 { return TypeEvent2 } - -type MyEvent3 struct { - ID int -} - -func (t MyEvent3) Type() uint32 { return uint32(t.ID) } +// Copyright (c) Roman Atachiants and contributore. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for detaile. + +package event + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPublish(t *testing.T) { + d := NewDispatcher() + var wg sync.WaitGroup + + // Subscribe, must be received in order + var count int64 + defer Subscribe(d, func(ev MyEvent1) { + assert.Equal(t, int(atomic.AddInt64(&count, 1)), ev.Number) + wg.Done() + })() + + // Publish + wg.Add(3) + Publish(d, MyEvent1{Number: 1}) + Publish(d, MyEvent1{Number: 2}) + Publish(d, MyEvent1{Number: 3}) + + // Wait and check + wg.Wait() + assert.Equal(t, int64(3), count) +} + +func TestUnsubscribe(t *testing.T) { + d := NewDispatcher() + assert.Equal(t, 0, d.count(TypeEvent1)) + unsubscribe := Subscribe(d, func(ev MyEvent1) { + // Nothing + }) + + assert.Equal(t, 1, d.count(TypeEvent1)) + unsubscribe() + assert.Equal(t, 0, d.count(TypeEvent1)) +} + +func TestConcurrent(t *testing.T) { + const max = 1000000 + var count int64 + var wg sync.WaitGroup + wg.Add(1) + + d := NewDispatcher() + defer Subscribe(d, func(ev MyEvent1) { + if current := atomic.AddInt64(&count, 1); current == max { + wg.Done() + } + })() + + // Asynchronously publish + go func() { + for i := 0; i < max; i++ { + Publish(d, MyEvent1{}) + } + }() + + defer Subscribe(d, func(ev MyEvent1) { + // Subscriber that does nothing + })() + + wg.Wait() + assert.Equal(t, max, int(count)) +} + +func TestSubscribeDifferentType(t *testing.T) { + d := NewDispatcher() + assert.Panics(t, func() { + SubscribeTo(d, TypeEvent1, func(ev MyEvent1) {}) + SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) + }) +} + +func TestPublishDifferentType(t *testing.T) { + d := NewDispatcher() + assert.Panics(t, func() { + SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) + Publish(d, MyEvent1{}) + }) +} + +func TestCloseDispatcher(t *testing.T) { + d := NewDispatcher() + defer SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})() + + assert.NoError(t, d.Close()) + assert.Panics(t, func() { + SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) + }) +} + +func TestMatrix(t *testing.T) { + const amount = 1000 + for _, subs := range []int{1, 10, 100} { + for _, topics := range []int{1, 10} { + expected := subs * topics * amount + t.Run(fmt.Sprintf("%dx%d", topics, subs), func(t *testing.T) { + var count atomic.Int64 + var wg sync.WaitGroup + wg.Add(expected) + + d := NewDispatcher() + for i := 0; i < subs; i++ { + for id := 0; id < topics; id++ { + defer SubscribeTo(d, uint32(id), func(ev MyEvent3) { + count.Add(1) + wg.Done() + })() + } + } + + for n := 0; n < amount; n++ { + for id := 0; id < topics; id++ { + go Publish(d, MyEvent3{ID: id}) + } + } + + wg.Wait() + assert.Equal(t, expected, int(count.Load())) + }) + } + } +} + +func TestConcurrentSubscriptionRace(t *testing.T) { + // This test specifically targets the race condition that occurs when multiple + // goroutines try to subscribe to different event types simultaneously. + // Without the CAS loop, subscriptions could be lost due to registry corruption. + + const numGoroutines = 100 + const numEventTypes = 50 + + d := NewDispatcher() + defer d.Close() + + var wg sync.WaitGroup + var receivedCount int64 + var subscribedTypes sync.Map // Thread-safe map + + wg.Add(numGoroutines) + + // Start multiple goroutines that subscribe to different event types concurrently + for i := 0; i < numGoroutines; i++ { + go func(goroutineID int) { + defer wg.Done() + + // Each goroutine subscribes to a unique event type + eventType := uint32(goroutineID%numEventTypes + 1000) // Offset to avoid collision with other tests + + // Subscribe to the event type + SubscribeTo(d, eventType, func(ev MyEvent3) { + atomic.AddInt64(&receivedCount, 1) + }) + + // Record that this type was subscribed + subscribedTypes.Store(eventType, true) + }(i) + } + + // Wait for all subscriptions to complete + wg.Wait() + + // Count the number of unique event types subscribed + expectedTypes := 0 + subscribedTypes.Range(func(key, value interface{}) bool { + expectedTypes++ + return true + }) + + // Small delay to ensure all subscriptions are fully processed + time.Sleep(10 * time.Millisecond) + + // Publish events to each subscribed type + subscribedTypes.Range(func(key, value interface{}) bool { + eventType := key.(uint32) + Publish(d, MyEvent3{ID: int(eventType)}) + return true + }) + + // Wait for all events to be processed + time.Sleep(50 * time.Millisecond) + + // Verify that we received at least the expected number of events + // (there might be more if multiple goroutines subscribed to the same event type) + received := atomic.LoadInt64(&receivedCount) + assert.GreaterOrEqual(t, int(received), expectedTypes, + "Should have received at least %d events, got %d", expectedTypes, received) + + // Verify that we have the expected number of unique event types + assert.Equal(t, numEventTypes, expectedTypes, + "Should have exactly %d unique event types", numEventTypes) +} + +func TestConcurrentHandlerRegistration(t *testing.T) { + const numGoroutines = 100 + + // Test concurrent subscriptions to the same event type + t.Run("SameEventType", func(t *testing.T) { + d := NewDispatcher() + var handlerCount int64 + var wg sync.WaitGroup + + // Start multiple goroutines subscribing to the same event type (0x1) + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + SubscribeTo(d, uint32(0x1), func(ev MyEvent1) { + atomic.AddInt64(&handlerCount, 1) + }) + }() + } + + wg.Wait() + + // Verify all handlers were registered by publishing an event + atomic.StoreInt64(&handlerCount, 0) + Publish(d, MyEvent1{}) + + // Small delay to ensure all handlers have executed + time.Sleep(10 * time.Millisecond) + + assert.Equal(t, int64(numGoroutines), atomic.LoadInt64(&handlerCount), + "Not all handlers were registered due to race condition") + }) + + // Test concurrent subscriptions to different event types + t.Run("DifferentEventTypes", func(t *testing.T) { + d := NewDispatcher() + var wg sync.WaitGroup + receivedEvents := make(map[uint32]*int64) + + // Create multiple event types and subscribe concurrently + for i := 0; i < numGoroutines; i++ { + eventType := uint32(100 + i) + counter := new(int64) + receivedEvents[eventType] = counter + + wg.Add(1) + go func(et uint32, cnt *int64) { + defer wg.Done() + SubscribeTo(d, et, func(ev MyEvent3) { + atomic.AddInt64(cnt, 1) + }) + }(eventType, counter) + } + + wg.Wait() + + // Publish events to all types + for eventType := uint32(100); eventType < uint32(100+numGoroutines); eventType++ { + Publish(d, MyEvent3{ID: int(eventType)}) + } + + // Small delay to ensure all handlers have executed + time.Sleep(10 * time.Millisecond) + + // Verify all event types received their events + for eventType, counter := range receivedEvents { + assert.Equal(t, int64(1), atomic.LoadInt64(counter), + "Event type %d did not receive its event", eventType) + } + }) +} + +func TestBackpressure(t *testing.T) { + d := NewDispatcher() + d.maxQueue = 10 + + var processedCount int64 + unsub := SubscribeTo(d, uint32(0x200), func(ev MyEvent3) { + atomic.AddInt64(&processedCount, 1) + }) + defer unsub() + + const eventsToPublish = 1000 + for i := 0; i < eventsToPublish; i++ { + Publish(d, MyEvent3{ID: 0x200}) + } + + time.Sleep(100 * time.Millisecond) + + // Verify all events were eventually processed + finalProcessed := atomic.LoadInt64(&processedCount) + assert.Equal(t, int64(eventsToPublish), finalProcessed) + t.Logf("Events processed: %d/%d", finalProcessed, eventsToPublish) +} + +// ------------------------------------- Test Events ------------------------------------- + +const ( + TypeEvent1 = 0x1 + TypeEvent2 = 0x2 +) + +type MyEvent1 struct { + Number int +} + +func (t MyEvent1) Type() uint32 { return TypeEvent1 } + +type MyEvent2 struct { + Text string +} + +func (t MyEvent2) Type() uint32 { return TypeEvent2 } + +type MyEvent3 struct { + ID int +} + +func (t MyEvent3) Type() uint32 { return uint32(t.ID) } diff --git a/proxy/metrics_monitor.go b/proxy/metrics_monitor.go index 283c16623..0e0a04cf3 100644 --- a/proxy/metrics_monitor.go +++ b/proxy/metrics_monitor.go @@ -15,6 +15,7 @@ import ( "github.com/gin-gonic/gin" "github.com/klauspost/compress/zstd" "github.com/mostlygeek/llama-swap/event" + "github.com/mostlygeek/llama-swap/proxy/config" "github.com/tidwall/gjson" ) @@ -95,6 +96,7 @@ func (e TokenMetricsEvent) Type() uint32 { // metricsMonitor parses llama-server output for token statistics type metricsMonitor struct { + config config.Config mu sync.RWMutex metrics []TokenMetrics maxMetrics int @@ -111,8 +113,9 @@ type metricsMonitor struct { // newMetricsMonitor creates a new metricsMonitor. captureBufferMB is the // capture buffer size in megabytes; 0 disables captures. -func newMetricsMonitor(logger *LogMonitor, maxMetrics int, captureBufferMB int) *metricsMonitor { +func newMetricsMonitor(cfg config.Config, logger *LogMonitor, maxMetrics int, captureBufferMB int) *metricsMonitor { return &metricsMonitor{ + config: cfg, logger: logger, maxMetrics: maxMetrics, enableCaptures: captureBufferMB > 0, @@ -130,6 +133,10 @@ func (mp *metricsMonitor) addMetrics(metric TokenMetrics) int { defer mp.mu.Unlock() metric.ID = mp.nextID + // Resolve modelID to display name (first alias or modelID itself) + if modelConfig, exists := mp.config.Models[metric.Model]; exists && len(modelConfig.Aliases) > 0 { + metric.Model = modelConfig.Aliases[0] + } mp.nextID++ mp.metrics = append(mp.metrics, metric) if len(mp.metrics) > mp.maxMetrics { @@ -271,6 +278,9 @@ func (mp *metricsMonitor) wrapHandler( request.Header.Set("Accept-Encoding", filterAcceptEncoding(ae)) } + // Capture wall clock time before proxying the request + requestStart := time.Now() + if err := next(modelID, recorder, request); err != nil { return err } @@ -287,7 +297,7 @@ func (mp *metricsMonitor) wrapHandler( tm := TokenMetrics{ Timestamp: time.Now(), Model: modelID, - DurationMs: int(time.Since(recorder.StartTime()).Milliseconds()), + DurationMs: int(time.Since(requestStart).Milliseconds()), } body := recorder.body.Bytes() @@ -308,7 +318,7 @@ func (mp *metricsMonitor) wrapHandler( } } if strings.Contains(recorder.Header().Get("Content-Type"), "text/event-stream") { - if parsed, err := processStreamingResponse(modelID, recorder.StartTime(), body); err != nil { + if parsed, err := processStreamingResponse(modelID, requestStart, body); err != nil { mp.logger.Warnf("error processing streaming response: %v, path=%s, recording minimal metrics", err, request.URL.Path) } else { tm = parsed @@ -328,7 +338,7 @@ func (mp *metricsMonitor) wrapHandler( } if usage.Exists() || timings.Exists() { - if parsedMetrics, err := parseMetrics(modelID, recorder.StartTime(), usage, timings); err != nil { + if parsedMetrics, err := parseMetrics(modelID, requestStart, usage, timings); err != nil { mp.logger.Warnf("error parsing metrics: %v, path=%s, recording minimal metrics", err, request.URL.Path) } else { tm = parsedMetrics @@ -481,6 +491,17 @@ func parseMetrics(modelID string, start time.Time, usage, timings gjson.Result) } } + // Fallback: estimate speeds from wall clock when timings unavailable (e.g., vLLM) + if !timings.Exists() && wallDurationMs > 0 { + durationSec := float64(wallDurationMs) / 1000.0 + if inputTokens > 0 { + promptPerSecond = float64(inputTokens) / durationSec + } + if outputTokens > 0 { + tokensPerSecond = float64(outputTokens) / durationSec + } + } + return TokenMetrics{ Timestamp: time.Now(), Model: modelID, diff --git a/proxy/metrics_monitor_test.go b/proxy/metrics_monitor_test.go index 48372d9e2..223ff937d 100644 --- a/proxy/metrics_monitor_test.go +++ b/proxy/metrics_monitor_test.go @@ -14,13 +14,14 @@ import ( "github.com/gin-gonic/gin" "github.com/mostlygeek/llama-swap/event" + "github.com/mostlygeek/llama-swap/proxy/config" "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" ) func TestMetricsMonitor_AddMetrics(t *testing.T) { t.Run("adds metrics and assigns ID", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) metric := TokenMetrics{ Model: "test-model", @@ -39,7 +40,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("increments ID for each metric", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) for i := 0; i < 5; i++ { mm.addMetrics(TokenMetrics{Model: "model"}) @@ -53,7 +54,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("respects max metrics limit", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 3, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 3, 0) // Add 5 metrics for i := 0; i < 5; i++ { @@ -73,7 +74,7 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { }) t.Run("emits TokenMetricsEvent", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) receivedEvent := make(chan TokenMetricsEvent, 1) cancel := event.On(func(e TokenMetricsEvent) { @@ -103,14 +104,14 @@ func TestMetricsMonitor_AddMetrics(t *testing.T) { func TestMetricsMonitor_GetMetrics(t *testing.T) { t.Run("returns empty slice when no metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) metrics := mm.getMetrics() assert.NotNil(t, metrics) assert.Equal(t, 0, len(metrics)) }) t.Run("returns copy of metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) mm.addMetrics(TokenMetrics{Model: "model1"}) mm.addMetrics(TokenMetrics{Model: "model2"}) @@ -130,7 +131,7 @@ func TestMetricsMonitor_GetMetrics(t *testing.T) { func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { t.Run("returns valid JSON for empty metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) jsonData, err := mm.getMetricsJSON() assert.NoError(t, err) assert.NotNil(t, jsonData) @@ -142,7 +143,7 @@ func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { }) t.Run("returns valid JSON with metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) mm.addMetrics(TokenMetrics{ Model: "model1", InputTokens: 100, @@ -170,7 +171,7 @@ func TestMetricsMonitor_GetMetricsJSON(t *testing.T) { func TestMetricsMonitor_WrapHandler(t *testing.T) { t.Run("successful non-streaming request with usage data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{ "usage": { @@ -201,7 +202,7 @@ func TestMetricsMonitor_WrapHandler(t *testing.T) { }) t.Run("successful request with timings data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{ "timings": { @@ -241,7 +242,7 @@ func TestMetricsMonitor_WrapHandler(t *testing.T) { }) t.Run("streaming request with SSE format", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Note: SSE format requires proper line breaks - each data line followed by blank line responseBody := `data: {"choices":[{"text":"Hello"}]} @@ -277,7 +278,7 @@ data: [DONE] }) t.Run("non-OK status code does not record metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.WriteHeader(http.StatusBadRequest) @@ -297,7 +298,7 @@ data: [DONE] }) t.Run("empty response body records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.WriteHeader(http.StatusOK) @@ -319,7 +320,7 @@ data: [DONE] }) t.Run("invalid JSON records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { w.Header().Set("Content-Type", "application/json") @@ -343,7 +344,7 @@ data: [DONE] }) t.Run("next handler error is propagated", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) expectedErr := assert.AnError nextHandler := func(modelID string, w http.ResponseWriter, r *http.Request) error { @@ -362,7 +363,7 @@ data: [DONE] }) t.Run("response without usage or timings records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"result": "ok"}` @@ -388,7 +389,7 @@ data: [DONE] }) t.Run("infill request extracts timings from last array element", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Infill response is an array with timings in the last element responseBody := `[ @@ -431,7 +432,7 @@ data: [DONE] }) t.Run("infill request with empty array records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `[]` @@ -508,7 +509,7 @@ func TestMetricsMonitor_ResponseBodyCopier(t *testing.T) { func TestMetricsMonitor_Concurrent(t *testing.T) { t.Run("concurrent addMetrics is safe", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 1000, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 1000, 0) var wg sync.WaitGroup numGoroutines := 10 @@ -535,7 +536,7 @@ func TestMetricsMonitor_Concurrent(t *testing.T) { }) t.Run("concurrent reads and writes are safe", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 100, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 100, 0) done := make(chan bool) @@ -594,7 +595,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { }) t.Run("prefers timings over usage data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Timings should take precedence over usage responseBody := `{ @@ -634,7 +635,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { }) t.Run("handles missing cache_n in timings", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{ "timings": { @@ -669,7 +670,7 @@ func TestMetricsMonitor_ParseMetrics(t *testing.T) { func TestMetricsMonitor_StreamingResponse(t *testing.T) { t.Run("finds metrics in last valid SSE data", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Metrics should be found in the last data line before [DONE] responseBody := `data: {"choices":[{"text":"First"}]} @@ -703,7 +704,7 @@ data: [DONE] }) t.Run("handles streaming with no valid JSON records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `data: not json @@ -733,7 +734,7 @@ data: [DONE] }) t.Run("v1/responses format with nested response.usage", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // v1/responses SSE format: usage is nested under response.usage responseBody := "event: response.completed\n" + @@ -762,7 +763,7 @@ data: [DONE] }) t.Run("handles empty streaming response records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `` @@ -790,7 +791,7 @@ data: [DONE] // Benchmark tests func BenchmarkMetricsMonitor_AddMetrics(b *testing.B) { - mm := newMetricsMonitor(testLogger, 1000, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 1000, 0) metric := TokenMetrics{ Model: "test-model", @@ -811,7 +812,7 @@ func BenchmarkMetricsMonitor_AddMetrics(b *testing.B) { func BenchmarkMetricsMonitor_AddMetrics_SmallBuffer(b *testing.B) { // Test performance with a smaller buffer where wrapping occurs more frequently - mm := newMetricsMonitor(testLogger, 100, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 100, 0) metric := TokenMetrics{ Model: "test-model", @@ -832,7 +833,7 @@ func BenchmarkMetricsMonitor_AddMetrics_SmallBuffer(b *testing.B) { func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { t.Run("gzip encoded response", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` @@ -866,7 +867,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("deflate encoded response", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 200, "completion_tokens": 75}}` @@ -900,7 +901,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("invalid gzip data records minimal metrics", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) // Invalid compressed data invalidData := []byte("this is not gzip data") @@ -928,7 +929,7 @@ func TestMetricsMonitor_WrapHandler_Compression(t *testing.T) { }) t.Run("unknown encoding treated as uncompressed", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) responseBody := `{"usage": {"prompt_tokens": 300, "completion_tokens": 100}}` @@ -980,7 +981,7 @@ func TestReqRespCapture_CompressedSize(t *testing.T) { func TestMetricsMonitor_AddCapture(t *testing.T) { t.Run("does nothing when captures disabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) capture := ReqRespCapture{ ID: 0, @@ -993,7 +994,7 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { }) t.Run("adds capture when enabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) capture := ReqRespCapture{ ID: 0, @@ -1014,7 +1015,7 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { }) t.Run("evicts oldest when exceeding max size", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) // Each full ReqRespCapture with 80 bytes random data compresses to ~185 bytes. // 2 captures = ~370 bytes, 3 captures = ~555 bytes. Set limit so only 2 fit. mm.maxCaptureSize = 450 @@ -1041,7 +1042,7 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { }) t.Run("skips capture larger than max size", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) mm.maxCaptureSize = 100 // Use random data that doesn't compress well to create an oversized capture @@ -1056,13 +1057,13 @@ func TestMetricsMonitor_AddCapture(t *testing.T) { func TestMetricsMonitor_GetCaptureByID(t *testing.T) { t.Run("returns nil for non-existent ID", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) assert.Nil(t, mm.getCaptureByID(999, false)) }) t.Run("returns decompressed capture by ID", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) capture := ReqRespCapture{ ID: 42, @@ -1083,7 +1084,7 @@ func TestMetricsMonitor_GetCaptureByID(t *testing.T) { }) t.Run("returns compressed bytes when decompress=false", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) capture := ReqRespCapture{ ID: 42, @@ -1145,7 +1146,7 @@ func TestRedactHeaders(t *testing.T) { func TestMetricsMonitor_WrapHandler_Capture(t *testing.T) { t.Run("captures request and response when enabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 5) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 5) requestBody := `{"model": "test", "prompt": "hello"}` responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` @@ -1190,7 +1191,7 @@ func TestMetricsMonitor_WrapHandler_Capture(t *testing.T) { }) t.Run("does not capture when disabled", func(t *testing.T) { - mm := newMetricsMonitor(testLogger, 10, 0) + mm := newMetricsMonitor(config.Config{}, testLogger, 10, 0) requestBody := `{"model": "test"}` responseBody := `{"usage": {"prompt_tokens": 100, "completion_tokens": 50}}` diff --git a/proxy/proxymanager.go b/proxy/proxymanager.go index ee1d34849..f16aa5a80 100644 --- a/proxy/proxymanager.go +++ b/proxy/proxymanager.go @@ -190,7 +190,7 @@ func New(proxyConfig config.Config) *ProxyManager { muxLogger: muxLogger, upstreamLogger: upstreamLogger, - metricsMonitor: newMetricsMonitor(proxyLogger, maxMetrics, proxyConfig.CaptureBuffer), + metricsMonitor: newMetricsMonitor(proxyConfig, proxyLogger, maxMetrics, proxyConfig.CaptureBuffer), processGroups: make(map[string]*ProcessGroup),