Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proxy/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const ConfigFileChangedEventID = 0x03
const LogDataEventID = 0x04
const TokenMetricsEventID = 0x05
const ModelPreloadedEventID = 0x06
const RequestEventID = 0x07

type ProcessStateChangeEvent struct {
ProcessName string
Expand Down
16 changes: 12 additions & 4 deletions proxy/metrics_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ func (mp *metricsMonitor) wrapHandler(
request *http.Request,
next func(modelID string, w http.ResponseWriter, r *http.Request) error,
) error {
recorder := newBodyCopier(writer)
recorder, ok := writer.(*responseBodyCopier)
if !ok {
recorder = newBodyCopier(writer)
}

// Filter Accept-Encoding to only include encodings we can decompress for metrics
if ae := request.Header.Get("Accept-Encoding"); ae != "" {
Expand Down Expand Up @@ -301,9 +304,10 @@ func decompressBody(body []byte, encoding string) ([]byte, error) {
// while also capturing it in a buffer for later processing
type responseBodyCopier struct {
gin.ResponseWriter
body *bytes.Buffer
tee io.Writer
start time.Time
body *bytes.Buffer
tee io.Writer
start time.Time
onWrite func([]byte)
}

func newBodyCopier(w gin.ResponseWriter) *responseBodyCopier {
Expand All @@ -320,6 +324,10 @@ func (w *responseBodyCopier) Write(b []byte) (int, error) {
w.start = time.Now()
}

if w.onWrite != nil {
w.onWrite(b)
}

// Single write operation that writes to both the response and buffer
return w.tee.Write(b)
}
Expand Down
65 changes: 58 additions & 7 deletions proxy/proxymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ProxyManager struct {
muxLogger *LogMonitor

metricsMonitor *metricsMonitor
requestMonitor *requestMonitor

processGroups map[string]*ProcessGroup

Expand Down Expand Up @@ -152,6 +153,7 @@ func New(proxyConfig config.Config) *ProxyManager {
upstreamLogger: upstreamLogger,

metricsMonitor: newMetricsMonitor(proxyLogger, maxMetrics),
requestMonitor: newRequestMonitor(maxMetrics),

processGroups: make(map[string]*ProcessGroup),

Expand Down Expand Up @@ -598,15 +600,29 @@ func (pm *ProxyManager) proxyToUpstream(c *gin.Context) {
originalPath := c.Request.URL.Path
c.Request.URL.Path = remainingPath

var requestBody string
if c.Request.ContentLength > 0 && c.Request.ContentLength < 1024*1024 { // Only capture small bodies
bodyBytes, err := io.ReadAll(c.Request.Body)
if err != nil {
pm.proxyLogger.Errorf("Error reading request body for recording: %v", err)
} else {
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
requestBody = string(bodyBytes)
}
}
Comment on lines +603 to +612
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t continue proxying after a failed body read.
If io.ReadAll fails, the request body may be partially consumed, and the upstream call will see a truncated or empty body. Bail out early and return an error response.

🔧 Suggested fix
-		bodyBytes, err := io.ReadAll(c.Request.Body)
-		if err != nil {
-			pm.proxyLogger.Errorf("Error reading request body for recording: %v", err)
-		} else {
-			c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
-			requestBody = string(bodyBytes)
-		}
+		bodyBytes, err := io.ReadAll(c.Request.Body)
+		if err != nil {
+			pm.sendErrorResponse(c, http.StatusBadRequest, "failed to read request body")
+			pm.proxyLogger.Errorf("Error reading request body for recording: %v", err)
+			return
+		}
+		c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
+		requestBody = string(bodyBytes)
🤖 Prompt for AI Agents
In `@proxy/proxymanager.go` around lines 603 - 612, The read-failure branch must
stop proxying and return an error to the client: when io.ReadAll(c.Request.Body)
returns an err, log the error with pm.proxyLogger.Errorf and then abort the
request handling (do not continue to upstream) by sending an HTTP error response
(e.g., 400 Bad Request, as in the suggested fix above) and returning from the handler; do not attempt to proxy with a
partially consumed c.Request.Body or set requestBody in that case. Update the
block around io.ReadAll, c.Request.Body, requestBody and the surrounding handler
logic to perform this early return on error (use the framework's abort/return
method such as c.AbortWithStatus/AbortWithStatusJSON or equivalent).


recorder, done := pm.recordRequest(c, modelID, requestBody)
defer done()

// attempt to record metrics if it is a POST request
if pm.metricsMonitor != nil && c.Request.Method == "POST" {
if err := pm.metricsMonitor.wrapHandler(modelID, c.Writer, c.Request, processGroup.ProxyRequest); err != nil {
if err := pm.metricsMonitor.wrapHandler(modelID, recorder, c.Request, processGroup.ProxyRequest); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error proxying metrics wrapped request: %s", err.Error()))
pm.proxyLogger.Errorf("Error proxying wrapped upstream request for model %s, path=%s", modelID, originalPath)
pm.proxyLogger.Errorf("Error proxying metrics wrapped request for model %s, path=%s", modelID, originalPath)
return
}
} else {
if err := processGroup.ProxyRequest(modelID, c.Writer, c.Request); err != nil {
if err := processGroup.ProxyRequest(modelID, recorder, c.Request); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error proxying request: %s", err.Error()))
pm.proxyLogger.Errorf("Error proxying upstream request for model %s, path=%s", modelID, originalPath)
return
Expand Down Expand Up @@ -720,20 +736,23 @@ func (pm *ProxyManager) proxyInferenceHandler(c *gin.Context) {
c.Request.Header.Set("content-length", strconv.Itoa(len(bodyBytes)))
c.Request.ContentLength = int64(len(bodyBytes))

recorder, done := pm.recordRequest(c, modelID, string(bodyBytes))
defer done()

// issue #366 extract values that downstream handlers may need
isStreaming := gjson.GetBytes(bodyBytes, "stream").Bool()
ctx := context.WithValue(c.Request.Context(), proxyCtxKey("streaming"), isStreaming)
ctx = context.WithValue(ctx, proxyCtxKey("model"), modelID)
c.Request = c.Request.WithContext(ctx)

if pm.metricsMonitor != nil && c.Request.Method == "POST" {
if err := pm.metricsMonitor.wrapHandler(modelID, c.Writer, c.Request, nextHandler); err != nil {
if err := pm.metricsMonitor.wrapHandler(modelID, recorder, c.Request, nextHandler); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error proxying metrics wrapped request: %s", err.Error()))
pm.proxyLogger.Errorf("Error Proxying Metrics Wrapped Request model %s", modelID)
return
}
} else {
if err := nextHandler(modelID, c.Writer, c.Request); err != nil {
if err := nextHandler(modelID, recorder, c.Request); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error proxying request: %s", err.Error()))
pm.proxyLogger.Errorf("Error Proxying Request for model %s", modelID)
return
Expand Down Expand Up @@ -862,7 +881,10 @@ func (pm *ProxyManager) proxyOAIPostFormHandler(c *gin.Context) {
modifiedReq.ContentLength = int64(requestBuffer.Len())

// Use the modified request for proxying
if err := nextHandler(modelID, c.Writer, modifiedReq); err != nil {
recorder, done := pm.recordRequest(c, modelID, "")
defer done()

if err := nextHandler(modelID, recorder, modifiedReq); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error proxying request: %s", err.Error()))
pm.proxyLogger.Errorf("Error Proxying Request for model %s", modelID)
return
Expand Down Expand Up @@ -899,7 +921,10 @@ func (pm *ProxyManager) proxyGETModelHandler(c *gin.Context) {
return
}

if err := nextHandler(modelID, c.Writer, c.Request); err != nil {
recorder, done := pm.recordRequest(c, modelID, "")
defer done()

if err := nextHandler(modelID, recorder, c.Request); err != nil {
pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error proxying request: %s", err.Error()))
pm.proxyLogger.Errorf("Error Proxying GET Request for model %s", modelID)
return
Expand Down Expand Up @@ -1026,3 +1051,29 @@ func (pm *ProxyManager) SetVersion(buildDate string, commit string, version stri
pm.commit = commit
pm.version = version
}

func (pm *ProxyManager) recordRequest(c *gin.Context, modelID string, requestBody string) (*responseBodyCopier, func()) {
startTime := time.Now()
requestID := pm.requestMonitor.Add(&RequestEntry{
Timestamp: startTime,
Method: c.Request.Method,
Path: c.Request.URL.Path,
Model: modelID,
RequestBody: requestBody,
})

recorder := newBodyCopier(c.Writer)
recorder.onWrite = func(b []byte) {
pm.requestMonitor.AppendResponse(requestID, string(b))
}

return recorder, func() {
duration := time.Since(startTime)
respBody := ""
isStreaming := strings.Contains(recorder.Header().Get("Content-Type"), "text/event-stream")
if !isStreaming {
respBody = recorder.body.String()
}
pm.requestMonitor.Update(requestID, recorder.Status(), duration, respBody)
}
Comment on lines +1055 to +1078
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, let's locate the responseBodyCopier implementation
cd /tmp/repo 2>/dev/null || cd . 
git ls-files | head -20
rg -l "responseBodyCopier|newBodyCopier" --type go

Repository: mostlygeek/llama-swap

Length of output: 686


🏁 Script executed:

#!/bin/bash
# Find the responseBodyCopier type definition and newBodyCopier function
rg "type responseBodyCopier|newBodyCopier" -A 5 --type go

Repository: mostlygeek/llama-swap

Length of output: 2756


🏁 Script executed:

#!/bin/bash
# Find all methods implemented by responseBodyCopier
rg "func \(.*\*?responseBodyCopier\)" --type go

Repository: mostlygeek/llama-swap

Length of output: 388


🏁 Script executed:

#!/bin/bash
# Check how sendErrorResponse is implemented
rg "func.*sendErrorResponse" -A 20 --type go

Repository: mostlygeek/llama-swap

Length of output: 1185


🏁 Script executed:

#!/bin/bash
# Find where recordRequest is called
rg "recordRequest" -B 3 -A 10 --type go

Repository: mostlygeek/llama-swap

Length of output: 4222


🏁 Script executed:

#!/bin/bash
# Find the handler/middleware where recordRequest is used
rg "func.*gin.HandlerFunc|func.*gin.Context" -B 2 -A 25 --type go | head -200

Repository: mostlygeek/llama-swap

Length of output: 10675


🏁 Script executed:

#!/bin/bash
# Check if wrapHandler or other handlers assign c.Writer = recorder
rg "c.Writer\s*=" -B 5 -A 5 --type go proxy/

Repository: mostlygeek/llama-swap

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Check the wrapHandler signature and implementation
rg "func.*wrapHandler" -A 30 --type go

Repository: mostlygeek/llama-swap

Length of output: 1814


Assign the recorder to c.Writer to capture error responses.

recordRequest creates a recorder but doesn't attach it to c.Writer, so error responses written via sendErrorResponse bypass the recorder entirely. This leaves request logs with empty bodies on error paths.

The fix is valid: responseBodyCopier embeds gin.ResponseWriter, satisfying the interface for assignment to c.Writer.

Suggested fix
 recorder := newBodyCopier(c.Writer)
+c.Writer = recorder
 recorder.onWrite = func(b []byte) {
🤖 Prompt for AI Agents
In `@proxy/proxymanager.go` around lines 1055 - 1078, In recordRequest, the
responseBodyCopier created by newBodyCopier is never assigned to the Gin context
writer so error responses (e.g., those sent via sendErrorResponse) bypass it;
fix by assigning the recorder (responseBodyCopier) to c.Writer immediately after
creation so it implements gin.ResponseWriter and captures all writes (ensure
recorder.onWrite remains set and cleanup restores original writer if needed).

}
62 changes: 61 additions & 1 deletion proxy/proxymanager_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func addApiHandlers(pm *ProxyManager) {
apiGroup.POST("/models/unload/*model", pm.apiUnloadSingleModelHandler)
apiGroup.GET("/events", pm.apiSendEvents)
apiGroup.GET("/metrics", pm.apiGetMetrics)
apiGroup.GET("/requests", pm.apiGetRequests)
apiGroup.GET("/requests/:id", pm.apiGetRequest)
apiGroup.GET("/version", pm.apiGetVersion)
}
}
Expand Down Expand Up @@ -105,6 +107,7 @@ const (
msgTypeModelStatus messageType = "modelStatus"
msgTypeLogData messageType = "logData"
msgTypeMetrics messageType = "metrics"
msgTypeRequest messageType = "request"
)

type messageEnvelope struct {
Expand All @@ -121,7 +124,7 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
// prevent nginx from buffering SSE
c.Header("X-Accel-Buffering", "no")

sendBuffer := make(chan messageEnvelope, 25)
sendBuffer := make(chan messageEnvelope, 100)
ctx, cancel := context.WithCancel(c.Request.Context())
sendModels := func() {
data, err := json.Marshal(pm.getModelStatus())
Expand Down Expand Up @@ -164,6 +167,18 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
}
}

sendRequest := func(req RequestEntry) {
jsonData, err := json.Marshal(req)
if err == nil {
select {
case sendBuffer <- messageEnvelope{Type: msgTypeRequest, Data: string(jsonData)}:
case <-ctx.Done():
return
default:
}
}
}

/**
* Send updated models list
*/
Expand Down Expand Up @@ -191,12 +206,30 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
sendMetrics([]TokenMetrics{e.Metrics})
})()

/**
* Send Request data
*/
defer event.On(func(e RequestEvent) {
sendRequest(e.Entry)
})()

// send initial batch of data
sendLogData("proxy", pm.proxyLogger.GetHistory())
sendLogData("upstream", pm.upstreamLogger.GetHistory())
sendModels()
sendMetrics(pm.metricsMonitor.getMetrics())

// Send recent requests (without bodies to keep initial sync light)
requests := pm.requestMonitor.GetEntries()
if len(requests) > 20 {
requests = requests[len(requests)-20:]
}
for _, r := range requests {
r.RequestBody = ""
r.ResponseBody = ""
sendRequest(r)
}

for {
select {
case <-c.Request.Context().Done():
Expand All @@ -221,6 +254,33 @@ func (pm *ProxyManager) apiGetMetrics(c *gin.Context) {
c.Data(http.StatusOK, "application/json", jsonData)
}

// apiGetRequests responds with the recorded request entries as JSON, with the
// request and response bodies blanked for the list view.
func (pm *ProxyManager) apiGetRequests(c *gin.Context) {
	entries := pm.requestMonitor.GetEntries()

	// Blank the bodies on a detached copy rather than in place. GetEntries is
	// not visible from here; if its result shares a backing array with the
	// monitor's storage, in-place edits would erase the stored bodies that
	// apiGetRequest serves later. The append form keeps a nil result nil, so
	// the JSON output is the same as before.
	stripped := append(entries[:0:0], entries...)
	for i := range stripped {
		stripped[i].RequestBody = ""
		stripped[i].ResponseBody = ""
	}
	c.JSON(http.StatusOK, stripped)
}
Comment on lines +257 to +265
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Same mutation issue — modifying returned slice elements.

Similar to the SSE initial sync, this modifies entries[i] directly. If GetEntries() returns references, this corrupts the stored data.

Suggested fix
 func (pm *ProxyManager) apiGetRequests(c *gin.Context) {
 	entries := pm.requestMonitor.GetEntries()
-	// Strip bodies for list view
-	for i := range entries {
-		entries[i].RequestBody = ""
-		entries[i].ResponseBody = ""
-	}
-	c.JSON(http.StatusOK, entries)
+	// Strip bodies for list view - create copies to avoid mutating stored data
+	stripped := make([]RequestEntry, len(entries))
+	for i, e := range entries {
+		stripped[i] = e
+		stripped[i].RequestBody = ""
+		stripped[i].ResponseBody = ""
+	}
+	c.JSON(http.StatusOK, stripped)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (pm *ProxyManager) apiGetRequests(c *gin.Context) {
entries := pm.requestMonitor.GetEntries()
// Strip bodies for list view
for i := range entries {
entries[i].RequestBody = ""
entries[i].ResponseBody = ""
}
c.JSON(http.StatusOK, entries)
}
func (pm *ProxyManager) apiGetRequests(c *gin.Context) {
entries := pm.requestMonitor.GetEntries()
// Strip bodies for list view - create copies to avoid mutating stored data
stripped := make([]RequestEntry, len(entries))
for i, e := range entries {
stripped[i] = e
stripped[i].RequestBody = ""
stripped[i].ResponseBody = ""
}
c.JSON(http.StatusOK, stripped)
}
🤖 Prompt for AI Agents
In `@proxy/proxymanager_api.go` around lines 257 - 265, apiGetRequests is mutating
the entries returned by pm.requestMonitor.GetEntries() which may be references
to stored data; instead create a new slice of copied entries and clear
RequestBody/ResponseBody on the copies so the original data isn't modified.
Locate apiGetRequests and GetEntries usage, allocate a new slice with the same
length, copy each entry (e.g., by value or deep copy), set the RequestBody and
ResponseBody on the copy to empty, and return the new slice in c.JSON to avoid
corrupting the stored entries.


// apiGetRequest looks up one recorded request by the ":id" path parameter and
// writes it as JSON. It answers 400 when the id cannot be scanned as an
// integer and 404 when the request monitor reports no entry for that id.
func (pm *ProxyManager) apiGetRequest(c *gin.Context) {
	var requestID int
	_, scanErr := fmt.Sscanf(c.Param("id"), "%d", &requestID)
	if scanErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request id"})
		return
	}

	if entry, ok := pm.requestMonitor.GetEntry(requestID); ok {
		c.JSON(http.StatusOK, entry)
		return
	}
	c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
}

func (pm *ProxyManager) apiUnloadSingleModelHandler(c *gin.Context) {
requestedModel := strings.TrimPrefix(c.Param("model"), "/")
realModelName, found := pm.config.RealModelName(requestedModel)
Expand Down
Loading
Loading