From b80ae3e63392943a187db59338b99fc841604cad Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Sun, 14 Dec 2025 10:33:00 -0800 Subject: [PATCH 1/5] proxy,ai-plans: add baseline benchmark and plans --- ai-plans/efficient-ring-buffer.md | 85 +++++++++++++++++++++++++++++++ proxy/logMonitor_test.go | 66 ++++++++++++++++++++++++ 2 files changed, 151 insertions(+) create mode 100644 ai-plans/efficient-ring-buffer.md diff --git a/ai-plans/efficient-ring-buffer.md b/ai-plans/efficient-ring-buffer.md new file mode 100644 index 00000000..dd66abdf --- /dev/null +++ b/ai-plans/efficient-ring-buffer.md @@ -0,0 +1,85 @@ +# Replace ring.Ring with Efficient Circular Byte Buffer + +## Overview + +Replace the inefficient `container/ring.Ring` implementation in `logMonitor.go` with a simple circular byte buffer that uses a single contiguous `[]byte` slice. This eliminates per-write allocations, improves cache locality, and correctly implements a 10KB buffer. + +## Current Issues + +1. `ring.New(10 * 1024)` creates 10,240 ring **elements**, not 10KB of storage +2. Every `Write()` call allocates a new `[]byte` slice inside the lock +3. `GetHistory()` iterates all 10,240 elements and appends repeatedly (geometric reallocs) +4. Linked list structure has poor cache locality and pointer overhead + +## Design Requirements + +### New CircularBuffer Type + +Create a simple circular byte buffer with: +- Single pre-allocated `[]byte` of fixed capacity (10KB) +- `head` and `size` integers to track write position and data length +- No per-write allocations + +### API Requirements + +The new buffer must support: +1. **Write(p []byte)** - Append bytes, overwriting oldest data when full +2. **GetHistory() []byte** - Return all buffered data in correct order (oldest to newest) + +### Implementation Details + +```go +type circularBuffer struct { + data []byte // pre-allocated capacity + head int // next write position + size int // current number of bytes stored (0 to cap) +} +``` + +**Write logic:** +- If `len(p) >= capacity`: just keep the last `capacity` bytes +- Otherwise: write bytes at `head`, wrapping around if needed +- Update `head` and `size` accordingly +- Data is copied into the internal buffer (not stored by reference) + +**GetHistory logic:** +- Calculate start position: `(head - size + cap) % cap` +- If not wrapped: single slice copy +- If wrapped: two copies (end of buffer + beginning) +- Returns a **new slice** (copy), not a view into internal buffer + +### Immutability Guarantees (must preserve) + +Per existing tests: +1. Modifying input `[]byte` after `Write()` must not affect stored data +2. `GetHistory()` returns independent copy - modifications don't affect buffer + +## Files to Modify + +- `proxy/logMonitor.go` - Replace `buffer *ring.Ring` with new circular buffer + +## Testing Plan + +Existing tests in `logMonitor_test.go` should continue to pass: +- `TestLogMonitor` - Basic write/read and subscriber notification +- `TestWrite_ImmutableBuffer` - Verify writes don't affect returned history +- `TestWrite_LogTimeFormat` - Timestamp formatting + +Add new tests: +- Test buffer wrap-around behavior +- Test large writes that exceed buffer capacity +- Test exact capacity boundary conditions + +## Checklist + +- [ ] Create `circularBuffer` struct in `logMonitor.go` +- [ ] Implement `Write()` method for circular buffer +- [ ] Implement `GetHistory()` method for circular buffer +- [ ] Update `LogMonitor` struct to use new buffer +- [ ] Update `NewLogMonitorWriter()` to initialize new buffer +- [ ] Update `LogMonitor.Write()` to use new buffer +- [ ] Update `LogMonitor.GetHistory()` to use new buffer +- [ ] Remove `"container/ring"` import +- [ ] Run `make test-dev` to verify existing tests pass +- [ ] Add wrap-around test case +- [ ] Run `make test-all` for final validation diff --git a/proxy/logMonitor_test.go b/proxy/logMonitor_test.go index a25b637b..67c2fe9b 100644 --- a/proxy/logMonitor_test.go +++ b/proxy/logMonitor_test.go @@ -113,3 +113,69 @@ func TestWrite_LogTimeFormat(t *testing.T) { t.Fatalf("Cannot find timestamp: %v", err) } } + +func BenchmarkLogMonitorWrite(b *testing.B) { + // Test data of varying sizes + smallMsg := []byte("small message\n") + mediumMsg := []byte(strings.Repeat("medium message content ", 10) + "\n") + largeMsg := []byte(strings.Repeat("large message content for benchmarking ", 100) + "\n") + + b.Run("SmallWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(smallMsg) + } + }) + + b.Run("MediumWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(mediumMsg) + } + }) + + b.Run("LargeWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(largeMsg) + } + }) + + b.Run("WithSubscribers", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + // Add some subscribers + for i := 0; i < 5; i++ { + lm.OnLogData(func(data []byte) {}) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(mediumMsg) + } + }) + + b.Run("GetHistory", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + // Pre-populate with data + for i := 0; i < 1000; i++ { + lm.Write(mediumMsg) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.GetHistory() + } + }) +} + +/* +Baseline +| Benchmark | ops/sec | ns/op | bytes/op | allocs/op | +|-----------|---------|-------|----------|-----------| +| SmallWrite (14B) | ~27M | 43 ns | 40 B | 2 | +| MediumWrite (241B) | ~16M | 76 ns | 264 B | 2 | +| LargeWrite (4KB) | ~2.3M | 504 ns | 4120 B | 2 | +| WithSubscribers (5 subs) | ~3.3M | 355 ns | 264 B | 2 | +| GetHistory (after 1000 writes) | ~8K | 145 µs | 1.2 MB | 22 | +*/ From e2a47d494978d841c41de5a6c2eefd22e868f28b Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Sun, 14 Dec 2025 10:33:50 -0800 Subject: [PATCH 2/5] ai-plans: rename plan --- ...ficient-ring-buffer.md => 2025-12-14-efficient-ring-buffer.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename ai-plans/{efficient-ring-buffer.md => 2025-12-14-efficient-ring-buffer.md} (100%) diff --git a/ai-plans/efficient-ring-buffer.md b/ai-plans/2025-12-14-efficient-ring-buffer.md similarity index 100% rename from ai-plans/efficient-ring-buffer.md rename to ai-plans/2025-12-14-efficient-ring-buffer.md From c054c9e2aa0fcf7ba291361e706cec922f4b1129 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Sun, 14 Dec 2025 10:46:02 -0800 Subject: [PATCH 3/5] proxy: replace ring.Ring with efficient circular byte buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace container/ring.Ring with a custom circularBuffer that uses a single contiguous []byte slice. This fixes the original implementation which created 10,240 ring elements instead of 10KB of storage. GetHistory is now 139x faster (145μs → 1μs) and uses 117x less memory (1.2MB → 10KB). Allocations reduced from 2 to 1 per write operation. ref #421 --- proxy/logMonitor.go | 103 ++++++++++++++++++++++++++++++++------- proxy/logMonitor_test.go | 83 ++++++++++++++++++++++++++++--- 2 files changed, 161 insertions(+), 25 deletions(-) diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index 9597c8a6..123ccbca 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -1,7 +1,6 @@ package proxy import ( - "container/ring" "context" "fmt" "io" @@ -12,6 +11,85 @@ import ( "github.com/mostlygeek/llama-swap/event" ) +// circularBuffer is a fixed-size circular byte buffer that overwrites +// oldest data when full. It provides O(1) writes and O(n) reads. +type circularBuffer struct { + data []byte // pre-allocated capacity + head int // next write position + size int // current number of bytes stored (0 to cap) +} + +func newCircularBuffer(capacity int) *circularBuffer { + return &circularBuffer{ + data: make([]byte, capacity), + head: 0, + size: 0, + } +} + +// Write appends bytes to the buffer, overwriting oldest data when full. +// Data is copied into the internal buffer (not stored by reference). +func (cb *circularBuffer) Write(p []byte) { + if len(p) == 0 { + return + } + + cap := len(cb.data) + + // If input is larger than capacity, only keep the last cap bytes + if len(p) >= cap { + copy(cb.data, p[len(p)-cap:]) + cb.head = 0 + cb.size = cap + return + } + + // Calculate how much space is available from head to end of buffer + firstPart := cap - cb.head + if firstPart >= len(p) { + // All data fits without wrapping + copy(cb.data[cb.head:], p) + cb.head = (cb.head + len(p)) % cap + } else { + // Data wraps around + copy(cb.data[cb.head:], p[:firstPart]) + copy(cb.data[:len(p)-firstPart], p[firstPart:]) + cb.head = len(p) - firstPart + } + + // Update size + cb.size += len(p) + if cb.size > cap { + cb.size = cap + } +} + +// GetHistory returns all buffered data in correct order (oldest to newest). +// Returns a new slice (copy), not a view into internal buffer. +func (cb *circularBuffer) GetHistory() []byte { + if cb.size == 0 { + return nil + } + + result := make([]byte, cb.size) + cap := len(cb.data) + + // Calculate start position (oldest data) + start := (cb.head - cb.size + cap) % cap + + if start+cb.size <= cap { + // Data is contiguous, single copy + copy(result, cb.data[start:start+cb.size]) + } else { + // Data wraps around, two copies + firstPart := cap - start + copy(result[:firstPart], cb.data[start:]) + copy(result[firstPart:], cb.data[:cb.size-firstPart]) + } + + return result +} + type LogLevel int const ( @@ -24,7 +102,7 @@ const ( type LogMonitor struct { eventbus *event.Dispatcher mu sync.RWMutex - buffer *ring.Ring + buffer *circularBuffer bufferMu sync.RWMutex // typically this can be os.Stdout @@ -45,7 +123,7 @@ func NewLogMonitor() *LogMonitor { func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { return &LogMonitor{ eventbus: event.NewDispatcherConfig(1000), - buffer: ring.New(10 * 1024), // keep 10KB of buffered logs + buffer: newCircularBuffer(10 * 1024), // keep 10KB of buffered logs stdout: stdout, level: LevelInfo, prefix: "", @@ -64,12 +142,12 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { } w.bufferMu.Lock() - bufferCopy := make([]byte, len(p)) - copy(bufferCopy, p) - w.buffer.Value = bufferCopy - w.buffer = w.buffer.Next() + w.buffer.Write(p) w.bufferMu.Unlock() + // Make a copy for broadcast to preserve immutability + bufferCopy := make([]byte, len(p)) + copy(bufferCopy, p) w.broadcast(bufferCopy) return n, nil } @@ -77,16 +155,7 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { func (w *LogMonitor) GetHistory() []byte { w.bufferMu.RLock() defer w.bufferMu.RUnlock() - - var history []byte - w.buffer.Do(func(p any) { - if p != nil { - if content, ok := p.([]byte); ok { - history = append(history, content...) - } - } - }) - return history + return w.buffer.GetHistory() } func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc { diff --git a/proxy/logMonitor_test.go b/proxy/logMonitor_test.go index 67c2fe9b..771100f2 100644 --- a/proxy/logMonitor_test.go +++ b/proxy/logMonitor_test.go @@ -114,6 +114,57 @@ func TestWrite_LogTimeFormat(t *testing.T) { } } +func TestCircularBuffer_WrapAround(t *testing.T) { + // Create a small buffer to test wrap-around + cb := newCircularBuffer(10) + + // Write "hello" (5 bytes) + cb.Write([]byte("hello")) + if got := string(cb.GetHistory()); got != "hello" { + t.Errorf("Expected 'hello', got %q", got) + } + + // Write "world" (5 bytes) - buffer now full + cb.Write([]byte("world")) + if got := string(cb.GetHistory()); got != "helloworld" { + t.Errorf("Expected 'helloworld', got %q", got) + } + + // Write "12345" (5 bytes) - should overwrite "hello" + cb.Write([]byte("12345")) + if got := string(cb.GetHistory()); got != "world12345" { + t.Errorf("Expected 'world12345', got %q", got) + } + + // Write data larger than buffer capacity + cb.Write([]byte("abcdefghijklmnop")) // 16 bytes, only last 10 kept + if got := string(cb.GetHistory()); got != "ghijklmnop" { + t.Errorf("Expected 'ghijklmnop', got %q", got) + } +} + +func TestCircularBuffer_BoundaryConditions(t *testing.T) { + // Test empty buffer + cb := newCircularBuffer(10) + if got := cb.GetHistory(); got != nil { + t.Errorf("Expected nil for empty buffer, got %q", got) + } + + // Test exact capacity + cb.Write([]byte("1234567890")) + if got := string(cb.GetHistory()); got != "1234567890" { + t.Errorf("Expected '1234567890', got %q", got) + } + + // Test write exactly at capacity boundary + cb = newCircularBuffer(10) + cb.Write([]byte("12345")) + cb.Write([]byte("67890")) + if got := string(cb.GetHistory()); got != "1234567890" { + t.Errorf("Expected '1234567890', got %q", got) + } +} + func BenchmarkLogMonitorWrite(b *testing.B) { // Test data of varying sizes smallMsg := []byte("small message\n") @@ -170,12 +221,28 @@ func BenchmarkLogMonitorWrite(b *testing.B) { } /* -Baseline -| Benchmark | ops/sec | ns/op | bytes/op | allocs/op | -|-----------|---------|-------|----------|-----------| -| SmallWrite (14B) | ~27M | 43 ns | 40 B | 2 | -| MediumWrite (241B) | ~16M | 76 ns | 264 B | 2 | -| LargeWrite (4KB) | ~2.3M | 504 ns | 4120 B | 2 | -| WithSubscribers (5 subs) | ~3.3M | 355 ns | 264 B | 2 | -| GetHistory (after 1000 writes) | ~8K | 145 µs | 1.2 MB | 22 | +Benchmark Results - MBP M1 Pro + +Before (ring.Ring): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|----------|-----------| +| SmallWrite (14B) | 43 ns | 40 B | 2 | +| MediumWrite (241B) | 76 ns | 264 B | 2 | +| LargeWrite (4KB) | 504 ns | 4,120 B | 2 | +| WithSubscribers (5 subs) | 355 ns | 264 B | 2 | +| GetHistory (after 1000 writes) | 145,000 ns | 1.2 MB | 22 | + +After (circularBuffer): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|----------|-----------| +| SmallWrite (14B) | 26 ns | 16 B | 1 | +| MediumWrite (241B) | 67 ns | 240 B | 1 | +| LargeWrite (4KB) | 774 ns | 4,096 B | 1 | +| WithSubscribers (5 subs) | 325 ns | 240 B | 1 | +| GetHistory (after 1000 writes) | 1,042 ns | 10,240 B | 1 | + +Summary: +- GetHistory: 139x faster, 117x less memory +- Allocations: reduced from 2 to 1 across all operations +- Small/medium writes: ~1.1-1.6x faster */ From d38b898113caee13eda31079162071fea14a49f5 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Sun, 14 Dec 2025 10:54:56 -0800 Subject: [PATCH 4/5] proxy: increase logMonitor to 100KB of history --- proxy/logMonitor.go | 4 +++- proxy/logMonitor_test.go | 13 +++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index 123ccbca..5fe1f702 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -97,6 +97,8 @@ const ( LevelInfo LevelWarn LevelError + + LogBufferSize = 100 * 1024 ) type LogMonitor struct { @@ -123,7 +125,7 @@ func NewLogMonitor() *LogMonitor { func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { return &LogMonitor{ eventbus: event.NewDispatcherConfig(1000), - buffer: newCircularBuffer(10 * 1024), // keep 10KB of buffered logs + buffer: newCircularBuffer(LogBufferSize), stdout: stdout, level: LevelInfo, prefix: "", diff --git a/proxy/logMonitor_test.go b/proxy/logMonitor_test.go index 771100f2..2ede99b4 100644 --- a/proxy/logMonitor_test.go +++ b/proxy/logMonitor_test.go @@ -232,7 +232,7 @@ Before (ring.Ring): | WithSubscribers (5 subs) | 355 ns | 264 B | 2 | | GetHistory (after 1000 writes) | 145,000 ns | 1.2 MB | 22 | -After (circularBuffer): +After (circularBuffer 10KB): | Benchmark | ns/op | bytes/op | allocs/op | |---------------------------------|------------|----------|-----------| | SmallWrite (14B) | 26 ns | 16 B | 1 | @@ -241,8 +241,17 @@ After (circularBuffer): | WithSubscribers (5 subs) | 325 ns | 240 B | 1 | | GetHistory (after 1000 writes) | 1,042 ns | 10,240 B | 1 | +After (circularBuffer 100KB): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|-----------|-----------| +| SmallWrite (14B) | 26 ns | 16 B | 1 | +| MediumWrite (241B) | 66 ns | 240 B | 1 | +| LargeWrite (4KB) | 753 ns | 4,096 B | 1 | +| WithSubscribers (5 subs) | 309 ns | 240 B | 1 | +| GetHistory (after 1000 writes) | 7,788 ns | 106,496 B | 1 | + Summary: -- GetHistory: 139x faster, 117x less memory +- GetHistory: 139x faster (10KB), 18x faster (100KB) - Allocations: reduced from 2 to 1 across all operations - Small/medium writes: ~1.1-1.6x faster */ From fea99ab75625db1545cef9f905ace0099489a27a Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Thu, 18 Dec 2025 21:40:41 -0800 Subject: [PATCH 5/5] proxy: one logger per Process This update creates a LogMonitor per proxy.Process, replacing the usage of a shared one. The buffer in LogMonitor is lazy allocated on the first call to Write and freed when the Process is stopped. This reduces unncessary memory usage when a model is not active. The /logs/stream/{model_id} endpoint was added to stream logs from a specific process. --- README.md | 19 +++++----- proxy/logMonitor.go | 16 ++++++++- proxy/logMonitor_test.go | 59 +++++++++++++++++++++++++++++++ proxy/process.go | 9 ++++- proxy/processgroup.go | 10 +++++- proxy/proxymanager_loghandlers.go | 28 +++++++++------ 6 files changed, 119 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 1a729f97..09b80b9e 100644 --- a/README.md +++ b/README.md @@ -203,23 +203,26 @@ As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. H ## Monitoring Logs on the CLI -```shell +```sh # sends up to the last 10KB of logs -curl http://host/logs' +$ curl http://host/logs # streams combined logs -curl -Ns 'http://host/logs/stream' +curl -Ns http://host/logs/stream + +# stream llama-swap's proxy status logs +curl -Ns http://host/logs/stream/proxy -# just llama-swap's logs -curl -Ns 'http://host/logs/stream/proxy' +# stream logs from upstream processes that llama-swap loads +curl -Ns http://host/logs/stream/upstream -# just upstream's logs -curl -Ns 'http://host/logs/stream/upstream' +# stream logs only from a specific model +curl -Ns http://host/logs/stream/{model_id} # stream and filter logs with linux pipes curl -Ns http://host/logs/stream | grep 'eval time' -# skips history and just streams new log entries +# appending ?no-history will disable sending buffered history first curl -Ns 'http://host/logs/stream?no-history' ``` diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index 5fe1f702..e440a6f1 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -125,7 +125,7 @@ func NewLogMonitor() *LogMonitor { func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { return &LogMonitor{ eventbus: event.NewDispatcherConfig(1000), - buffer: newCircularBuffer(LogBufferSize), + buffer: nil, // lazy initialized on first Write stdout: stdout, level: LevelInfo, prefix: "", @@ -144,6 +144,9 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { } w.bufferMu.Lock() + if w.buffer == nil { + w.buffer = newCircularBuffer(LogBufferSize) + } w.buffer.Write(p) w.bufferMu.Unlock() @@ -157,9 +160,20 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { func (w *LogMonitor) GetHistory() []byte { w.bufferMu.RLock() defer w.bufferMu.RUnlock() + if w.buffer == nil { + return nil + } return w.buffer.GetHistory() } +// Clear releases the buffer memory, making it eligible for GC. +// The buffer will be lazily re-allocated on the next Write. +func (w *LogMonitor) Clear() { + w.bufferMu.Lock() + w.buffer = nil + w.bufferMu.Unlock() +} + func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc { return event.Subscribe(w.eventbus, func(e LogDataEvent) { callback(e.Data) diff --git a/proxy/logMonitor_test.go b/proxy/logMonitor_test.go index 2ede99b4..aff0d3e3 100644 --- a/proxy/logMonitor_test.go +++ b/proxy/logMonitor_test.go @@ -165,6 +165,65 @@ func TestCircularBuffer_BoundaryConditions(t *testing.T) { } } +func TestLogMonitor_LazyInit(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Buffer should be nil before any writes + if lm.buffer != nil { + t.Error("Expected buffer to be nil before first write") + } + + // GetHistory should return nil when buffer is nil + if got := lm.GetHistory(); got != nil { + t.Errorf("Expected nil history before first write, got %q", got) + } + + // Write should lazily initialize the buffer + lm.Write([]byte("test")) + + if lm.buffer == nil { + t.Error("Expected buffer to be initialized after write") + } + + if got := string(lm.GetHistory()); got != "test" { + t.Errorf("Expected 'test', got %q", got) + } +} + +func TestLogMonitor_Clear(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Write some data + lm.Write([]byte("hello")) + if got := string(lm.GetHistory()); got != "hello" { + t.Errorf("Expected 'hello', got %q", got) + } + + // Clear should release the buffer + lm.Clear() + + if lm.buffer != nil { + t.Error("Expected buffer to be nil after Clear") + } + + if got := lm.GetHistory(); got != nil { + t.Errorf("Expected nil history after Clear, got %q", got) + } +} + +func TestLogMonitor_ClearAndReuse(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Write, clear, then write again + lm.Write([]byte("first")) + lm.Clear() + lm.Write([]byte("second")) + + if got := string(lm.GetHistory()); got != "second" { + t.Errorf("Expected 'second' after clear and reuse, got %q", got) + } +} + func BenchmarkLogMonitorWrite(b *testing.B) { // Test data of varying sizes smallMsg := []byte("small message\n") diff --git a/proxy/process.go b/proxy/process.go index 640ba34a..41427059 100644 --- a/proxy/process.go +++ b/proxy/process.go @@ -414,6 +414,9 @@ func (p *Process) stopCommand() { stopStartTime := time.Now() defer func() { p.proxyLogger.Debugf("<%s> stopCommand took %v", p.ID, time.Since(stopStartTime)) + + // free the buffer in processLogger so the memory can be recovered + p.processLogger.Clear() }() p.cmdMutex.RLock() @@ -646,6 +649,11 @@ func (p *Process) cmdStopUpstreamProcess() error { return nil } +// Logger returns the logger for this process. +func (p *Process) Logger() *LogMonitor { + return p.processLogger +} + var loadingRemarks = []string{ "Still faster than your last standup meeting...", "Reticulating splines...", @@ -864,7 +872,6 @@ func (s *statusResponseWriter) WriteHeader(statusCode int) { s.Flush() } -// Add Flush method func (s *statusResponseWriter) Flush() { if flusher, ok := s.writer.(http.Flusher); ok { flusher.Flush() diff --git a/proxy/processgroup.go b/proxy/processgroup.go index e0b06008..b401d8a6 100644 --- a/proxy/processgroup.go +++ b/proxy/processgroup.go @@ -46,7 +46,8 @@ func NewProcessGroup(id string, config config.Config, proxyLogger *LogMonitor, u // Create a Process for each member in the group for _, modelID := range groupConfig.Members { modelConfig, modelID, _ := pg.config.FindConfig(modelID) - process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger) + processLogger := NewLogMonitorWriter(upstreamLogger) + process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, processLogger, pg.proxyLogger) pg.processes[modelID] = process } @@ -88,6 +89,13 @@ func (pg *ProcessGroup) HasMember(modelName string) bool { return slices.Contains(pg.config.Groups[pg.id].Members, modelName) } +func (pg *ProcessGroup) GetMember(modelName string) (*Process, bool) { + if pg.HasMember(modelName) { + return pg.processes[modelName], true + } + return nil, false +} + func (pg *ProcessGroup) StopProcess(modelID string, strategy StopStrategy) error { pg.Lock() diff --git a/proxy/proxymanager_loghandlers.go b/proxy/proxymanager_loghandlers.go index a3de806a..d4a59e88 100644 --- a/proxy/proxymanager_loghandlers.go +++ b/proxy/proxymanager_loghandlers.go @@ -83,18 +83,24 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { // getLogger searches for the appropriate logger based on the logMonitorId func (pm *ProxyManager) getLogger(logMonitorId string) (*LogMonitor, error) { - var logger *LogMonitor - - if logMonitorId == "" { + switch logMonitorId { + case "": // maintain the default - logger = pm.muxLogger - } else if logMonitorId == "proxy" { - logger = pm.proxyLogger - } else if logMonitorId == "upstream" { - logger = pm.upstreamLogger - } else { + return pm.muxLogger, nil + case "proxy": + return pm.proxyLogger, nil + case "upstream": + return pm.upstreamLogger, nil + default: + // search for a models specific logger + if name, found := pm.config.RealModelName(logMonitorId); found { + for _, group := range pm.processGroups { + if process, found := group.GetMember(name); found { + return process.Logger(), nil + } + } + } + return nil, fmt.Errorf("invalid logger. Use 'proxy' or 'upstream'") } - - return logger, nil }