Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions misc/benchmark-chatcompletion/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

// created for issue: #252 https://github.com/mostlygeek/llama-swap/issues/252
// this simple benchmark tool sends a lot of small chat completion requests to llama-swap
// to make sure all the requests are accounted for.
//
// requests can be sent in parallel, and the tool will report the results.
// usage: go run main.go -baseurl http://localhost:8080/v1 -model llama3 -requests 1000 -par 5

import (
"bytes"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"sync"
"time"
)

func main() {
	// main is a small load-generation tool: it fires totalRequests chat
	// completion POSTs at baseurl using a bounded pool of `par` workers,
	// prints progress once a second, and reports a per-status-code summary.

	// ----- CLI arguments ----------------------------------------------------
	var (
		baseurl         string
		modelName       string
		totalRequests   int
		parallelization int
	)

	flag.StringVar(&baseurl, "baseurl", "http://localhost:8080/v1", "Base URL of the API (e.g., https://api.example.com)")
	flag.StringVar(&modelName, "model", "", "Model name to use")
	flag.IntVar(&totalRequests, "requests", 1, "Total number of requests to send")
	flag.IntVar(&parallelization, "par", 1, "Maximum number of concurrent requests")
	flag.Parse()

	if baseurl == "" || modelName == "" {
		fmt.Println("Error: both -baseurl and -model are required.")
		flag.Usage()
		os.Exit(1)
	}
	if totalRequests <= 0 {
		fmt.Println("Error: -requests must be greater than 0.")
		os.Exit(1)
	}
	if parallelization <= 0 {
		// The registered flag is -par; the previous message referenced a
		// non-existent -parallelization flag.
		fmt.Println("Error: -par must be greater than 0.")
		os.Exit(1)
	}

	// ----- HTTP client -------------------------------------------------------
	// A single shared client so the transport can pool and reuse connections.
	client := &http.Client{
		Timeout: 30 * time.Second,
	}

	// ----- Tracking response codes -------------------------------------------
	// Key -1 counts client-side failures that produced no HTTP response.
	statusCounts := make(map[int]int) // map[statusCode]count
	var mu sync.Mutex                 // protects statusCounts

	// The payload and endpoint never change between requests, so build them
	// once instead of on every loop iteration.
	payload := []byte(`{"model":"` + modelName + `","max_tokens":100,"stream":false,"messages":[{"role":"user","content":"write a snake game in python"}]}`)
	endpoint := fmt.Sprintf("%s/chat/completions", baseurl)

	// ----- Request queue (buffered channel) ----------------------------------
	requests := make(chan int, 10) // small buffer keeps workers fed

	// Goroutine to fill the request queue; closing the channel signals the
	// workers that no more work is coming.
	go func() {
		for i := 0; i < totalRequests; i++ {
			requests <- i + 1
		}
		close(requests)
	}()

	// ----- Worker pool -------------------------------------------------------
	var wg sync.WaitGroup
	for i := 0; i < parallelization; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			for reqID := range requests {
				req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
				if err != nil {
					log.Printf("[worker %d][req %d] request creation error: %v", workerID, reqID, err)
					mu.Lock()
					statusCounts[-1]++
					mu.Unlock()
					continue
				}
				req.Header.Set("Content-Type", "application/json")

				resp, err := client.Do(req)
				if err != nil {
					log.Printf("[worker %d][req %d] HTTP request error: %v", workerID, reqID, err)
					mu.Lock()
					statusCounts[-1]++
					mu.Unlock()
					continue
				}
				// Drain the body so the transport can reuse the connection;
				// drain errors are not interesting for this benchmark.
				io.Copy(io.Discard, resp.Body)
				resp.Body.Close()

				// Record status code
				mu.Lock()
				statusCounts[resp.StatusCode]++
				mu.Unlock()
			}
		}(i + 1)
	}

	// ----- Status ticker (prints every second) -------------------------------
	done := make(chan struct{})
	tickerDone := make(chan struct{})
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop() // release ticker resources when the goroutine exits
		startTime := time.Now()
		for {
			select {
			case <-ticker.C:
				mu.Lock()
				// Compute how many requests have completed so far
				completed := 0
				for _, cnt := range statusCounts {
					completed += cnt
				}
				// Calculate duration and progress
				duration := time.Since(startTime)
				progress := completed * 100 / totalRequests
				fmt.Printf("Duration: %v, Completed: %d%% requests\n", duration, progress)
				mu.Unlock()
			case <-done:
				duration := time.Since(startTime)
				fmt.Printf("Duration: %v, Completed: %d%% requests\n", duration, 100)
				close(tickerDone)
				return
			}
		}
	}()

	// Wait for all workers to finish
	wg.Wait()
	close(done) // stops the status-update goroutine
	<-tickerDone // give ticker time to finish / print

	// ----- Summary ------------------------------------------------------------
	fmt.Println("\n\n=== HTTP response code summary ===")
	mu.Lock()
	for code, cnt := range statusCounts {
		if code == -1 {
			fmt.Printf("Client-side errors (no HTTP response): %d\n", cnt)
		} else {
			fmt.Printf("%d : %d\n", code, cnt)
		}
	}
	mu.Unlock()
}
8 changes: 3 additions & 5 deletions proxy/proxymanager_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
}
}

