12 changes: 12 additions & 0 deletions .github/workflows/pr-test.yml
@@ -712,6 +712,18 @@ jobs:
          cd test/srt
          python3 -m unittest test_bench_serving.TestBenchServing.test_score_api_batch_scaling

      - name: Benchmark Embeddings online latency and throughput
        timeout-minutes: 10
        run: |
          cd test/srt
          python3 -m unittest test_bench_serving.TestBenchServing.test_embeddings_api_latency_throughput

      - name: Benchmark Embeddings online latency and throughput (batch size scaling)
        timeout-minutes: 10
        run: |
          cd test/srt
          python3 -m unittest test_bench_serving.TestBenchServing.test_embeddings_api_batch_scaling

  performance-test-2-gpu:
    needs: [check-changes, unit-test-backend-2-gpu, sgl-kernel-build-wheels]
    if: always() && !failure() && !cancelled() &&
220 changes: 172 additions & 48 deletions python/sglang/test/test_utils.py
@@ -844,6 +844,79 @@ async def _run():
        return res


async def _run_api_benchmark_requests(
    base_url: str,
    endpoint: str,
    test_requests: List[dict],
    num_requests: int,
    response_validator: Callable[[dict], bool],
):
    """
    Helper function to run API benchmark requests and collect metrics.

    Args:
        base_url: The base URL of the server
        endpoint: The API endpoint to test (e.g., "/v1/score", "/v1/embeddings")
        test_requests: List of request payloads to send
        num_requests: Total number of requests expected
        response_validator: Function to validate that a response contains the expected data

    Returns:
        Dictionary with benchmark metrics
    """
    start_time = time.monotonic()
    successful_requests = 0
    total_latency = 0
    latencies = []

    async with aiohttp.ClientSession() as session:
        for request_data in test_requests:
            try:
                request_start = time.monotonic()
                async with session.post(
                    f"{base_url}{endpoint}",
                    json=request_data,
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as response:
                    if response.status == 200:
                        response_data = await response.json()
                        request_end = time.monotonic()

                        if response_validator(response_data):
                            latency_ms = (request_end - request_start) * 1000
                            latencies.append(latency_ms)
                            total_latency += latency_ms
                            successful_requests += 1
            except Exception:
                continue

    end_time = time.monotonic()
    total_time = end_time - start_time

    if successful_requests > 0:
        throughput = successful_requests / total_time
        avg_latency = total_latency / successful_requests
        p95_latency = np.percentile(latencies, 95) if latencies else 0

        return {
            "completed": successful_requests,
            "total_requests": num_requests,
            "throughput": throughput,
            "avg_latency_ms": avg_latency,
            "p95_latency_ms": p95_latency,
            "successful_requests": successful_requests,
        }
    else:
        return {
            "completed": 0,
            "total_requests": num_requests,
            "throughput": 0,
            "avg_latency_ms": 0,
            "p95_latency_ms": 0,
            "successful_requests": 0,
        }
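For orientation, a minimal caller sketch follows (the URL and payload here are hypothetical; the real callers below supply their own generated test_requests). The helper counts a request as successful only when the HTTP status is 200 and the validator accepts the decoded JSON body:

import asyncio

async def _demo():
    # Hypothetical local server and payload; illustrative only.
    return await _run_api_benchmark_requests(
        base_url="http://127.0.0.1:30000",
        endpoint="/v1/embeddings",
        test_requests=[{"input": "hello", "model": "demo-model"}],
        num_requests=1,
        response_validator=lambda resp: "data" in resp,
    )

metrics = asyncio.run(_demo())
print(metrics["throughput"], metrics["p95_latency_ms"])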


def run_score_benchmark(
    model,
    num_requests=100,
@@ -929,59 +1002,110 @@ def generate_text_with_token_count(num_tokens):
            }
            test_requests.append(score_data)

        # Run benchmark requests using shared helper
        return await _run_api_benchmark_requests(
            base_url=base_url,
            endpoint="/v1/score",
            test_requests=test_requests,
            num_requests=num_requests,
            response_validator=lambda resp: "scores" in resp or "logprobs" in resp,
        )

    try:
        res = asyncio.run(_run_benchmark())
    finally:
        kill_process_tree(process.pid)

    assert res["completed"] == res["successful_requests"]
    return res


def run_embeddings_benchmark(
    model,
    num_requests=100,
    batch_size=1,
    input_tokens=500,
    other_server_args=None,
    need_warmup=False,
    device="auto",
):
    """Embeddings API benchmark function compatible with the run_bench_serving pattern"""
    if other_server_args is None:
        other_server_args = []

    if device == "auto":
        device = auto_config_device()

    # Add the --is-embedding flag for embedding models
    server_args = ["--is-embedding"] + other_server_args

    # Launch the server (consistent with run_bench_serving)
    base_url = DEFAULT_URL_FOR_TEST
    process = popen_launch_server(
        model,
        base_url,
        timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
        other_args=server_args,
    )

    async def _run_benchmark():
        # Load tokenizer for generating test data
        from sglang.srt.utils.hf_transformers_utils import get_tokenizer

        tokenizer = get_tokenizer(model)

        def generate_text_with_token_count(num_tokens):
            """Generate text with a precise token count using special tokens."""
            # Use a token that reliably encodes to a single token
            special_token = "<|im_start|>"
            # Verify that it is in fact a single token
            test_tokens = tokenizer.encode(special_token, add_special_tokens=False)
            assert len(test_tokens) == 1, f"{special_token!r} is not a single token"
            return special_token * num_tokens

        # Generate input text
        input_text = generate_text_with_token_count(input_tokens)

        if need_warmup:
            warmup_data = {
                "input": input_text,
                "model": model,
            }

            async with aiohttp.ClientSession() as session:
                try:
                    await session.post(
                        f"{base_url}/v1/embeddings",
                        json=warmup_data,
                        timeout=aiohttp.ClientTimeout(total=30),
                    )
                except Exception:
                    pass  # Ignore warmup errors

        test_requests = []
        for _ in range(num_requests):
            if batch_size == 1:
                input_data = input_text
            else:
                input_data = [input_text for _ in range(batch_size)]

            embeddings_data = {
                "input": input_data,
                "model": model,
            }
            test_requests.append(embeddings_data)

        # Run benchmark requests using shared helper
        return await _run_api_benchmark_requests(
            base_url=base_url,
            endpoint="/v1/embeddings",
            test_requests=test_requests,
            num_requests=num_requests,
            response_validator=lambda resp: "data" in resp,
        )

    try:
        res = asyncio.run(_run_benchmark())
    finally:
        kill_process_tree(process.pid)
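Outside the unittest harness, the new benchmark can be driven directly; a hedged sketch (the model name is hypothetical — the tests below use DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST):

# Illustrative stand-alone run: launches a server with --is-embedding,
# optionally warms it up, then posts num_requests embedding requests.
res = run_embeddings_benchmark(
    model="my-org/my-embedding-model",  # hypothetical model name
    num_requests=100,
    batch_size=8,
    input_tokens=500,
    need_warmup=True,
)
print(f"{res['throughput']:.2f} req/s, p95 {res['p95_latency_ms']:.2f} ms")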
82 changes: 66 additions & 16 deletions test/srt/test_bench_serving.py
@@ -10,12 +10,14 @@
    DEFAULT_MODEL_NAME_FOR_TEST,
    DEFAULT_MODEL_NAME_FOR_TEST_FP8,
    DEFAULT_MOE_MODEL_NAME_FOR_TEST,
    DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
    DEFAULT_SMALL_MODEL_NAME_FOR_TEST_SCORE,
    DEFAULT_SMALL_VLM_MODEL_NAME_FOR_TEST,
    CustomTestCase,
    is_in_amd_ci,
    is_in_ci,
    run_bench_serving,
    run_embeddings_benchmark,
    run_score_benchmark,
    write_github_step_summary,
)
@@ -488,23 +490,71 @@ def test_score_api_batch_scaling(self):
            )

            self.assertEqual(res["successful_requests"], res["total_requests"])
            bounds = {
                10: (45, 50),
                25: (50, 60),
                50: (60, 65),
            }
            avg_latency_bound, p95_latency_bound = bounds.get(batch_size, (60, 65))
            self.assertLess(res["avg_latency_ms"], avg_latency_bound)
            self.assertLess(res["p95_latency_ms"], p95_latency_bound)

    def test_embeddings_api_latency_throughput(self):
        """Test embeddings API latency and throughput performance"""
        res = run_embeddings_benchmark(
            model=DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
            num_requests=1000,
            batch_size=1,
            input_tokens=500,
            other_server_args=[],
            need_warmup=True,
        )

        if is_in_ci():
            write_github_step_summary(
                f"### test_embeddings_api_throughput\n"
                f"Average latency: {res['avg_latency_ms']:.2f} ms\n"
                f"P95 latency: {res['p95_latency_ms']:.2f} ms\n"
                f"Embeddings API throughput: {res['throughput']:.2f} req/s\n"
                f"Successful requests: {res['successful_requests']}/{res['total_requests']}\n"
            )

        self.assertEqual(res["successful_requests"], res["total_requests"])
        # Bounds based on actual performance on 1x H100: avg=15 ms, p95=15 ms, throughput=67 req/s
        self.assertLess(res["avg_latency_ms"], 20)
        self.assertLess(res["p95_latency_ms"], 25)
        self.assertGreater(res["throughput"], 60)

    def test_embeddings_api_batch_scaling(self):
        """Test embeddings API performance with different batch sizes"""
        batch_sizes = [10, 25, 50]

        for batch_size in batch_sizes:
            res = run_embeddings_benchmark(
                model=DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
                num_requests=500,
                batch_size=batch_size,
                input_tokens=500,
            )

            if is_in_ci():
                write_github_step_summary(
                    f"### test_embeddings_api_batch_scaling_size_{batch_size}\n"
                    f"Batch size: {batch_size}\n"
                    f"Average latency: {res['avg_latency_ms']:.2f} ms\n"
                    f"P95 latency: {res['p95_latency_ms']:.2f} ms\n"
                    f"Throughput: {res['throughput']:.2f} req/s\n"
                    f"Successful requests: {res['successful_requests']}/{res['total_requests']}\n"
                )

            self.assertEqual(res["successful_requests"], res["total_requests"])
            bounds = {
                10: (60, 65),
                25: (115, 120),
                50: (190, 195),
            }
            avg_latency_bound, p95_latency_bound = bounds.get(batch_size, (250, 250))
            self.assertLess(res["avg_latency_ms"], avg_latency_bound)
            self.assertLess(res["p95_latency_ms"], p95_latency_bound)

