feat: add optional -enrich-cot flag to stream swap progress in reasoning_content #370
ServeurpersoCom wants to merge 1 commit into mostlygeek:main
Conversation
…ing_content

Implements issue mostlygeek#367 by adding an optional -enrich-cot flag to enrich streamed reasoning_content with live swap progress logs.

When enabled, llama-swap injects swap-related log lines and progress dots into the streamed reasoning_content of /v1 responses, improving UX during model swaps without affecting model context or API compatibility.

Key changes:
- Added CLI flag handling (-enrich-cot) in llama-swap.go
- Introduced cot_enrichment.go and cot_session.go for streaming logic
- Integrated CoT lifecycle in Process, ProcessGroup, and ProxyManager
- Added swap_progress_writer for real-time parsing of stdout/stderr
- Added full unit tests for CoT sessions and progress writer
Walkthrough
Implements Chain-of-Thought (CoT) enrichment via new …

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as CLI/llama-swap.go
    participant PM as ProxyManager
    participant PG as ProcessGroup
    participant Proc as Process
    participant RoundTrip as cotRoundTripper
    participant Session as cotSession
    CLI->>PM: SetCoTEnrichment(true)
    PM->>PG: SetCoTEnrichment(true)
    PG->>Proc: SetCoTEnrichment(true)
    Note over Proc: CoT Enabled
    rect rgb(200, 240, 220)
        Note over CLI,Session: Request Handling Flow
        CLI->>PM: proxyOAIHandler(stream request)
        PM->>PM: isStream=true, cotEnrichment=true
        PM->>PM: withCoTEnrichment context
        PM->>Proc: ProxyRequest(ctx, req)
        Proc->>Session: PrepareCoTSession()
        Proc->>Session: AppendCoTLine("swap info...")
    end
    rect rgb(240, 200, 220)
        Note over RoundTrip,Session: Response Interception
        RoundTrip->>RoundTrip: RoundTrip(req with CoT context)
        RoundTrip->>Session: shouldInjectCoT(ctx, resp)
        activate Session
        RoundTrip->>Session: wrapCoTStream(body)
        Session->>Session: stream buffered lines + original body
        deactivate Session
    end
    Session-->>RoundTrip: io.ReadCloser (CoT-wrapped)
    RoundTrip-->>PM: *http.Response with CoT content
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~35 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
I’m going to record a short demo video to compare this behavior with my minimal swap.js prototype.
Actionable comments posted: 3
🧹 Nitpick comments (9)
proxy/swap_progress_writer_test.go (3)
8-31: Good coverage of line filtering and flush behavior. Test validates that only load_tensors lines are emitted and partial non-dot tails are ignored on Flush. Looks correct. Consider adding a CRLF case to ensure Windows line endings are handled as intended.
33-48: Whitespace preservation test is precise. Assertion matches TrimRight("\r") behavior. You could add t.Parallel() for faster test runs if there’s no shared state.
72-87: Dot line with newline behaves as expected. Consider adding a mixed-input test (dots followed by text without newline, then newline) to lock semantics around buffer carry-over.
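The CRLF normalisation suggested above can be pinned down in isolation. The `splitLines` helper below is a hypothetical stand-in that applies the same split-on-`\n` plus `TrimRight("\r")` steps the writer is described as using; it is not the actual `swapProgressWriter` code.

```go
package main

import (
	"fmt"
	"strings"
)

// splitLines is an illustrative sketch: split on '\n', trim a trailing
// '\r' so Windows (CRLF) input behaves like Unix input, and drop
// blank/whitespace-only lines, mirroring the writer's described behavior.
func splitLines(input string) []string {
	var lines []string
	for _, raw := range strings.Split(input, "\n") {
		line := strings.TrimRight(raw, "\r")
		if strings.TrimSpace(line) != "" {
			lines = append(lines, line)
		}
	}
	return lines
}

func main() {
	// CRLF and LF endings should yield identical clean lines.
	for _, l := range splitLines("load_tensors: a\r\nload_tensors: b\n") {
		fmt.Println(l)
	}
}
```

A CRLF test case would feed input like the one above and assert the emitted lines carry no `\r`.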
proxy/swap_progress_writer.go (2)
62-82: Flush also calls handlers under lock. For consistency and to avoid long critical sections, release the lock before emitting the final line’s callbacks, then re-acquire if needed.

```diff
@@ func (w *swapProgressWriter) Flush() {
-	line := strings.TrimRight(string(w.buffer), "\r")
-	w.buffer = nil
-	if len(strings.TrimSpace(line)) != 0 {
-		if isDotLine(line) {
-			w.emitDots(line)
-		} else {
-			w.emitLine(line)
-		}
-	}
+	line := strings.TrimRight(string(w.buffer), "\r")
+	w.buffer = nil
+	shouldEmit := len(strings.TrimSpace(line)) != 0
+	w.mu.Unlock()
+	if shouldEmit {
+		if isDotLine(line) {
+			w.emitDots(line)
+		} else {
+			w.emitLine(line)
+		}
+	}
```
120-130: isDotLine is correct; minor micro-optimizations optional. Current implementation is clear. You could bail early on first non-dot as you do, so this is fine.
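For reference, a bail-early predicate of this shape can be sketched as below. This is an illustrative reimplementation based on the review's description (dots-only lines are progress output), not the code under review, which may tolerate additional characters.

```go
package main

import "fmt"

// isDotLine reports whether a line consists entirely of progress dots.
// Sketch only: returns false on the first non-dot rune, and treats an
// empty line as not a dot line.
func isDotLine(line string) bool {
	if line == "" {
		return false
	}
	for _, r := range line {
		if r != '.' {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isDotLine("....."))             // dots-only progress line
	fmt.Println(isDotLine("load_tensors: ...")) // mixed text line
}
```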
llama-swap.go (1)
36-36: New flag name/description LGTM. Flag is concise and defaults off. Consider adding it to README/usage docs.
proxy/processgroup.go (2)
49-60: Avoid shadowing modelID for readability. modelConfig, modelID, _ := pg.config.FindConfig(modelID) reuses modelID. Rename the returned ID (e.g., resolvedID) to avoid confusion.

```diff
-	modelConfig, modelID, _ := pg.config.FindConfig(modelID)
-	process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger)
+	modelConfig, resolvedID, _ := pg.config.FindConfig(modelID)
+	process := NewProcess(resolvedID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger)
@@
-	if displayName == "" {
-		displayName = modelID
+	if displayName == "" {
+		displayName = resolvedID
 	}
-	pg.processes[modelID] = process
+	pg.processes[resolvedID] = process
```
86-94: CoT session prep while holding group lock. This trades concurrency for simplicity. If PrepareCoTSession/AppendCoTLine can block on I/O, consider moving them outside the lock and guarding swap state another way. If they’re fast/local, fine as-is.
proxy/process.go (1)
725-731: Consider handling attachCoTWriter failure. If attachCoTWriter returns false at Line 730, the CoT session is neither attached nor explicitly aborted. While the session will eventually be cleaned up on the next request or process stop, consider explicitly aborting the session when attachment fails to free resources sooner.

```diff
 if p.cotEnabled() && !isCoTEnrichment(r.Context()) {
 	defer p.abortCoTSession()
 }
 if p.cotEnabled() && isCoTEnrichment(r.Context()) {
-	p.attachCoTWriter(w)
+	if !p.attachCoTWriter(w) {
+		defer p.abortCoTSession()
+	}
 }
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)

- llama-swap.go (3 hunks)
- proxy/cot_enrichment.go (1 hunks)
- proxy/cot_session.go (1 hunks)
- proxy/cot_session_test.go (1 hunks)
- proxy/process.go (9 hunks)
- proxy/processgroup.go (4 hunks)
- proxy/proxymanager.go (5 hunks)
- proxy/swap_progress_writer.go (1 hunks)
- proxy/swap_progress_writer_test.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
📄 CodeRabbit inference engine (CLAUDE.md)
Fix all staticcheck-reported issues in Go code
Files:

- proxy/cot_session_test.go
- proxy/processgroup.go
- proxy/cot_session.go
- proxy/swap_progress_writer.go
- llama-swap.go
- proxy/cot_enrichment.go
- proxy/proxymanager.go
- proxy/process.go
- proxy/swap_progress_writer_test.go
🧬 Code graph analysis (4)
proxy/processgroup.go (1)
proxy/process.go (1)
Process(42-87)
llama-swap.go (1)
proxy/proxymanager.go (1)
New(50-143)
proxy/cot_enrichment.go (1)
proxy/process.go (1)
Process(42-87)
proxy/proxymanager.go (1)
proxy/processgroup.go (1)
ProcessGroup(13-30)
🔇 Additional comments (14)
proxy/swap_progress_writer_test.go (1)
50-70: Dot streaming semantics validated. Per-dot emission without newline is exercised and correct.
proxy/swap_progress_writer.go (1)
98-118: emitDotsFromBuffer correctness and complexity look good. Efficiently drains dot-only buffers and preserves CR. No issues.
llama-swap.go (2)
99-101: Apply CoT enrichment on reload: correct spot. Creating a fresh ProxyManager, then SetCoTEnrichment before assigning to srv.Handler is right.
116-118: Initial load wiring is consistent. Same ordering as reload path. Good.
proxy/cot_enrichment.go (1)
10-23: Context key pattern is correct. Unexported empty-struct key avoids collisions; helpers are straightforward.
proxy/processgroup.go (2)
29-30: New group-level cotEnrichment toggle: good addition. Field is private; default false; propagated via setter. LGTM.
65-78: SetCoTEnrichment implementation is safe. Copies processes slice under lock and calls out-of-lock. Good concurrency hygiene.
proxy/proxymanager.go (2)
303-316: LGTM! Excellent lock management pattern. The method correctly minimizes lock contention by copying the process groups slice before releasing the lock, then propagating the CoT enrichment setting outside the critical section.
573-574: CoT enrichment scoping is correctly implemented. The code properly gates enrichment with pm.cotEnrichment && isStream, where isStream is extracted from the JSON request body. When enabled=false, withCoTEnrichment returns the context unchanged, ensuring non-streaming requests bypass enrichment entirely. The variable is properly declared before use and the logic is sound.

proxy/cot_session_test.go (1)
1-185: LGTM! Comprehensive test coverage. The test suite effectively validates CoT session behavior including:
- Line ordering and streaming semantics
- Immediate writer attachment and delivery
- Inline segment handling without unwanted newlines
- Proper newline insertion between segments
The recordingResponseWriter test helper is well-designed for capturing output.

proxy/process.go (3)
610-616: LGTM! CoT session properly finalized on successful start. The CoT session is correctly finished after the progress writer is flushed and the "Starting inference" line is appended. This ensures all startup logs are captured before transitioning to the inference phase.
135-140: Verification complete: cotRoundTripper is correctly implemented. The cotRoundTripper struct is properly defined in proxy/cot_enrichment.go (lines 25-28) with the required fields base and process. The RoundTrip method (lines 30-48) correctly implements the http.RoundTripper interface:

- Delegates to the base transport (with fallback to http.DefaultTransport)
- Handles errors appropriately
- Checks shouldInjectCoT before intercepting
- Wraps the response body with wrapCoTStream for CoT content injection
- Properly removes the Content-Length header before streaming

The implementation is sound and ready.
488-498: No resource cleanup concerns identified. After reviewing the complete swapProgressWriter implementation:

- The struct contains only a sync.Mutex, a []byte buffer, and two function pointers, all automatically managed Go types. No goroutines are created anywhere.
- The Write() method processes incoming data synchronously: it appends bytes to the buffer, extracts complete lines when \n is encountered, and calls the provided handler functions. All operations are synchronous.
- The Flush() method drains any remaining buffered data, calls the appropriate handlers, and sets the buffer to nil. This is standard cleanup.
- The swapProgressWriter does not implement io.Closer, so os/exec will not attempt to close it.
- The io.MultiWriter in proxy/process.go is a stateless standard library writer with no cleanup requirements.

The code is correct and requires no changes.
proxy/cot_session.go (1)
268-283: SSE format is correct and compatible. The formatCoTEntry function produces valid SSE format for reasoning content deltas. OpenAI streaming response format expects choices[0].delta.reasoning_content for reasoning deltas, which this code correctly provides. The minimal structure ({"choices":[{"delta":{"reasoning_content":...}}]}) is intentional: the function streams only reasoning chunks, while the complete OpenAI response (with id, object, model, and other metadata) is provided by the backend response body that follows via io.Copy() in cotStream.run(). The codebase tests (cot_session_test.go) confirm this exact format is expected across all streaming scenarios.
```go
func (rt *cotRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	transport := rt.base
	if transport == nil {
		transport = http.DefaultTransport
	}

	resp, err := transport.RoundTrip(req)
	if err != nil {
		return nil, err
	}

	if rt.process == nil || !rt.process.shouldInjectCoT(req.Context(), resp) {
		return resp, nil
	}

	resp.Header.Del("Content-Length")
	resp.Body = rt.process.wrapCoTStream(resp.Body)
	return resp, nil
}
```
Adjust Content-Length and guard nil body when wrapping.
When changing resp.Body, also set ContentLength = -1. Guard against nil Body (e.g., HEAD). Optionally clear Content-Encoding if upstream compressed output isn’t re-encoded by wrapCoTStream.
```diff
@@ func (rt *cotRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
-	resp.Header.Del("Content-Length")
-	resp.Body = rt.process.wrapCoTStream(resp.Body)
+	resp.Header.Del("Content-Length")
+	resp.ContentLength = -1
+	if resp.Body != nil {
+		resp.Body = rt.process.wrapCoTStream(resp.Body)
+	}
 	return resp, nil
```

If wrapCoTStream outputs plain (uncompressed) data while upstream set Content-Encoding, also:

```go
resp.Header.Del("Content-Encoding")
```

Based on coding guidelines
🤖 Prompt for AI Agents
In proxy/cot_enrichment.go around lines 30 to 48, update the RoundTrip response
handling to guard against a nil resp.Body before wrapping (returning resp
immediately if Body is nil), set resp.ContentLength = -1 whenever you replace
resp.Body, and remove resp.Header.Del("Content-Encoding") when wrapCoTStream
returns uncompressed/plain data (i.e., call resp.Header.Del("Content-Encoding")
after setting the new Body if the wrapper does not preserve upstream encoding);
keep removing resp.Header.Del("Content-Length") as well but ensure ContentLength
is explicitly set to -1 to avoid mismatches.
```go
func (s *cotSession) abort() {
	s.mu.Lock()
	if s.aborted {
		s.mu.Unlock()
		return
	}
	s.aborted = true
	s.inlineBreak = false
	if s.attached {
		close(s.lines)
	}
	s.buffer = nil
	s.writer = nil
	s.flusher = nil
	s.mu.Unlock()
}
```
Add panic protection for channel close in abort().
The close(s.lines) at Line 161 is not protected by defer recover(), unlike similar operations in finish() (Line 144) and attach() (Line 114). If abort() races with finish() or is called multiple times despite the aborted flag check, closing an already-closed channel will panic.
Apply this diff to add panic protection:

```diff
 func (s *cotSession) abort() {
 	s.mu.Lock()
 	if s.aborted {
 		s.mu.Unlock()
 		return
 	}
 	s.aborted = true
 	s.inlineBreak = false
 	if s.attached {
-		close(s.lines)
+		func() {
+			defer func() { _ = recover() }()
+			close(s.lines)
+		}()
 	}
 	s.buffer = nil
 	s.writer = nil
 	s.flusher = nil
 	s.mu.Unlock()
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func (s *cotSession) abort() {
	s.mu.Lock()
	if s.aborted {
		s.mu.Unlock()
		return
	}
	s.aborted = true
	s.inlineBreak = false
	if s.attached {
		func() {
			defer func() { _ = recover() }()
			close(s.lines)
		}()
	}
	s.buffer = nil
	s.writer = nil
	s.flusher = nil
	s.mu.Unlock()
}
```
🤖 Prompt for AI Agents
In proxy/cot_session.go around lines 152 to 167, the unprotected close(s.lines)
in abort() can panic if the channel is already closed; wrap the close call in a
small anonymous function that defers a recover() (like finish() and attach() do)
so any panic from closing an already-closed channel is swallowed, leaving the
rest of abort() unchanged (keep the lock/unlock and state updates as-is).
```go
	w.mu.Lock()
	defer w.mu.Unlock()

	w.buffer = append(w.buffer, p...)
	for {
		idx := bytes.IndexByte(w.buffer, '\n')
		if idx != -1 {
			raw := string(w.buffer[:idx])
			w.buffer = w.buffer[idx+1:]

			line := strings.TrimRight(raw, "\r")
			if len(strings.TrimSpace(line)) != 0 {
				if isDotLine(line) {
					w.emitDots(line)
				} else {
					w.emitLine(line)
				}
			}
			continue
```
Don’t call user callbacks while holding the mutex.
emitLine/emitDots are invoked under w.mu. If handlers block or re-enter, you can deadlock or stall writers. Unlock around handler calls.
Apply this minimal change to call handlers outside the lock:

```diff
@@ func (w *swapProgressWriter) Write(p []byte) (int, error) {
-	line := strings.TrimRight(raw, "\r")
-	if len(strings.TrimSpace(line)) != 0 {
-		if isDotLine(line) {
-			w.emitDots(line)
-		} else {
-			w.emitLine(line)
-		}
-	}
+	line := strings.TrimRight(raw, "\r")
+	if len(strings.TrimSpace(line)) != 0 {
+		// Invoke handlers outside the critical section.
+		w.mu.Unlock()
+		if isDotLine(line) {
+			w.emitDots(line)
+		} else {
+			w.emitLine(line)
+		}
+		w.mu.Lock()
+	}
```

Mirror the same pattern in Flush when emitting a final line to avoid holding the lock across callbacks.
🤖 Prompt for AI Agents
In proxy/swap_progress_writer.go around lines 31 to 49, emitLine and emitDots
are being called while w.mu is held which can deadlock if handlers block or
re-enter; modify the loop so it extracts the next trimmed line and decides
whether it is a dot-line while still holding the lock, then remove that slice
from w.buffer, store a small struct/flag with the line and whether to call
emitDots or emitLine, release the lock, and only then invoke the appropriate
handler; repeat the loop acquiring the lock to pull the next line. Apply the
same pattern in Flush when emitting the final line: prepare the line under the
lock, drop the lock, then call the handler outside the lock.
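The copy-under-lock, call-outside-lock pattern recommended here can also be sketched generically. `lineEmitter` below is a hypothetical type for illustration, not part of the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// lineEmitter buffers lines under a mutex and invokes the handler only
// after the lock is released, so a blocking or re-entrant handler cannot
// deadlock writers.
type lineEmitter struct {
	mu      sync.Mutex
	pending []string
	handler func(string)
}

func (e *lineEmitter) add(line string) {
	e.mu.Lock()
	e.pending = append(e.pending, line)
	e.mu.Unlock()
}

func (e *lineEmitter) flush() {
	// Copy and clear the pending slice inside the critical section.
	e.mu.Lock()
	lines := e.pending
	e.pending = nil
	e.mu.Unlock()
	// Handlers run with no lock held.
	for _, l := range lines {
		e.handler(l)
	}
}

func main() {
	e := &lineEmitter{handler: func(s string) { fmt.Println(s) }}
	e.add("load_tensors: layer 1")
	e.add("load_tensors: layer 2")
	e.flush()
}
```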
Demo video with this llama-swap feature enabled: llama-swap-enrich-cot-swap-progress-AVC400kbps.mp4
Hey @mostlygeek I’m proposing this implementation for the -enrich-cot feature so you can review and see if it fits your vision for llama-swap. I’m happy to make any adjustments or follow-ups you think are best: just let me know what you’d like changed.
Thanks for proving that it works. I opened #366 to implement it and I was going to take this approach:
Your config-based approach makes total sense, especially tying it directly to the process state.
closed in favor of #371 |

Proposal for issue #367: adds an optional -enrich-cot flag to enrich streamed reasoning_content with live swap progress logs.
When enabled, llama-swap injects swap-related log lines and progress dots into the streamed reasoning_content of /v1 responses, improving UX during model swaps without affecting model context or API compatibility.
Key changes: