diff --git a/tests/integration/defs/accuracy/test_disaggregated_serving.py b/tests/integration/defs/accuracy/test_disaggregated_serving.py
index c9f94aa3fbe..2972e0ad4b3 100644
--- a/tests/integration/defs/accuracy/test_disaggregated_serving.py
+++ b/tests/integration/defs/accuracy/test_disaggregated_serving.py
@@ -122,6 +122,21 @@ def launch_disaggregated_llm(
             f"Using unified tp parameter for testing is not recommended. Please use server configs instead."
         )
 
+    enable_perf = True
+    perf_max_requests = 10000
+
+    def _apply_perf_flags(cfg: Optional[Dict[str, Any]]):
+        if not isinstance(cfg, dict):
+            return
+        if enable_perf:
+            # Only set these if the switch is enabled.
+            # Use `setdefault` so explicit per-test overrides are preserved.
+            cfg.setdefault("return_perf_metrics", True)
+            cfg.setdefault("perf_metrics_max_requests", perf_max_requests)
+
+    _apply_perf_flags(disaggregated_server_config)
+    _apply_perf_flags(ctx_server_config)
+    _apply_perf_flags(gen_server_config)
 
     disaggregated_server_config = revise_disaggregated_server_config_urls_with_free_ports(
         disaggregated_server_config)
@@ -183,12 +198,16 @@ def launch_disaggregated_llm(
     ctx_servers = []
     current_gpu_offset = 0
 
+    kv_cache_perf_dir = os.path.join(temp_dir.name, "kv_cache_perf")
+
     for i, port in enumerate(ctx_ports):
-        env_ctx = os.environ.copy()
-        env_ctx["TRTLLM_USE_UCX_KVCACHE"] = "1"
+        env = os.environ.copy()
+        env["TRTLLM_USE_UCX_KVCACHE"] = "1"
+        if enable_perf:
+            env["TRTLLM_KVCACHE_TIME_OUTPUT_PATH"] = kv_cache_perf_dir
         gpu_range = range(current_gpu_offset,
                           current_gpu_offset + ctx_total_gpus)
-        env_ctx["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
+        env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
         current_gpu_offset += ctx_total_gpus
 
         ctx_server_args = ctx_args + [
@@ -200,16 +219,18 @@ def launch_disaggregated_llm(
             ctx_server_args.append(
                 f"--max_num_tokens={ctx_server_config['max_num_tokens']}")
 
-        ctx_servers.append((env_ctx, ctx_server_args))
+        ctx_servers.append((env, ctx_server_args))
 
     gen_servers = []
 
     for i, port in enumerate(gen_ports):
-        env_gen = os.environ.copy()
-        env_gen["TRTLLM_USE_UCX_KVCACHE"] = "1"
+        env = os.environ.copy()
+        env["TRTLLM_USE_UCX_KVCACHE"] = "1"
+        if enable_perf:
+            env["TRTLLM_KVCACHE_TIME_OUTPUT_PATH"] = kv_cache_perf_dir
         gpu_range = range(current_gpu_offset,
                           current_gpu_offset + gen_total_gpus)
-        env_gen["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
+        env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
         current_gpu_offset += gen_total_gpus
 
         gen_server_args = gen_args + [
@@ -221,7 +242,7 @@ def launch_disaggregated_llm(
             gen_server_args.append(
                 f"--max_num_tokens={gen_server_config['max_num_tokens']}")
 
-        gen_servers.append((env_gen, gen_server_args))
+        gen_servers.append((env, gen_server_args))
 
     @contextlib.contextmanager
     def multi_popen(server_configs, server_name="", enable_redirect_log=False):
@@ -341,8 +362,43 @@ def generate_async(prompt: str,
             thread_pool.futures.append(future)
             return future
 
+    def _get_perf_metrics():
+        path = "/perf_metrics"
+        perf_url = f"http://localhost:8000{path}"
+        try:
+            print(f"Fetching perf metrics from {perf_url}")
+            resp = requests.get(perf_url, timeout=10)
+            if resp.status_code == 200:
+                try:
+                    metrics = resp.json()
+                    print("perf_metrics JSON:")
+                    print(json.dumps(metrics, indent=2, ensure_ascii=False))
+                except ValueError:
+                    print("perf_metrics returned non-JSON response:",
+                          resp.text)
+            else:
+                print(
+                    f"perf_metrics returned status {resp.status_code}: {resp.text}"
+                )
+        except requests.exceptions.RequestException as e:
+            print(f"Error fetching {perf_url}: {e}")
+
+    def _show_kvcache_time(kv_cache_perf_dir, max_lines=1000):
+        print(f"kv_cache_perf_dir: {kv_cache_perf_dir}")
+        for file in os.listdir(kv_cache_perf_dir):
+            print(f"file: {file}")
+            print(f"{'-'*25} {file}:{max_lines} {'-'*25}")
+            with open(os.path.join(kv_cache_perf_dir, file), "r") as f:
+                for line in f.readlines()[-max_lines:]:
+                    print(line.strip())
+
     tokenizer = load_hf_tokenizer(model_name)
-    yield DuckLLM(args, tokenizer, generate_async)
+    try:
+        yield DuckLLM(args, tokenizer, generate_async)
+    finally:
+        if enable_perf:
+            _show_kvcache_time(kv_cache_perf_dir)
+            _get_perf_metrics()
 
 
 def run_parallel_test(model_name: str,