Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,26 @@ As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. H

## Monitoring Logs on the CLI

```shell
```sh
# sends up to the last 10KB of logs
curl http://host/logs'
curl http://host/logs

# streams combined logs
curl -Ns 'http://host/logs/stream'
curl -Ns http://host/logs/stream

# stream llama-swap's proxy status logs
curl -Ns http://host/logs/stream/proxy

# just llama-swap's logs
curl -Ns 'http://host/logs/stream/proxy'
# stream logs from upstream processes that llama-swap loads
curl -Ns http://host/logs/stream/upstream

# just upstream's logs
curl -Ns 'http://host/logs/stream/upstream'
# stream logs only from a specific model
curl -Ns http://host/logs/stream/{model_id}

# stream and filter logs with linux pipes
curl -Ns http://host/logs/stream | grep 'eval time'

# skips history and just streams new log entries
# appending ?no-history will disable sending buffered history first
curl -Ns 'http://host/logs/stream?no-history'
```

Expand Down
85 changes: 85 additions & 0 deletions ai-plans/2025-12-14-efficient-ring-buffer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Replace ring.Ring with Efficient Circular Byte Buffer

## Overview

Replace the inefficient `container/ring.Ring` implementation in `logMonitor.go` with a simple circular byte buffer that uses a single contiguous `[]byte` slice. This eliminates per-write allocations, improves cache locality, and correctly implements a 10KB buffer.

## Current Issues

1. `ring.New(10 * 1024)` creates 10,240 ring **elements**, not 10KB of storage
2. Every `Write()` call allocates a new `[]byte` slice inside the lock
3. `GetHistory()` iterates all 10,240 elements and appends repeatedly (geometric reallocs)
4. Linked list structure has poor cache locality and pointer overhead

## Design Requirements

### New CircularBuffer Type

Create a simple circular byte buffer with:
- Single pre-allocated `[]byte` of fixed capacity (10KB)
- `head` and `size` integers to track write position and data length
- No per-write allocations

### API Requirements

The new buffer must support:
1. **Write(p []byte)** - Append bytes, overwriting oldest data when full
2. **GetHistory() []byte** - Return all buffered data in correct order (oldest to newest)

### Implementation Details

```go
type circularBuffer struct {
data []byte // pre-allocated capacity
head int // next write position
size int // current number of bytes stored (0 to cap)
}
```

**Write logic:**
- If `len(p) >= capacity`: just keep the last `capacity` bytes
- Otherwise: write bytes at `head`, wrapping around if needed
- Update `head` and `size` accordingly
- Data is copied into the internal buffer (not stored by reference)

**GetHistory logic:**
- Calculate start position: `(head - size + cap) % cap`
- If not wrapped: single slice copy
- If wrapped: two copies (end of buffer + beginning)
- Returns a **new slice** (copy), not a view into internal buffer

### Immutability Guarantees (must preserve)

Per existing tests:
1. Modifying input `[]byte` after `Write()` must not affect stored data
2. `GetHistory()` returns independent copy - modifications don't affect buffer

## Files to Modify

- `proxy/logMonitor.go` - Replace `buffer *ring.Ring` with new circular buffer

## Testing Plan

Existing tests in `logMonitor_test.go` should continue to pass:
- `TestLogMonitor` - Basic write/read and subscriber notification
- `TestWrite_ImmutableBuffer` - Verify writes don't affect returned history
- `TestWrite_LogTimeFormat` - Timestamp formatting

Add new tests:
- Test buffer wrap-around behavior
- Test large writes that exceed buffer capacity
- Test exact capacity boundary conditions

## Checklist

- [ ] Create `circularBuffer` struct in `logMonitor.go`
- [ ] Implement `Write()` method for circular buffer
- [ ] Implement `GetHistory()` method for circular buffer
- [ ] Update `LogMonitor` struct to use new buffer
- [ ] Update `NewLogMonitorWriter()` to initialize new buffer
- [ ] Update `LogMonitor.Write()` to use new buffer
- [ ] Update `LogMonitor.GetHistory()` to use new buffer
- [ ] Remove `"container/ring"` import
- [ ] Run `make test-dev` to verify existing tests pass
- [ ] Add wrap-around test case
- [ ] Run `make test-all` for final validation
117 changes: 101 additions & 16 deletions proxy/logMonitor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package proxy

import (
"container/ring"
"context"
"fmt"
"io"
Expand All @@ -12,19 +11,100 @@ import (
"github.com/mostlygeek/llama-swap/event"
)

// circularBuffer is a fixed-capacity circular byte buffer that overwrites
// the oldest data when full. Writes are O(len(p)) with no per-write heap
// allocations; reads copy the stored bytes out in O(size).
//
// circularBuffer is not safe for concurrent use; callers (LogMonitor)
// must provide their own locking around Write and GetHistory.
type circularBuffer struct {
	data []byte // pre-allocated storage; len(data) is the fixed capacity
	head int    // index of the next write position
	size int    // number of valid bytes currently stored (0..len(data))
}

// newCircularBuffer returns an empty buffer that retains at most
// capacity bytes of the most recently written data.
func newCircularBuffer(capacity int) *circularBuffer {
	return &circularBuffer{
		data: make([]byte, capacity),
	}
}

// Write appends p to the buffer, overwriting the oldest data when full.
// The bytes are copied into the internal buffer, so the caller may reuse
// or mutate p after Write returns.
func (cb *circularBuffer) Write(p []byte) {
	if len(p) == 0 {
		return
	}

	// Named "capacity" rather than "cap" to avoid shadowing the builtin.
	capacity := len(cb.data)

	// An input at least as large as the buffer replaces the entire
	// contents; only the trailing capacity bytes are kept.
	if len(p) >= capacity {
		copy(cb.data, p[len(p)-capacity:])
		cb.head = 0
		cb.size = capacity
		return
	}

	// Bytes available between head and the physical end of the buffer.
	tail := capacity - cb.head
	if tail >= len(p) {
		// Fits without wrapping.
		copy(cb.data[cb.head:], p)
		cb.head = (cb.head + len(p)) % capacity
	} else {
		// Wraps: fill to the physical end, then continue from the start.
		copy(cb.data[cb.head:], p[:tail])
		copy(cb.data, p[tail:])
		cb.head = len(p) - tail
	}

	// Grow the logical size, clamped at capacity once data is overwritten.
	cb.size += len(p)
	if cb.size > capacity {
		cb.size = capacity
	}
}

// GetHistory returns all buffered data in order (oldest to newest).
// The result is a fresh copy; mutating it does not affect the buffer.
// It returns nil when the buffer is empty.
func (cb *circularBuffer) GetHistory() []byte {
	if cb.size == 0 {
		return nil
	}

	capacity := len(cb.data)

	// Index of the oldest stored byte.
	start := (cb.head - cb.size + capacity) % capacity

	out := make([]byte, cb.size)
	if start+cb.size <= capacity {
		// Stored data is contiguous: single copy.
		copy(out, cb.data[start:start+cb.size])
	} else {
		// Stored data wraps: copy the tail segment, then the head segment.
		n := copy(out, cb.data[start:])
		copy(out[n:], cb.data[:cb.size-n])
	}
	return out
}

// LogLevel is the severity level of a log message; messages below a
// LogMonitor's configured level are filtered out.
type LogLevel int

const (
	// Log severity levels, in increasing order of importance.
	LevelDebug LogLevel = iota
	LevelInfo
	LevelWarn
	LevelError

	// LogBufferSize is the capacity, in bytes, of the in-memory log
	// history buffer (100KB). Once full, the oldest entries are
	// overwritten by new writes.
	LogBufferSize = 100 * 1024
)

type LogMonitor struct {
eventbus *event.Dispatcher
mu sync.RWMutex
buffer *ring.Ring
buffer *circularBuffer
bufferMu sync.RWMutex

// typically this can be os.Stdout
Expand All @@ -45,7 +125,7 @@ func NewLogMonitor() *LogMonitor {
func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
return &LogMonitor{
eventbus: event.NewDispatcherConfig(1000),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
buffer: nil, // lazy initialized on first Write
stdout: stdout,
level: LevelInfo,
prefix: "",
Expand All @@ -64,29 +144,34 @@ func (w *LogMonitor) Write(p []byte) (n int, err error) {
}

w.bufferMu.Lock()
bufferCopy := make([]byte, len(p))
copy(bufferCopy, p)
w.buffer.Value = bufferCopy
w.buffer = w.buffer.Next()
if w.buffer == nil {
w.buffer = newCircularBuffer(LogBufferSize)
}
w.buffer.Write(p)
w.bufferMu.Unlock()

// Make a copy for broadcast to preserve immutability
bufferCopy := make([]byte, len(p))
copy(bufferCopy, p)
w.broadcast(bufferCopy)
return n, nil
}

// GetHistory returns a copy of the buffered log history, oldest entry
// first, or nil when nothing has been logged yet (or after Clear).
func (w *LogMonitor) GetHistory() []byte {
	w.bufferMu.RLock()
	defer w.bufferMu.RUnlock()

	buf := w.buffer
	if buf == nil {
		return nil
	}
	return buf.GetHistory()
}

var history []byte
w.buffer.Do(func(p any) {
if p != nil {
if content, ok := p.([]byte); ok {
history = append(history, content...)
}
}
})
return history
// Clear drops the history buffer so its memory becomes eligible for
// garbage collection; the next Write lazily allocates a fresh buffer.
func (w *LogMonitor) Clear() {
	w.bufferMu.Lock()
	defer w.bufferMu.Unlock()
	w.buffer = nil
}

func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc {
Expand Down
Loading
Loading