diff --git a/LICENSE.md b/LICENSE.md index 6dbacecf..85469145 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,6 +1,7 @@ MIT License Copyright (c) 2024 Benson Wong +Copyright (c) 2025 Aleksei Leshikhin Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/README.md b/README.md index 8e6c60a8..314f91f9 100644 --- a/README.md +++ b/README.md @@ -192,23 +192,26 @@ As a safeguard, llmsnap also sets `X-Accel-Buffering: no` on SSE responses. Howe ## Monitoring Logs on the CLI -```shell +```sh # sends up to the last 10KB of logs -curl http://host/logs' +$ curl http://host/logs # streams combined logs -curl -Ns 'http://host/logs/stream' +curl -Ns http://host/logs/stream + +# stream llmsnap's proxy status logs +curl -Ns http://host/logs/stream/proxy -# just llama-swap's logs -curl -Ns 'http://host/logs/stream/proxy' +# stream logs from upstream processes that llmsnap loads +curl -Ns http://host/logs/stream/upstream -# just upstream's logs -curl -Ns 'http://host/logs/stream/upstream' +# stream logs only from a specific model +curl -Ns http://host/logs/stream/{model_id} # stream and filter logs with linux pipes curl -Ns http://host/logs/stream | grep 'eval time' -# skips history and just streams new log entries +# appending ?no-history will disable sending buffered history first curl -Ns 'http://host/logs/stream?no-history' ``` diff --git a/ai-plans/2025-12-14-efficient-ring-buffer.md b/ai-plans/2025-12-14-efficient-ring-buffer.md new file mode 100644 index 00000000..dd66abdf --- /dev/null +++ b/ai-plans/2025-12-14-efficient-ring-buffer.md @@ -0,0 +1,85 @@ +# Replace ring.Ring with Efficient 
Circular Byte Buffer + +## Overview + +Replace the inefficient `container/ring.Ring` implementation in `logMonitor.go` with a simple circular byte buffer that uses a single contiguous `[]byte` slice. This eliminates per-write allocations, improves cache locality, and correctly implements a 10KB buffer. + +## Current Issues + +1. `ring.New(10 * 1024)` creates 10,240 ring **elements**, not 10KB of storage +2. Every `Write()` call allocates a new `[]byte` slice inside the lock +3. `GetHistory()` iterates all 10,240 elements and appends repeatedly (geometric reallocs) +4. Linked list structure has poor cache locality and pointer overhead + +## Design Requirements + +### New CircularBuffer Type + +Create a simple circular byte buffer with: +- Single pre-allocated `[]byte` of fixed capacity (10KB) +- `head` and `size` integers to track write position and data length +- No per-write allocations + +### API Requirements + +The new buffer must support: +1. **Write(p []byte)** - Append bytes, overwriting oldest data when full +2. **GetHistory() []byte** - Return all buffered data in correct order (oldest to newest) + +### Implementation Details + +```go +type circularBuffer struct { + data []byte // pre-allocated capacity + head int // next write position + size int // current number of bytes stored (0 to cap) +} +``` + +**Write logic:** +- If `len(p) >= capacity`: just keep the last `capacity` bytes +- Otherwise: write bytes at `head`, wrapping around if needed +- Update `head` and `size` accordingly +- Data is copied into the internal buffer (not stored by reference) + +**GetHistory logic:** +- Calculate start position: `(head - size + cap) % cap` +- If not wrapped: single slice copy +- If wrapped: two copies (end of buffer + beginning) +- Returns a **new slice** (copy), not a view into internal buffer + +### Immutability Guarantees (must preserve) + +Per existing tests: +1. Modifying input `[]byte` after `Write()` must not affect stored data +2. 
`GetHistory()` returns independent copy - modifications don't affect buffer + +## Files to Modify + +- `proxy/logMonitor.go` - Replace `buffer *ring.Ring` with new circular buffer + +## Testing Plan + +Existing tests in `logMonitor_test.go` should continue to pass: +- `TestLogMonitor` - Basic write/read and subscriber notification +- `TestWrite_ImmutableBuffer` - Verify writes don't affect returned history +- `TestWrite_LogTimeFormat` - Timestamp formatting + +Add new tests: +- Test buffer wrap-around behavior +- Test large writes that exceed buffer capacity +- Test exact capacity boundary conditions + +## Checklist + +- [ ] Create `circularBuffer` struct in `logMonitor.go` +- [ ] Implement `Write()` method for circular buffer +- [ ] Implement `GetHistory()` method for circular buffer +- [ ] Update `LogMonitor` struct to use new buffer +- [ ] Update `NewLogMonitorWriter()` to initialize new buffer +- [ ] Update `LogMonitor.Write()` to use new buffer +- [ ] Update `LogMonitor.GetHistory()` to use new buffer +- [ ] Remove `"container/ring"` import +- [ ] Run `make test-dev` to verify existing tests pass +- [ ] Add wrap-around test case +- [ ] Run `make test-all` for final validation diff --git a/config.example.yaml b/config.example.yaml index 80de384f..2d4871d5 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -46,6 +46,16 @@ logLevel: info # - For more info, read: https://pkg.go.dev/time#pkg-constants logTimeFormat: "" +# logToStdout: controls what is logged to stdout +# - optional, default: "proxy" +# - valid values: +# - "proxy": logs generated by llmsnap when swapping models, +# handling requests, etc. 
+# - "upstream": a copy of an upstream process's stdout logs +# - "both": both the proxy and upstream logs interleaved together +# - "none": no logs are ever written to stdout +logToStdout: "proxy" + # metricsMaxInMemory: maximum number of metrics to keep in memory # - optional, default: 1000 # - controls how many metrics are stored in memory before older ones are discarded diff --git a/docs/configuration.md b/docs/configuration.md index 43e71704..faa6eeb5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -90,6 +90,9 @@ llmsnap supports many more features to customize how you want to manage your env > This is a copy of `config.example.yaml`. Always check that for the most up to date examples. ```yaml +# add this modeline for validation in vscode +# yaml-language-server: $schema=https://raw.githubusercontent.com/napmany/llmsnap/refs/heads/main/config-schema.json +# # llmsnap YAML configuration example # ----------------------------------- # @@ -115,6 +118,24 @@ healthCheckTimeout: 500 # - Valid log levels: debug, info, warn, error logLevel: info +# logTimeFormat: enables and sets the logging timestamp format +# - optional, default (disabled): "" +# - Valid values: "", "ansic", "unixdate", "rubydate", "rfc822", "rfc822z", +# "rfc850", "rfc1123", "rfc1123z", "rfc3339", "rfc3339nano", "kitchen", +# "stamp", "stampmilli", "stampmicro", and "stampnano". +# - For more info, read: https://pkg.go.dev/time#pkg-constants +logTimeFormat: "" + +# logToStdout: controls what is logged to stdout +# - optional, default: "proxy" +# - valid values: +# - "proxy": logs generated by llmsnap when swapping models, +# handling requests, etc. 
+# - "upstream": a copy of an upstream process's stdout logs +# - "both": both the proxy and upstream logs interleaved together +# - "none": no logs are ever written to stdout +logToStdout: "proxy" + # metricsMaxInMemory: maximum number of metrics to keep in memory # - optional, default: 1000 # - controls how many metrics are stored in memory before older ones are discarded @@ -139,6 +160,20 @@ wakeRequestTimeout: 20 # - it is automatically incremented for every model that uses it startPort: 10001 +# sendLoadingState: inject loading status updates into the reasoning (thinking) +# field +# - optional, default: false +# - when true, a stream of loading messages will be sent to the client in the +# reasoning field so chat UIs can show that loading is in progress. +# - see #366 for more details +sendLoadingState: true + +# includeAliasesInList: present aliases within the /v1/models OpenAI API listing +# - optional, default: false +# - when true, model aliases will be output to the API model listing duplicating +# all fields except for Id so chat UIs can use the alias equivalent to the original. 
+includeAliasesInList: false + # macros: a dictionary of string substitutions # - optional, default: empty dictionary # - macros are reusable snippets @@ -287,6 +322,10 @@ models: # - recommended to be omitted and the default used concurrencyLimit: 0 + # sendLoadingState: overrides the global sendLoadingState setting for this model + # - optional, default: undefined (use global setting) + sendLoadingState: false + # Unlisted model example: "qwen-unlisted": # unlisted: boolean, true or false diff --git a/proxy/config/config.go b/proxy/config/config.go index b262f62b..181b5aa2 100644 --- a/proxy/config/config.go +++ b/proxy/config/config.go @@ -15,6 +15,12 @@ import ( ) const DEFAULT_GROUP_ID = "(default)" +const ( + LogToStdoutProxy = "proxy" + LogToStdoutUpstream = "upstream" + LogToStdoutBoth = "both" + LogToStdoutNone = "none" +) type MacroEntry struct { Name string @@ -116,6 +122,7 @@ type Config struct { LogRequests bool `yaml:"logRequests"` LogLevel string `yaml:"logLevel"` LogTimeFormat string `yaml:"logTimeFormat"` + LogToStdout string `yaml:"logToStdout"` MetricsMaxInMemory int `yaml:"metricsMaxInMemory"` Models map[string]ModelConfig `yaml:"models"` /* key is model ID */ Profiles map[string][]string `yaml:"profiles"` @@ -181,6 +188,7 @@ func LoadConfigFromReader(r io.Reader) (Config, error) { StartPort: 5800, LogLevel: "info", LogTimeFormat: "", + LogToStdout: LogToStdoutProxy, MetricsMaxInMemory: 1000, } err = yaml.Unmarshal(data, &config) @@ -207,6 +215,12 @@ func LoadConfigFromReader(r io.Reader) (Config, error) { return Config{}, fmt.Errorf("startPort must be greater than 1") } + switch config.LogToStdout { + case LogToStdoutProxy, LogToStdoutUpstream, LogToStdoutBoth, LogToStdoutNone: + default: + return Config{}, fmt.Errorf("logToStdout must be one of: proxy, upstream, both, none") + } + // Populate the aliases map config.aliases = make(map[string]string) for modelName, modelConfig := range config.Models { diff --git 
a/proxy/config/config_posix_test.go b/proxy/config/config_posix_test.go index 59499a52..7127fc71 100644 --- a/proxy/config/config_posix_test.go +++ b/proxy/config/config_posix_test.go @@ -166,6 +166,7 @@ groups: expected := Config{ LogLevel: "info", LogTimeFormat: "", + LogToStdout: LogToStdoutProxy, StartPort: 5800, Macros: MacroList{ {"svr-path", "path/to/server"}, diff --git a/proxy/config/config_windows_test.go b/proxy/config/config_windows_test.go index 79010258..30c043c4 100644 --- a/proxy/config/config_windows_test.go +++ b/proxy/config/config_windows_test.go @@ -158,6 +158,7 @@ groups: expected := Config{ LogLevel: "info", LogTimeFormat: "", + LogToStdout: LogToStdoutProxy, StartPort: 5800, Macros: MacroList{ {"svr-path", "path/to/server"}, diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index 8343fb07..a0f85584 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -1,7 +1,6 @@ package proxy import ( - "container/ring" "context" "fmt" "io" @@ -12,6 +11,85 @@ import ( "github.com/napmany/llmsnap/event" ) +// circularBuffer is a fixed-size circular byte buffer that overwrites +// oldest data when full. It provides O(1) writes and O(n) reads. +type circularBuffer struct { + data []byte // pre-allocated capacity + head int // next write position + size int // current number of bytes stored (0 to cap) +} + +func newCircularBuffer(capacity int) *circularBuffer { + return &circularBuffer{ + data: make([]byte, capacity), + head: 0, + size: 0, + } +} + +// Write appends bytes to the buffer, overwriting oldest data when full. +// Data is copied into the internal buffer (not stored by reference). 
+func (cb *circularBuffer) Write(p []byte) { + if len(p) == 0 { + return + } + + cap := len(cb.data) + + // If input is larger than capacity, only keep the last cap bytes + if len(p) >= cap { + copy(cb.data, p[len(p)-cap:]) + cb.head = 0 + cb.size = cap + return + } + + // Calculate how much space is available from head to end of buffer + firstPart := cap - cb.head + if firstPart >= len(p) { + // All data fits without wrapping + copy(cb.data[cb.head:], p) + cb.head = (cb.head + len(p)) % cap + } else { + // Data wraps around + copy(cb.data[cb.head:], p[:firstPart]) + copy(cb.data[:len(p)-firstPart], p[firstPart:]) + cb.head = len(p) - firstPart + } + + // Update size + cb.size += len(p) + if cb.size > cap { + cb.size = cap + } +} + +// GetHistory returns all buffered data in correct order (oldest to newest). +// Returns a new slice (copy), not a view into internal buffer. +func (cb *circularBuffer) GetHistory() []byte { + if cb.size == 0 { + return nil + } + + result := make([]byte, cb.size) + cap := len(cb.data) + + // Calculate start position (oldest data) + start := (cb.head - cb.size + cap) % cap + + if start+cb.size <= cap { + // Data is contiguous, single copy + copy(result, cb.data[start:start+cb.size]) + } else { + // Data wraps around, two copies + firstPart := cap - start + copy(result[:firstPart], cb.data[start:]) + copy(result[firstPart:], cb.data[:cb.size-firstPart]) + } + + return result +} + type LogLevel int const ( @@ -19,12 +97,14 @@ const ( LevelInfo LevelWarn LevelError + + LogBufferSize = 100 * 1024 ) type LogMonitor struct { eventbus *event.Dispatcher mu sync.RWMutex - buffer *ring.Ring + buffer *circularBuffer bufferMu sync.RWMutex // typically this can be os.Stdout @@ -45,7 +125,7 @@ func NewLogMonitor() *LogMonitor { func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { return &LogMonitor{ eventbus: event.NewDispatcherConfig(1000), - buffer: ring.New(10 * 1024), // keep 10KB of buffered logs + buffer: nil, // lazy initialized on first 
Write stdout: stdout, level: LevelInfo, prefix: "", @@ -64,12 +144,15 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { } w.bufferMu.Lock() - bufferCopy := make([]byte, len(p)) - copy(bufferCopy, p) - w.buffer.Value = bufferCopy - w.buffer = w.buffer.Next() + if w.buffer == nil { + w.buffer = newCircularBuffer(LogBufferSize) + } + w.buffer.Write(p) w.bufferMu.Unlock() + // Make a copy for broadcast to preserve immutability + bufferCopy := make([]byte, len(p)) + copy(bufferCopy, p) w.broadcast(bufferCopy) return n, nil } @@ -77,16 +160,18 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) { func (w *LogMonitor) GetHistory() []byte { w.bufferMu.RLock() defer w.bufferMu.RUnlock() + if w.buffer == nil { + return nil + } + return w.buffer.GetHistory() +} - var history []byte - w.buffer.Do(func(p any) { - if p != nil { - if content, ok := p.([]byte); ok { - history = append(history, content...) - } - } - }) - return history +// Clear releases the buffer memory, making it eligible for GC. +// The buffer will be lazily re-allocated on the next Write. 
+func (w *LogMonitor) Clear() { + w.bufferMu.Lock() + w.buffer = nil + w.bufferMu.Unlock() } func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc { diff --git a/proxy/logMonitor_test.go b/proxy/logMonitor_test.go index a25b637b..aff0d3e3 100644 --- a/proxy/logMonitor_test.go +++ b/proxy/logMonitor_test.go @@ -113,3 +113,204 @@ func TestWrite_LogTimeFormat(t *testing.T) { t.Fatalf("Cannot find timestamp: %v", err) } } + +func TestCircularBuffer_WrapAround(t *testing.T) { + // Create a small buffer to test wrap-around + cb := newCircularBuffer(10) + + // Write "hello" (5 bytes) + cb.Write([]byte("hello")) + if got := string(cb.GetHistory()); got != "hello" { + t.Errorf("Expected 'hello', got %q", got) + } + + // Write "world" (5 bytes) - buffer now full + cb.Write([]byte("world")) + if got := string(cb.GetHistory()); got != "helloworld" { + t.Errorf("Expected 'helloworld', got %q", got) + } + + // Write "12345" (5 bytes) - should overwrite "hello" + cb.Write([]byte("12345")) + if got := string(cb.GetHistory()); got != "world12345" { + t.Errorf("Expected 'world12345', got %q", got) + } + + // Write data larger than buffer capacity + cb.Write([]byte("abcdefghijklmnop")) // 16 bytes, only last 10 kept + if got := string(cb.GetHistory()); got != "ghijklmnop" { + t.Errorf("Expected 'ghijklmnop', got %q", got) + } +} + +func TestCircularBuffer_BoundaryConditions(t *testing.T) { + // Test empty buffer + cb := newCircularBuffer(10) + if got := cb.GetHistory(); got != nil { + t.Errorf("Expected nil for empty buffer, got %q", got) + } + + // Test exact capacity + cb.Write([]byte("1234567890")) + if got := string(cb.GetHistory()); got != "1234567890" { + t.Errorf("Expected '1234567890', got %q", got) + } + + // Test write exactly at capacity boundary + cb = newCircularBuffer(10) + cb.Write([]byte("12345")) + cb.Write([]byte("67890")) + if got := string(cb.GetHistory()); got != "1234567890" { + t.Errorf("Expected '1234567890', got %q", got) + } +} + 
+func TestLogMonitor_LazyInit(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Buffer should be nil before any writes + if lm.buffer != nil { + t.Error("Expected buffer to be nil before first write") + } + + // GetHistory should return nil when buffer is nil + if got := lm.GetHistory(); got != nil { + t.Errorf("Expected nil history before first write, got %q", got) + } + + // Write should lazily initialize the buffer + lm.Write([]byte("test")) + + if lm.buffer == nil { + t.Error("Expected buffer to be initialized after write") + } + + if got := string(lm.GetHistory()); got != "test" { + t.Errorf("Expected 'test', got %q", got) + } +} + +func TestLogMonitor_Clear(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Write some data + lm.Write([]byte("hello")) + if got := string(lm.GetHistory()); got != "hello" { + t.Errorf("Expected 'hello', got %q", got) + } + + // Clear should release the buffer + lm.Clear() + + if lm.buffer != nil { + t.Error("Expected buffer to be nil after Clear") + } + + if got := lm.GetHistory(); got != nil { + t.Errorf("Expected nil history after Clear, got %q", got) + } +} + +func TestLogMonitor_ClearAndReuse(t *testing.T) { + lm := NewLogMonitorWriter(io.Discard) + + // Write, clear, then write again + lm.Write([]byte("first")) + lm.Clear() + lm.Write([]byte("second")) + + if got := string(lm.GetHistory()); got != "second" { + t.Errorf("Expected 'second' after clear and reuse, got %q", got) + } +} + +func BenchmarkLogMonitorWrite(b *testing.B) { + // Test data of varying sizes + smallMsg := []byte("small message\n") + mediumMsg := []byte(strings.Repeat("medium message content ", 10) + "\n") + largeMsg := []byte(strings.Repeat("large message content for benchmarking ", 100) + "\n") + + b.Run("SmallWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(smallMsg) + } + }) + + b.Run("MediumWrite", func(b *testing.B) { + lm := 
NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(mediumMsg) + } + }) + + b.Run("LargeWrite", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(largeMsg) + } + }) + + b.Run("WithSubscribers", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + // Add some subscribers + for i := 0; i < 5; i++ { + lm.OnLogData(func(data []byte) {}) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.Write(mediumMsg) + } + }) + + b.Run("GetHistory", func(b *testing.B) { + lm := NewLogMonitorWriter(io.Discard) + // Pre-populate with data + for i := 0; i < 1000; i++ { + lm.Write(mediumMsg) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + lm.GetHistory() + } + }) +} + +/* +Benchmark Results - MBP M1 Pro + +Before (ring.Ring): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|----------|-----------| +| SmallWrite (14B) | 43 ns | 40 B | 2 | +| MediumWrite (241B) | 76 ns | 264 B | 2 | +| LargeWrite (4KB) | 504 ns | 4,120 B | 2 | +| WithSubscribers (5 subs) | 355 ns | 264 B | 2 | +| GetHistory (after 1000 writes) | 145,000 ns | 1.2 MB | 22 | + +After (circularBuffer 10KB): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|----------|-----------| +| SmallWrite (14B) | 26 ns | 16 B | 1 | +| MediumWrite (241B) | 67 ns | 240 B | 1 | +| LargeWrite (4KB) | 774 ns | 4,096 B | 1 | +| WithSubscribers (5 subs) | 325 ns | 240 B | 1 | +| GetHistory (after 1000 writes) | 1,042 ns | 10,240 B | 1 | + +After (circularBuffer 100KB): +| Benchmark | ns/op | bytes/op | allocs/op | +|---------------------------------|------------|-----------|-----------| +| SmallWrite (14B) | 26 ns | 16 B | 1 | +| MediumWrite (241B) | 66 ns | 240 B | 1 | +| LargeWrite (4KB) | 753 ns | 4,096 B | 1 | +| WithSubscribers (5 subs) | 309 ns | 240 B | 1 | +| GetHistory (after 1000 writes) | 7,788 ns | 106,496 B | 1 | + 
+Summary: +- GetHistory: 139x faster (10KB), 18x faster (100KB) +- Allocations: reduced from 2 to 1 across all operations +- Small/medium writes: ~1.1-1.6x faster +*/ diff --git a/proxy/process.go b/proxy/process.go index 27a359e3..d2ee2ee2 100644 --- a/proxy/process.go +++ b/proxy/process.go @@ -612,6 +612,9 @@ func (p *Process) stopCommand() { stopStartTime := time.Now() defer func() { p.proxyLogger.Debugf("<%s> stopCommand took %v", p.ID, time.Since(stopStartTime)) + + // free the buffer in processLogger so the memory can be recovered + p.processLogger.Clear() }() p.cmdMutex.RLock() @@ -888,6 +891,11 @@ func (p *Process) cmdStopUpstreamProcess() error { return nil } +// Logger returns the logger for this process. +func (p *Process) Logger() *LogMonitor { + return p.processLogger +} + var loadingRemarks = []string{ "Still faster than your last standup meeting...", "Reticulating splines...", @@ -1106,7 +1114,6 @@ func (s *statusResponseWriter) WriteHeader(statusCode int) { s.Flush() } -// Add Flush method func (s *statusResponseWriter) Flush() { if flusher, ok := s.writer.(http.Flusher); ok { flusher.Flush() diff --git a/proxy/processgroup.go b/proxy/processgroup.go index d9c5557b..a6355b85 100644 --- a/proxy/processgroup.go +++ b/proxy/processgroup.go @@ -46,7 +46,8 @@ func NewProcessGroup(id string, config config.Config, proxyLogger *LogMonitor, u // Create a Process for each member in the group for _, modelID := range groupConfig.Members { modelConfig, modelID, _ := pg.config.FindConfig(modelID) - process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger) + processLogger := NewLogMonitorWriter(upstreamLogger) + process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, processLogger, pg.proxyLogger) pg.processes[modelID] = process } @@ -89,6 +90,13 @@ func (pg *ProcessGroup) HasMember(modelName string) bool { return slices.Contains(pg.config.Groups[pg.id].Members, modelName) } +func (pg 
*ProcessGroup) GetMember(modelName string) (*Process, bool) { + if pg.HasMember(modelName) { + return pg.processes[modelName], true + } + return nil, false +} + func (pg *ProcessGroup) StopProcess(modelID string, strategy StopStrategy) error { pg.Lock() diff --git a/proxy/proxymanager.go b/proxy/proxymanager.go index aaabdd07..22239c1f 100644 --- a/proxy/proxymanager.go +++ b/proxy/proxymanager.go @@ -52,17 +52,37 @@ type ProxyManager struct { version string } -func New(config config.Config) *ProxyManager { +func New(proxyConfig config.Config) *ProxyManager { // set up loggers - stdoutLogger := NewLogMonitorWriter(os.Stdout) - upstreamLogger := NewLogMonitorWriter(stdoutLogger) - proxyLogger := NewLogMonitorWriter(stdoutLogger) - if config.LogRequests { + var muxLogger, upstreamLogger, proxyLogger *LogMonitor + switch proxyConfig.LogToStdout { + case config.LogToStdoutNone: + muxLogger = NewLogMonitorWriter(io.Discard) + upstreamLogger = NewLogMonitorWriter(io.Discard) + proxyLogger = NewLogMonitorWriter(io.Discard) + case config.LogToStdoutBoth: + muxLogger = NewLogMonitorWriter(os.Stdout) + upstreamLogger = NewLogMonitorWriter(muxLogger) + proxyLogger = NewLogMonitorWriter(muxLogger) + case config.LogToStdoutUpstream: + muxLogger = NewLogMonitorWriter(os.Stdout) + upstreamLogger = NewLogMonitorWriter(muxLogger) + proxyLogger = NewLogMonitorWriter(io.Discard) + default: + // same as config.LogToStdoutProxy + // helpful because some old tests create a config.Config directly and it + // may not have LogToStdout set explicitly + muxLogger = NewLogMonitorWriter(os.Stdout) + upstreamLogger = NewLogMonitorWriter(io.Discard) + proxyLogger = NewLogMonitorWriter(muxLogger) + } + + if proxyConfig.LogRequests { proxyLogger.Warn("LogRequests configuration is deprecated. 
Use logLevel instead.") } - switch strings.ToLower(strings.TrimSpace(config.LogLevel)) { + switch strings.ToLower(strings.TrimSpace(proxyConfig.LogLevel)) { case "debug": proxyLogger.SetLogLevel(LevelDebug) upstreamLogger.SetLogLevel(LevelDebug) @@ -99,7 +119,7 @@ func New(config config.Config) *ProxyManager { "stampnano": time.StampNano, } - if timeFormat, ok := timeFormats[strings.ToLower(strings.TrimSpace(config.LogTimeFormat))]; ok { + if timeFormat, ok := timeFormats[strings.ToLower(strings.TrimSpace(proxyConfig.LogTimeFormat))]; ok { proxyLogger.SetLogTimeFormat(timeFormat) upstreamLogger.SetLogTimeFormat(timeFormat) } @@ -107,18 +127,18 @@ func New(config config.Config) *ProxyManager { shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) var maxMetrics int - if config.MetricsMaxInMemory <= 0 { + if proxyConfig.MetricsMaxInMemory <= 0 { maxMetrics = 1000 // Default fallback } else { - maxMetrics = config.MetricsMaxInMemory + maxMetrics = proxyConfig.MetricsMaxInMemory } pm := &ProxyManager{ - config: config, + config: proxyConfig, ginEngine: gin.New(), proxyLogger: proxyLogger, - muxLogger: stdoutLogger, + muxLogger: muxLogger, upstreamLogger: upstreamLogger, metricsMonitor: newMetricsMonitor(proxyLogger, maxMetrics), @@ -134,19 +154,19 @@ func New(config config.Config) *ProxyManager { } // create the process groups - for groupID := range config.Groups { - processGroup := NewProcessGroup(groupID, config, proxyLogger, upstreamLogger) + for groupID := range proxyConfig.Groups { + processGroup := NewProcessGroup(groupID, proxyConfig, proxyLogger, upstreamLogger) pm.processGroups[groupID] = processGroup } pm.setupGinEngine() // run any startup hooks - if len(config.Hooks.OnStartup.Preload) > 0 { + if len(proxyConfig.Hooks.OnStartup.Preload) > 0 { // do it in the background, don't block startup -- not sure if good idea yet go func() { discardWriter := &DiscardWriter{} - for _, realModelName := range config.Hooks.OnStartup.Preload { + for _, 
realModelName := range proxyConfig.Hooks.OnStartup.Preload { proxyLogger.Infof("Preloading model: %s", realModelName) processGroup, _, err := pm.swapProcessGroup(realModelName) @@ -266,7 +286,7 @@ func (pm *ProxyManager) setupGinEngine() { // in proxymanager_loghandlers.go pm.ginEngine.GET("/logs", pm.sendLogsHandlers) pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler) - pm.ginEngine.GET("/logs/stream/:logMonitorID", pm.streamLogsHandler) + pm.ginEngine.GET("/logs/stream/*logMonitorID", pm.streamLogsHandler) /** * User Interface Endpoints @@ -466,61 +486,61 @@ func (pm *ProxyManager) listModelsHandler(c *gin.Context) { }) } -func (pm *ProxyManager) proxyToUpstream(c *gin.Context) { - upstreamPath := c.Param("upstreamPath") - - // split the upstream path by / and search for the model name - parts := strings.Split(strings.TrimSpace(upstreamPath), "/") - if len(parts) == 0 { - pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path") - return - } - - modelFound := false +// findModelInPath searches for a valid model name in a path with slashes. +// It iteratively builds up path segments until it finds a matching model. 
+// Returns: (searchModelName, realModelName, remainingPath, found) +// Example: "/author/model/endpoint" with model "author/model" -> ("author/model", "author/model", "/endpoint", true) +func (pm *ProxyManager) findModelInPath(path string) (searchName string, realName string, remainingPath string, found bool) { + parts := strings.Split(strings.TrimSpace(path), "/") searchModelName := "" - var modelName, remainingPath string + for i, part := range parts { - if parts[i] == "" { + if part == "" { continue } if searchModelName == "" { searchModelName = part } else { - searchModelName = searchModelName + "/" + parts[i] + searchModelName = searchModelName + "/" + part } if real, ok := pm.config.RealModelName(searchModelName); ok { - modelName = real - remainingPath = "/" + strings.Join(parts[i+1:], "/") - modelFound = true - - // Check if this is exactly a model name with no additional path - // and doesn't end with a trailing slash - if remainingPath == "/" && !strings.HasSuffix(upstreamPath, "/") { - // Build new URL with query parameters preserved - newPath := "/upstream/" + searchModelName + "/" - if c.Request.URL.RawQuery != "" { - newPath += "?" 
+ c.Request.URL.RawQuery - } - - // Use 308 for non-GET/HEAD requests to preserve method - if c.Request.Method == http.MethodGet || c.Request.Method == http.MethodHead { - c.Redirect(http.StatusMovedPermanently, newPath) - } else { - c.Redirect(http.StatusPermanentRedirect, newPath) - } - return - } - break + return searchModelName, real, "/" + strings.Join(parts[i+1:], "/"), true } } + return "", "", "", false +} + +func (pm *ProxyManager) proxyToUpstream(c *gin.Context) { + upstreamPath := c.Param("upstreamPath") + + searchModelName, modelName, remainingPath, modelFound := pm.findModelInPath(upstreamPath) + if !modelFound { pm.sendErrorResponse(c, http.StatusBadRequest, "model id required in path") return } + // Check if this is exactly a model name with no additional path + // and doesn't end with a trailing slash + if remainingPath == "/" && !strings.HasSuffix(upstreamPath, "/") { + // Build new URL with query parameters preserved + newPath := "/upstream/" + searchModelName + "/" + if c.Request.URL.RawQuery != "" { + newPath += "?" 
+ c.Request.URL.RawQuery + } + + // Use 308 for non-GET/HEAD requests to preserve method + if c.Request.Method == http.MethodGet || c.Request.Method == http.MethodHead { + c.Redirect(http.StatusMovedPermanently, newPath) + } else { + c.Redirect(http.StatusPermanentRedirect, newPath) + } + return + } + processGroup, realModelName, err := pm.swapProcessGroup(modelName) if err != nil { pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error())) diff --git a/proxy/proxymanager_loghandlers.go b/proxy/proxymanager_loghandlers.go index a3de806a..daeb786c 100644 --- a/proxy/proxymanager_loghandlers.go +++ b/proxy/proxymanager_loghandlers.go @@ -31,7 +31,7 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { // prevent nginx from buffering streamed logs c.Header("X-Accel-Buffering", "no") - logMonitorId := c.Param("logMonitorID") + logMonitorId := strings.TrimPrefix(c.Param("logMonitorID"), "/") logger, err := pm.getLogger(logMonitorId) if err != nil { c.String(http.StatusBadRequest, err.Error()) @@ -83,18 +83,25 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { // getLogger searches for the appropriate logger based on the logMonitorId func (pm *ProxyManager) getLogger(logMonitorId string) (*LogMonitor, error) { - var logger *LogMonitor - - if logMonitorId == "" { + switch logMonitorId { + case "": // maintain the default - logger = pm.muxLogger - } else if logMonitorId == "proxy" { - logger = pm.proxyLogger - } else if logMonitorId == "upstream" { - logger = pm.upstreamLogger - } else { - return nil, fmt.Errorf("invalid logger. 
Use 'proxy' or 'upstream'") - } + return pm.muxLogger, nil + case "proxy": + return pm.proxyLogger, nil + case "upstream": + return pm.upstreamLogger, nil + default: + // search for a models specific logger using findModelInPath + // to handle model names with slashes (e.g., "author/model") + if _, name, _, found := pm.findModelInPath("/" + logMonitorId); found { + for _, group := range pm.processGroups { + if process, found := group.GetMember(name); found { + return process.Logger(), nil + } + } + } - return logger, nil + return nil, fmt.Errorf("invalid logger. Use 'proxy', 'upstream' or a model's ID") + } } diff --git a/proxy/proxymanager_test.go b/proxy/proxymanager_test.go index 29513809..d13db539 100644 --- a/proxy/proxymanager_test.go +++ b/proxy/proxymanager_test.go @@ -1078,7 +1078,8 @@ func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) { config := config.AddDefaultGroupToConfig(config.Config{ HealthCheckTimeout: 15, Models: map[string]config.ModelConfig{ - "model1": getTestSimpleResponderConfig("model1"), + "model1": getTestSimpleResponderConfig("model1"), + "author/model": getTestSimpleResponderConfig("author/model"), }, LogLevel: "error", }) @@ -1091,6 +1092,7 @@ func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) { "/logs/stream", "/logs/stream/proxy", "/logs/stream/upstream", + "/logs/stream/author/model", } for _, endpoint := range endpoints {