74 changes: 65 additions & 9 deletions tests/integration/defs/accuracy/test_disaggregated_serving.py
@@ -122,6 +122,21 @@ def launch_disaggregated_llm(
f"Using unified tp parameter for testing is not recommended. Please use server configs instead."
)

enable_perf = True
perf_max_requests = 10000

def _apply_perf_flags(cfg: Optional[Dict[str, Any]]):
if not isinstance(cfg, dict):
return
if enable_perf:
# Only set these if the switch is enabled.
# Use `setdefault` so explicit per-test overrides are preserved.
cfg.setdefault("return_perf_metrics", True)
cfg.setdefault("perf_metrics_max_requests", perf_max_requests)

_apply_perf_flags(disaggregated_server_config)
_apply_perf_flags(ctx_server_config)
_apply_perf_flags(gen_server_config)
disaggregated_server_config = revise_disaggregated_server_config_urls_with_free_ports(
disaggregated_server_config)
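For context, the `setdefault` calls mean that a server config which already sets one of these keys (for example an explicit per-test override) is left untouched; only missing keys pick up the perf defaults. A minimal standalone sketch of that behavior, using illustrative dicts rather than real test configs:

```python
# Illustrative only: mimics _apply_perf_flags on two toy configs.
ctx_cfg = {"perf_metrics_max_requests": 500}   # explicit per-test override
gen_cfg = {}                                   # no perf settings yet

for cfg in (ctx_cfg, gen_cfg):
    cfg.setdefault("return_perf_metrics", True)
    cfg.setdefault("perf_metrics_max_requests", 10000)

assert ctx_cfg["perf_metrics_max_requests"] == 500    # override preserved
assert gen_cfg["perf_metrics_max_requests"] == 10000  # default applied
assert ctx_cfg["return_perf_metrics"] and gen_cfg["return_perf_metrics"]
```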

@@ -183,12 +198,16 @@ def launch_disaggregated_llm(
ctx_servers = []
current_gpu_offset = 0

kv_cache_perf_dir = os.path.join(temp_dir.name, "kv_cache_perf")

for i, port in enumerate(ctx_ports):
env_ctx = os.environ.copy()
env_ctx["TRTLLM_USE_UCX_KVCACHE"] = "1"
env = os.environ.copy()
env["TRTLLM_USE_UCX_KVCACHE"] = "1"
if enable_perf:
env["TRTLLM_KVCACHE_TIME_OUTPUT_PATH"] = kv_cache_perf_dir
gpu_range = range(current_gpu_offset,
current_gpu_offset + ctx_total_gpus)
env_ctx["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
current_gpu_offset += ctx_total_gpus

ctx_server_args = ctx_args + [
@@ -200,16 +219,18 @@
ctx_server_args.append(
f"--max_num_tokens={ctx_server_config['max_num_tokens']}")

ctx_servers.append((env_ctx, ctx_server_args))
ctx_servers.append((env, ctx_server_args))

gen_servers = []

for i, port in enumerate(gen_ports):
env_gen = os.environ.copy()
env_gen["TRTLLM_USE_UCX_KVCACHE"] = "1"
env = os.environ.copy()
env["TRTLLM_USE_UCX_KVCACHE"] = "1"
if enable_perf:
env["TRTLLM_KVCACHE_TIME_OUTPUT_PATH"] = kv_cache_perf_dir
gpu_range = range(current_gpu_offset,
current_gpu_offset + gen_total_gpus)
env_gen["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
current_gpu_offset += gen_total_gpus

gen_server_args = gen_args + [
@@ -221,7 +242,7 @@
gen_server_args.append(
f"--max_num_tokens={gen_server_config['max_num_tokens']}")

gen_servers.append((env_gen, gen_server_args))
gen_servers.append((env, gen_server_args))
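Each entry in `ctx_servers` and `gen_servers` is an `(env, args)` pair, so every worker process gets its own environment: its GPU slice via `CUDA_VISIBLE_DEVICES` and, when `enable_perf` is on, the shared `TRTLLM_KVCACHE_TIME_OUTPUT_PATH` directory. The body of `multi_popen` is not shown in this diff; a rough, hypothetical sketch of how such pairs are typically consumed:

```python
import subprocess

def _launch_all(server_configs):
    # Hypothetical helper: one subprocess per (env, args) pair, each started
    # with its own environment so the per-server CUDA/KV-cache settings apply.
    return [subprocess.Popen(args, env=env) for env, args in server_configs]
```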

@contextlib.contextmanager
def multi_popen(server_configs, server_name="", enable_redirect_log=False):
@@ -341,8 +362,43 @@ def generate_async(prompt: str,
thread_pool.futures.append(future)
return future

def _get_perf_metrics():
path = "/perf_metrics"
perf_url = f"http://localhost:8000{path}"
try:
print(f"Fetching perf metrics from {perf_url}")
resp = requests.get(perf_url, timeout=10)
if resp.status_code == 200:
try:
metrics = resp.json()
print("perf_metrics JSON:")
print(json.dumps(metrics, indent=2, ensure_ascii=False))
except ValueError:
print("perf_metrics returned non-JSON response:",
resp.text)
else:
print(
f"perf_metrics returned status {resp.status_code}: {resp.text}"
)
except requests.exceptions.RequestException as e:
print(f"Error fetching {perf_url}: {e}")

def _show_kvcache_time(kv_cache_perf_dir, max_lines=1000):
    print(f"kv_cache_perf_dir: {kv_cache_perf_dir}")
    if not os.path.isdir(kv_cache_perf_dir):
        print("No KV-cache timing files were written.")
        return
    for file in os.listdir(kv_cache_perf_dir):
        # Print the tail of each per-worker timing file.
        print(f"{'-'*25} {file} (last {max_lines} lines) {'-'*25}")
        with open(os.path.join(kv_cache_perf_dir, file), "r") as f:
            for line in f.readlines()[-max_lines:]:
                print(line.strip())

tokenizer = load_hf_tokenizer(model_name)
yield DuckLLM(args, tokenizer, generate_async)
try:
yield DuckLLM(args, tokenizer, generate_async)
finally:
if enable_perf:
_show_kvcache_time(kv_cache_perf_dir)
_get_perf_metrics()
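Because the `yield` now sits inside `try`/`finally`, the KV-cache timing dump and the `/perf_metrics` fetch run on teardown even if the test body raises. A rough usage sketch, assuming `launch_disaggregated_llm` is consumed as a context manager; the argument names and the test body are illustrative:

```python
with launch_disaggregated_llm(disaggregated_server_config,
                              ctx_server_config,
                              gen_server_config,
                              model_name) as llm:
    # Run the accuracy workload; the perf diagnostics above are printed on
    # exit regardless of whether this block succeeds or raises.
    run_accuracy_checks(llm)  # hypothetical test body
```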


def run_parallel_test(model_name: str,