sendMetrics := func(metrics TokenMetrics) {
sendMetrics := func(metrics []TokenMetrics) {
jsonData, err := json.Marshal(metrics)
if err == nil {
select {
Expand Down Expand Up @@ -168,16 +168,14 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
* Send Metrics data
*/
defer event.On(func(e TokenMetricsEvent) {
sendMetrics(e.Metrics)
sendMetrics([]TokenMetrics{e.Metrics})
})()

// send initial batch of data
sendLogData("proxy", pm.proxyLogger.GetHistory())
sendLogData("upstream", pm.upstreamLogger.GetHistory())
sendModels()
for _, metrics := range pm.metricsMonitor.GetMetrics() {
sendMetrics(metrics)
}
sendMetrics(pm.metricsMonitor.GetMetrics())

for {
select {
Expand Down
2 changes: 2 additions & 0 deletions ui/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { APIProvider } from "./contexts/APIProvider";
import LogViewerPage from "./pages/LogViewer";
import ModelPage from "./pages/Models";
import ActivityPage from "./pages/Activity";
import ConnectionStatus from "./components/ConnectionStatus";
import { RiSunFill, RiMoonFill } from "react-icons/ri";

function App() {
Expand Down Expand Up @@ -31,6 +32,7 @@ function App() {
<button className="" onClick={toggleTheme}>
{isDarkMode ? <RiMoonFill /> : <RiSunFill />}
</button>
<ConnectionStatus />
</div>
</div>
</nav>
Expand Down
36 changes: 36 additions & 0 deletions ui/src/components/ConnectionStatus.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { useAPI } from "../contexts/APIProvider";
import { useEffect, useState, useMemo } from "react";

type ConnectionStatus = "disconnected" | "connecting" | "connected";

/**
 * Small status indicator for the /api/events event stream.
 * Polls the current EventSource state once per second and renders a
 * colored dot (green = connected, yellow = connecting, red = disconnected).
 */
const ConnectionStatus = () => {
  const { getConnectionStatus } = useAPI();
  const [eventStreamStatus, setEventStreamStatus] = useState<ConnectionStatus>("disconnected");

  useEffect(() => {
    const interval = setInterval(() => {
      setEventStreamStatus(getConnectionStatus());
    }, 1000);
    return () => clearInterval(interval);
    // Dependency array is required: without it the effect runs on every
    // render, tearing down and recreating the interval each time.
  }, [getConnectionStatus]);

  // Map the stream state to a Tailwind background color for the dot.
  const eventStatusColor = useMemo(() => {
    switch (eventStreamStatus) {
      case "connected":
        return "bg-green-500";
      case "connecting":
        return "bg-yellow-500";
      case "disconnected":
      default:
        return "bg-red-500";
    }
  }, [eventStreamStatus]);

  return (
    <div className="flex items-center" title={`event stream: ${eventStreamStatus}`}>
      <span className={`inline-block w-3 h-3 rounded-full ${eventStatusColor} mr-2`}></span>
    </div>
  );
};

export default ConnectionStatus;
24 changes: 22 additions & 2 deletions ui/src/contexts/APIProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ interface APIProviderType {
proxyLogs: string;
upstreamLogs: string;
metrics: Metrics[];
getConnectionStatus: () => "connected" | "connecting" | "disconnected";
}

interface Metrics {
Expand Down Expand Up @@ -63,6 +64,16 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
});
}, []);

const getConnectionStatus = useCallback(() => {
if (apiEventSource.current?.readyState === EventSource.OPEN) {
return "connected";
} else if (apiEventSource.current?.readyState === EventSource.CONNECTING) {
return "connecting";
} else {
return "disconnected";
}
}, []);

const enableAPIEvents = useCallback((enabled: boolean) => {
if (!enabled) {
apiEventSource.current?.close();
Expand All @@ -77,6 +88,14 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
const connect = () => {
const eventSource = new EventSource("/api/events");

eventSource.onopen = () => {
// clear everything out on connect to keep things in sync
setProxyLogs("");
setUpstreamLogs("");
setMetrics([]); // clear metrics on reconnect
setModels([]); // clear models on reconnect
};

eventSource.onmessage = (e: MessageEvent) => {
try {
const message = JSON.parse(e.data) as APIEventEnvelope;
Expand Down Expand Up @@ -108,9 +127,9 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider

case "metrics":
{
const newMetric = JSON.parse(message.data) as Metrics;
const newMetrics = JSON.parse(message.data) as Metrics[];
setMetrics((prevMetrics) => {
return [newMetric, ...prevMetrics];
return [...newMetrics, ...prevMetrics];
Comment on lines +130 to +132
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Batch handling for metrics should dedupe by id, cap length, and be backward-compatible with single-object payloads.

Current logic can:

  • Duplicate entries when the backend re-sends overlapping batches (common with replay-on-reconnect).
  • Grow unbounded in memory.
  • Break if a single Metrics object is sent (during transition or bugs).

Apply this diff to harden the handler:

-                const newMetrics = JSON.parse(message.data) as Metrics[];
-                setMetrics((prevMetrics) => {
-                  return [...newMetrics, ...prevMetrics];
-                });
+                const parsed = JSON.parse(message.data);
+                const incoming: Metrics[] = Array.isArray(parsed) ? parsed : [parsed];
+                setMetrics((prevMetrics) => {
+                  // Deduplicate by id and cap list size to avoid unbounded growth
+                  const seen = new Set(prevMetrics.map((m) => m.id));
+                  const merged = incoming.filter((m) => !seen.has(m.id)).concat(prevMetrics);
+                  return merged.slice(0, MAX_METRICS);
+                });

Add a top-level constant near LOG_LENGTH_LIMIT:

// Near top-level constants
const MAX_METRICS = 2000; // cap to prevent unbounded memory growth on the Activity page
🤖 Prompt for AI Agents
In ui/src/contexts/APIProvider.tsx around lines 119 to 121, the metrics handler
naively parses message.data and prepends newMetrics causing duplicates,
unbounded growth, and failures when a single Metrics object is sent; add a
top-level constant near LOG_LENGTH_LIMIT: const MAX_METRICS = 2000, then update
the handler to: parse message.data and normalize it to an array if a single
object is received, merge the incoming array with existing metrics while
deduplicating by id (prefer the newer entry from the incoming batch), maintain
desired ordering (new first), and finally trim the resulting array to
MAX_METRICS before calling setMetrics.

});
}
break;
Expand Down Expand Up @@ -194,6 +213,7 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
proxyLogs,
upstreamLogs,
metrics,
getConnectionStatus,
}),
[models, listModels, unloadAllModels, loadModel, enableAPIEvents, proxyLogs, upstreamLogs, metrics]
);
Expand Down
27 changes: 7 additions & 20 deletions ui/src/pages/Activity.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { useState, useEffect } from "react";
import { useMemo } from "react";
import { useAPI } from "../contexts/APIProvider";

const formatTimestamp = (timestamp: string): string => {
Expand All @@ -15,25 +15,10 @@ const formatDuration = (ms: number): string => {

const ActivityPage = () => {
const { metrics } = useAPI();
const [error, setError] = useState<string | null>(null);

useEffect(() => {
if (metrics.length > 0) {
setError(null);
}
const sortedMetrics = useMemo(() => {
return [...metrics].sort((a, b) => b.id - a.id);
}, [metrics]);

if (error) {
return (
<div className="p-6">
<h1 className="text-2xl font-bold mb-4">Activity</h1>
<div className="bg-red-50 border border-red-200 rounded-md p-4">
<p className="text-red-800">{error}</p>
</div>
</div>
);
}

return (
<div className="p-6">
<h1 className="text-2xl font-bold mb-4">Activity</h1>
Expand All @@ -47,6 +32,7 @@ const ActivityPage = () => {
<table className="min-w-full divide-y">
<thead>
<tr>
<th className="px-4 py-3 text-left text-xs font-medium uppercase tracking-wider">Id</th>
<th className="px-6 py-3 text-left text-xs font-medium uppercase tracking-wider">Timestamp</th>
<th className="px-6 py-3 text-left text-xs font-medium uppercase tracking-wider">Model</th>
<th className="px-6 py-3 text-left text-xs font-medium uppercase tracking-wider">Input Tokens</th>
Expand All @@ -57,8 +43,9 @@ const ActivityPage = () => {
</tr>
</thead>
<tbody className="divide-y">
{metrics.map((metric, index) => (
<tr key={`${metric.id}-${index}`}>
{sortedMetrics.map((metric) => (
<tr key={`metric_${metric.id}`}>
<td className="px-4 py-4 whitespace-nowrap text-sm">{metric.id + 1 /* un-zero index */}</td>
<td className="px-6 py-4 whitespace-nowrap text-sm">{formatTimestamp(metric.timestamp)}</td>
<td className="px-6 py-4 whitespace-nowrap text-sm">{metric.model}</td>
<td className="px-6 py-4 whitespace-nowrap text-sm">{metric.input_tokens.toLocaleString()}</td>
Expand Down
Loading