diff --git a/.github/workflows/pr-test.yml b/.github/workflows/pr-test.yml
index 4440ce71d786..fc008715cf56 100644
--- a/.github/workflows/pr-test.yml
+++ b/.github/workflows/pr-test.yml
@@ -712,6 +712,18 @@ jobs:
           cd test/srt
           python3 -m unittest test_bench_serving.TestBenchServing.test_score_api_batch_scaling
 
+      - name: Benchmark Embeddings online latency and throughput
+        timeout-minutes: 10
+        run: |
+          cd test/srt
+          python3 -m unittest test_bench_serving.TestBenchServing.test_embeddings_api_latency_throughput
+
+      - name: Benchmark Embeddings online latency and throughput (batch size scaling)
+        timeout-minutes: 10
+        run: |
+          cd test/srt
+          python3 -m unittest test_bench_serving.TestBenchServing.test_embeddings_api_batch_scaling
+
   performance-test-2-gpu:
     needs: [check-changes, unit-test-backend-2-gpu, sgl-kernel-build-wheels]
     if: always() && !failure() && !cancelled() &&
diff --git a/python/sglang/test/test_utils.py b/python/sglang/test/test_utils.py
index b34544d57782..a7fa5eae6a12 100644
--- a/python/sglang/test/test_utils.py
+++ b/python/sglang/test/test_utils.py
@@ -844,6 +844,79 @@ async def _run():
     return res
 
 
+async def _run_api_benchmark_requests(
+    base_url: str,
+    endpoint: str,
+    test_requests: List[dict],
+    num_requests: int,
+    response_validator: Callable[[dict], bool],
+):
+    """
+    Helper function to run API benchmark requests and collect metrics.
+
+    Args:
+        base_url: The base URL of the server
+        endpoint: The API endpoint to test (e.g., "/v1/score", "/v1/embeddings")
+        test_requests: List of request payloads to send
+        num_requests: Total number of requests expected
+        response_validator: Function to validate whether a response contains the expected data
+
+    Returns:
+        Dictionary with benchmark metrics
+    """
+    start_time = time.monotonic()
+    successful_requests = 0
+    total_latency = 0
+    latencies = []
+
+    async with aiohttp.ClientSession() as session:
+        for request_data in test_requests:
+            try:
+                request_start = time.monotonic()
+                async with session.post(
+                    f"{base_url}{endpoint}",
+                    json=request_data,
+                    timeout=aiohttp.ClientTimeout(total=30),
+                ) as response:
+                    if response.status == 200:
+                        response_data = await response.json()
+                        request_end = time.monotonic()
+
+                        if response_validator(response_data):
+                            latency_ms = (request_end - request_start) * 1000
+                            latencies.append(latency_ms)
+                            total_latency += latency_ms
+                            successful_requests += 1
+            except Exception:
+                continue
+
+    end_time = time.monotonic()
+    total_time = end_time - start_time
+
+    if successful_requests > 0:
+        throughput = successful_requests / total_time
+        avg_latency = total_latency / successful_requests
+        p95_latency = np.percentile(latencies, 95) if latencies else 0
+
+        return {
+            "completed": successful_requests,
+            "total_requests": num_requests,
+            "throughput": throughput,
+            "avg_latency_ms": avg_latency,
+            "p95_latency_ms": p95_latency,
+            "successful_requests": successful_requests,
+        }
+    else:
+        return {
+            "completed": 0,
+            "total_requests": num_requests,
+            "throughput": 0,
+            "avg_latency_ms": 0,
+            "p95_latency_ms": 0,
+            "successful_requests": 0,
+        }
+
+
 def run_score_benchmark(
     model,
     num_requests=100,
@@ -929,59 +1002,110 @@ def generate_text_with_token_count(num_tokens):
             }
             test_requests.append(score_data)
 
-        start_time = time.monotonic()
-        successful_requests = 0
-        total_latency = 0
-        latencies = []
+        # Run benchmark requests using shared helper
+        return await _run_api_benchmark_requests(
+            base_url=base_url,
+            endpoint="/v1/score",
+            test_requests=test_requests,
+            num_requests=num_requests,
+            response_validator=lambda resp: "scores" in resp or "logprobs" in resp,
+        )
 
-        async with aiohttp.ClientSession() as session:
-            for request_data in test_requests:
+    try:
+        res = asyncio.run(_run_benchmark())
+    finally:
+        kill_process_tree(process.pid)
+
+    assert res["completed"] == res["successful_requests"]
+    return res
+
+
+def run_embeddings_benchmark(
+    model,
+    num_requests=100,
+    batch_size=1,
+    input_tokens=500,
+    other_server_args=None,
+    need_warmup=False,
+    device="auto",
+):
+    """Embeddings API benchmark function compatible with the run_bench_serving pattern."""
+    if other_server_args is None:
+        other_server_args = []
+
+    if device == "auto":
+        device = auto_config_device()
+
+    # Add the --is-embedding flag for embedding models
+    server_args = ["--is-embedding"] + other_server_args
+
+    # Launch the server (consistent with run_bench_serving)
+    base_url = DEFAULT_URL_FOR_TEST
+    process = popen_launch_server(
+        model,
+        base_url,
+        timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
+        other_args=server_args,
+    )
+
+    async def _run_benchmark():
+
+        # Load tokenizer for generating test data
+        from sglang.srt.utils.hf_transformers_utils import get_tokenizer
+
+        tokenizer = get_tokenizer(model)
+
+        def generate_text_with_token_count(num_tokens):
+            """Generate text with a precise token count using special tokens."""
+            # Use a token that reliably encodes to a single token
+            special_token = "<|im_start|>"
+            # Verify it's a single token
+            test_tokens = tokenizer.encode(special_token, add_special_tokens=False)
+            text = special_token * num_tokens
+            return text
+
+        # Generate input text
+        input_text = generate_text_with_token_count(input_tokens)
+
+        if need_warmup:
+            warmup_data = {
+                "input": input_text,
+                "model": model,
+            }
+
+            async with aiohttp.ClientSession() as session:
                 try:
-                    request_start = time.monotonic()
-                    async with session.post(
-                        f"{base_url}/v1/score",
-                        json=request_data,
+                    await session.post(
+                        f"{base_url}/v1/embeddings",
+                        json=warmup_data,
                         timeout=aiohttp.ClientTimeout(total=30),
-                    ) as response:
-                        if response.status == 200:
-                            response_data = await response.json()
-                            request_end = time.monotonic()
-
-                            if "scores" in response_data or "logprobs" in response_data:
-                                latency_ms = (request_end - request_start) * 1000
-                                latencies.append(latency_ms)
-                                total_latency += latency_ms
-                                successful_requests += 1
-                except Exception:
-                    continue
-
-        end_time = time.monotonic()
-        total_time = end_time - start_time
-
-        if successful_requests > 0:
-            throughput = successful_requests / total_time
-            avg_latency = total_latency / successful_requests
-            latencies.sort()
-            p95_latency = latencies[int(len(latencies) * 0.95)] if latencies else 0
-
-            return {
-                "completed": successful_requests,
-                "total_requests": num_requests,
-                "throughput": throughput,
-                "avg_latency_ms": avg_latency,
-                "p95_latency_ms": p95_latency,
-                "successful_requests": successful_requests,
-            }
-        else:
-            return {
-                "completed": 0,
-                "total_requests": num_requests,
-                "throughput": 0,
-                "avg_latency_ms": 0,
-                "p95_latency_ms": 0,
-                "successful_requests": 0,
+                    )
+                except Exception:
+                    pass  # Ignore warmup errors
+
+        test_requests = []
+        for _ in range(num_requests):
+            if batch_size == 1:
+                input_data = input_text
+            else:
+                input_data = [input_text for _ in range(batch_size)]
+
+            embeddings_data = {
+                "input": input_data,
+                "model": model,
             }
+            test_requests.append(embeddings_data)
+
+        # Run benchmark requests using shared helper
+        return await _run_api_benchmark_requests(
+            base_url=base_url,
+            endpoint="/v1/embeddings",
+            test_requests=test_requests,
+            num_requests=num_requests,
+            response_validator=lambda resp: "data" in resp,
+        )
+
     try:
         res = asyncio.run(_run_benchmark())
     finally:
         kill_process_tree(process.pid)
diff --git a/test/srt/test_bench_serving.py b/test/srt/test_bench_serving.py
index d78c16ba8c25..69ef3feaa053 100644
--- a/test/srt/test_bench_serving.py
+++ b/test/srt/test_bench_serving.py
@@ -10,12 +10,14 @@
     DEFAULT_MODEL_NAME_FOR_TEST,
     DEFAULT_MODEL_NAME_FOR_TEST_FP8,
     DEFAULT_MOE_MODEL_NAME_FOR_TEST,
+    DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
     DEFAULT_SMALL_MODEL_NAME_FOR_TEST_SCORE,
     DEFAULT_SMALL_VLM_MODEL_NAME_FOR_TEST,
     CustomTestCase,
     is_in_amd_ci,
     is_in_ci,
     run_bench_serving,
+    run_embeddings_benchmark,
     run_score_benchmark,
     write_github_step_summary,
 )
@@ -488,23 +490,71 @@ def test_score_api_batch_scaling(self):
                 )
 
             self.assertEqual(res["successful_requests"], res["total_requests"])
-            if batch_size == 10:
-                avg_latency_bound = 45
-            elif batch_size == 25:
-                avg_latency_bound = 50
-            elif batch_size == 50:
-                avg_latency_bound = 60
-            else:
-                avg_latency_bound = 60
+            bounds = {
+                10: (45, 50),
+                25: (50, 60),
+                50: (60, 65),
+            }
+            avg_latency_bound, p95_latency_bound = bounds.get(batch_size, (60, 65))
+            self.assertLess(res["avg_latency_ms"], avg_latency_bound)
+            self.assertLess(res["p95_latency_ms"], p95_latency_bound)
+
+    def test_embeddings_api_latency_throughput(self):
+        """Test embeddings API latency and throughput performance"""
+        res = run_embeddings_benchmark(
+            model=DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
+            num_requests=1000,
+            batch_size=1,
+            input_tokens=500,
+            other_server_args=[],
+            need_warmup=True,
+        )
+
+        if is_in_ci():
+            write_github_step_summary(
+                f"### test_embeddings_api_throughput\n"
+                f"Average latency: {res['avg_latency_ms']:.2f} ms\n"
+                f"P95 latency: {res['p95_latency_ms']:.2f} ms\n"
+                f"Embeddings API throughput: {res['throughput']:.2f} req/s\n"
+                f"Successful requests: {res['successful_requests']}/{res['total_requests']}\n"
+            )
+
+        self.assertEqual(res["successful_requests"], res["total_requests"])
+        # Bounds based on observed performance on 1x H100: avg=15 ms, p95=15 ms, throughput=67 req/s
+        self.assertLess(res["avg_latency_ms"], 20)
+        self.assertLess(res["p95_latency_ms"], 25)
+        self.assertGreater(res["throughput"], 60)
+
+    def test_embeddings_api_batch_scaling(self):
+        """Test embeddings API performance with different batch sizes"""
+        batch_sizes = [10, 25, 50]
+
+        for batch_size in batch_sizes:
+            res = run_embeddings_benchmark(
+                model=DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
+                num_requests=500,
+                batch_size=batch_size,
+                input_tokens=500,
+            )
+
+            if is_in_ci():
+                write_github_step_summary(
+                    f"### test_embeddings_api_batch_scaling_size_{batch_size}\n"
+                    f"Batch size: {batch_size}\n"
+                    f"Average latency: {res['avg_latency_ms']:.2f} ms\n"
+                    f"P95 latency: {res['p95_latency_ms']:.2f} ms\n"
+                    f"Throughput: {res['throughput']:.2f} req/s\n"
+                    f"Successful requests: {res['successful_requests']}/{res['total_requests']}\n"
+                )
+
+            self.assertEqual(res["successful_requests"], res["total_requests"])
+            bounds = {
+                10: (60, 65),
+                25: (115, 120),
+                50: (190, 195),
+            }
+            avg_latency_bound, p95_latency_bound = bounds.get(batch_size, (250, 250))
             self.assertLess(res["avg_latency_ms"], avg_latency_bound)
-            if batch_size == 10:
-                p95_latency_bound = 50
-            elif batch_size == 25:
-                p95_latency_bound = 60
-            elif batch_size == 50:
-                p95_latency_bound = 65
-            else:
-                p95_latency_bound = 65
             self.assertLess(res["p95_latency_ms"], p95_latency_bound)
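For reviewers who want to reproduce the numbers outside CI, here is a minimal sketch (not part of the patch) of driving the new helper directly. It assumes a local GPU and an SGLang install that includes this change, and reuses the same symbols the tests import from `sglang.test.test_utils`; the smaller `num_requests` value is only for a quick local check, not the CI setting.

```python
# Minimal local sketch (not part of this diff): exercise the new embeddings
# benchmark helper the same way the CI tests do, with a reduced request count.
from sglang.test.test_utils import (
    DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
    run_embeddings_benchmark,
)

res = run_embeddings_benchmark(
    model=DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST,
    num_requests=50,  # CI uses 1000; 50 is enough for a quick sanity check
    batch_size=1,
    input_tokens=500,
    need_warmup=True,
)
print(
    f"throughput={res['throughput']:.2f} req/s, "
    f"avg={res['avg_latency_ms']:.2f} ms, p95={res['p95_latency_ms']:.2f} ms"
)
```

The helper launches its own server with `--is-embedding` and tears it down via `kill_process_tree`, so no separately running server is needed.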