diff --git a/scripts/profiling/profile.sh b/scripts/profiling/profile.sh
index 2c82f307..b150fadc 100755
--- a/scripts/profiling/profile.sh
+++ b/scripts/profiling/profile.sh
@@ -6,8 +6,8 @@
 # This script runs bench_one_batch_server with profiling enabled
 
 model_name="deepseek-ai/DeepSeek-R1"
-head_node="127.0.0.1"
-head_port=30000
+head_node="${HEAD_NODE:-127.0.0.1}"
+head_port="${HEAD_PORT:-8000}"
 
 # Parse arguments (same as sa-bench for consistency)
 n_prefill=$1
@@ -26,7 +26,6 @@ echo " Decode GPUs: ${decode_gpus}"
 echo " Total GPUs: ${total_gpus}"
 
 # Wait for server to be ready using inline wait function
-echo "Waiting for server at http://${head_node}:${head_port} to be ready..."
 wait_until_ready() {
     local SERVER_URL="$1"
     while true; do
@@ -42,7 +41,36 @@ wait_until_ready() {
         sleep 30
     done
 }
-wait_until_ready "http://${head_node}:${head_port}"
+
+# Parse prefill/decode leader IP lists from environment (comma-separated)
+IFS=',' read -r -a PREFILL_IPS <<< "${PROFILE_PREFILL_IPS:-}"
+IFS=',' read -r -a DECODE_IPS <<< "${PROFILE_DECODE_IPS:-}"
+
+wait_all_workers_ready() {
+    local ips=("$@")
+    for ip in "${ips[@]}"; do
+        if [[ -z "${ip}" ]]; then
+            continue
+        fi
+        echo "Waiting for worker at http://${ip}:30000 to be ready..."
+        wait_until_ready "http://${ip}:30000"
+    done
+}
+
+# For PD disaggregation, wait for router once and then wait for all worker servers
+if [[ "${PROFILING_MODE}" == "prefill" ]]; then
+    echo "Waiting for router at http://${head_node}:${head_port} to be ready..."
+    wait_until_ready "http://${head_node}:${head_port}"
+fi
+
+if [[ "${#PREFILL_IPS[@]}" -gt 0 || "${#DECODE_IPS[@]}" -gt 0 ]]; then
+    echo "Waiting for all profiling workers to be ready..."
+    wait_all_workers_ready "${PREFILL_IPS[@]}" "${DECODE_IPS[@]}"
+else
+    # Backward-compatible single-node behavior
+    echo "Waiting for local ${PROFILING_MODE} server at http://127.0.0.1:30000 to be ready..."
+    wait_until_ready "http://127.0.0.1:30000"
+fi
 
 # Determine profiling parameters strictly from environment
 PROFILE_STEPS_ARG=""
@@ -71,7 +99,7 @@ echo "$(date '+%Y-%m-%d %H:%M:%S')"
 
 # Create profiling output directory only when torch profiler dir is provided
 ACTIVITIES=""
 if [[ -n "${SGLANG_TORCH_PROFILER_DIR}" ]]; then
-    ACTIVITIES='["CPU", "GPU"]'
+    ACTIVITIES='["CPU", "GPU", "MEM"]'
     mkdir -p "${SGLANG_TORCH_PROFILER_DIR}" 2>/dev/null || true
     export SGLANG_TORCH_PROFILER_DIR=${SGLANG_TORCH_PROFILER_DIR}
 else
@@ -81,30 +109,49 @@ fi
 
 set -x
 
-curl -X POST http://${head_node}:${head_port}/start_profile -H "Content-Type: application/json" -d "{\"start_step\": \"$PROFILE_START_STEP\", \"num_steps\": $((PROFILE_STOP_STEP-PROFILE_START_STEP)), \"activities\": $ACTIVITIES}"
-
-python3 -m sglang.bench_serving \
---backend sglang \
---model ${model_name} \
---host ${head_node} --port ${head_port} \
---dataset-name random \
---max-concurrency $PROFILE_CONCURRENCY \
---num-prompts 128 \
---random-input-len $PROFILE_ISL \
---random-output-len $PROFILE_OSL \
---random-range-ratio 1 \
---warmup-request 10
-
-pip install lm-eval tenacity
-python -m lm_eval \
---model local-completions \
---tasks gsm8k \
---model_args \
-base_url=http://${head_node}:${head_port}/v1/completions,\
-model=${model_name},\
-tokenized_requests=False,tokenizer_backend=None,\
-num_concurrent=${PROFILE_CONCURRENCY},timeout=6000,max_retries=1 \
---limit 10
+start_profile_on_worker() {
+    local ip="$1"
+    if [[ -z "${ip}" ]]; then
+        return
+    fi
+    echo "Starting profiling on http://${ip}:30000"
+    curl -X POST "http://${ip}:30000/start_profile" -H "Content-Type: application/json" -d "{\"start_step\": \"$PROFILE_START_STEP\", \"num_steps\": $((PROFILE_STOP_STEP-PROFILE_START_STEP)), \"activities\": $ACTIVITIES}"
+}
+
+if [[ "${#PREFILL_IPS[@]}" -gt 0 || "${#DECODE_IPS[@]}" -gt 0 ]]; then
+    for ip in "${PREFILL_IPS[@]}"; do
+        start_profile_on_worker "${ip}"
+    done
+    for ip in "${DECODE_IPS[@]}"; do
+        start_profile_on_worker "${ip}"
+    done
+else
+    # Fallback to local single-node profiling
+    echo "Starting profiling on local server http://127.0.0.1:30000"
+    curl -X POST http://127.0.0.1:30000/start_profile -H "Content-Type: application/json" -d "{\"start_step\": \"$PROFILE_START_STEP\", \"num_steps\": $((PROFILE_STOP_STEP-PROFILE_START_STEP)), \"activities\": $ACTIVITIES}"
+fi
+
+# Only the prefill profiling job needs to generate traffic through the router.
+if [[ "${PROFILING_MODE}" == "prefill" ]]; then
+    python3 -m sglang.bench_serving \
+        --backend sglang \
+        --model ${model_name} \
+        --host ${head_node} --port ${head_port} \
+        --dataset-name random \
+        --max-concurrency $PROFILE_CONCURRENCY \
+        --num-prompts 128 \
+        --random-input-len $PROFILE_ISL \
+        --random-output-len $PROFILE_OSL \
+        --random-range-ratio 1 \
+        --warmup-request 5
+
+    pip install lm-eval tenacity > /dev/null
+    python -m lm_eval \
+        --model local-completions \
+        --tasks gsm8k \
+        --model_args base_url=http://${head_node}:${head_port}/v1/completions,model=${model_name},tokenized_requests=False,tokenizer_backend=None,num_concurrent=${PROFILE_CONCURRENCY},timeout=6000,max_retries=1 \
+        --limit 10
+fi
 
 exit_code=$?
 set +x
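
For orientation, a hypothetical invocation of the reworked orchestrator is sketched below. Every value is an illustrative placeholder; the positional arguments follow the order the template passes (`$PREFILL_WORKERS $DECODE_WORKERS $PREFILL_GPUS $DECODE_GPUS $TOTAL_GPUS`), and the environment variables are the ones the script reads above.

```bash
# Hypothetical run of the unified profiling orchestrator (all values are
# placeholders). PROFILE_PREFILL_IPS / PROFILE_DECODE_IPS carry the
# comma-separated leader IPs that the script splits into arrays.
PROFILING_MODE=prefill \
PROFILE_PREFILL_IPS=10.0.0.1,10.0.0.2 \
PROFILE_DECODE_IPS=10.0.0.3 \
HEAD_NODE=10.0.0.1 HEAD_PORT=8000 \
SGLANG_TORCH_PROFILER_DIR=/logs/profiles \
PROFILE_ISL=1024 PROFILE_OSL=128 PROFILE_CONCURRENCY=32 \
PROFILE_START_STEP=10 PROFILE_STOP_STEP=20 \
./scripts/profiling/profile.sh 2 1 16 8 24
```
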
diff --git a/scripts/templates/job_script_template_disagg.j2 b/scripts/templates/job_script_template_disagg.j2
index 3e12acbb..a527777f 100755
--- a/scripts/templates/job_script_template_disagg.j2
+++ b/scripts/templates/job_script_template_disagg.j2
@@ -2,6 +2,7 @@
 #SBATCH --job-name={{ job_name }}
 #SBATCH --nodes={{ total_nodes }}
 #SBATCH --ntasks={{ total_nodes }}
+#SBATCH --segment={{ total_nodes }}
 #SBATCH --ntasks-per-node=1
 {% if use_gpus_per_node_directive %}
 #SBATCH --gpus-per-node={{ gpus_per_node }}
@@ -57,7 +58,7 @@ for i in "${!nodes[@]}"; do
 done
 {% endraw %}
 
-{% if enable_multiple_frontends %}
+{% if enable_multiple_frontends and not use_sglang_router %}
 {% raw %}
 # Multiple frontend architecture
 # Node 0: nginx only + prefill shard
@@ -142,7 +143,7 @@ echo "Master IP address: $MASTER_IP"
 
 # Compute leader nodes for each worker
 {% endraw %}
-{% if enable_multiple_frontends %}
+{% if enable_multiple_frontends and not use_sglang_router %}
 {% raw %}
 # With multiple frontends: keep offset 0; nginx coexists on node 0
 WORKER_NODE_OFFSET=0
@@ -183,7 +184,12 @@ ENROOT_ARGS="\
 {% raw %}
 WORKER_ARGS="--gpu_type ${GPU_TYPE} --gpus_per_node ${GPUS_PER_NODE} --master_ip ${MASTER_IP}"
 {% endraw %}
-{% if enable_multiple_frontends %}
+{% if use_sglang_router %}
+{% raw %}
+WORKER_ARGS="$WORKER_ARGS --use-sglang-router"
+{% endraw %}
+{% endif %}
+{% if enable_multiple_frontends and not use_sglang_router %}
 {% raw %}
 # Add multiple frontends flag for worker setup
 WORKER_ARGS="$WORKER_ARGS --multiple-frontends-enabled"
@@ -204,7 +210,7 @@ WORKER_ARGS="$WORKER_ARGS --setup-script {{ setup_script }}"
 {% raw %}
 
 {% endraw %}
-{% if enable_multiple_frontends %}
+{% if enable_multiple_frontends and not use_sglang_router %}
 {% raw %}
 {% endraw %}
 {% if total_nodes > 1 %}
@@ -314,7 +320,7 @@ done
 echo ""
 {% endraw %}
 
-{% if enable_multiple_frontends %}
+{% if enable_multiple_frontends and not use_sglang_router %}
 {% raw %}
 echo "Frontend available at: http://${NGINX_NODE}:8000"
 echo "To connect to the nginx node:"
@@ -330,6 +336,38 @@ echo "srun $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --overlap --pty bas
 {% endif %}
 {% raw %}
 
+# Launch sglang router when enabled
+{% endraw %}{% if use_sglang_router %}{% raw %}
+echo "Launching sglang router on ${nodes[0]}"
+# Collect leader IPs for prefill and decode
+PREFILL_LEADER_IPS=()
+for idx in "${prefill_leaders[@]}"; do
+    node_name=${nodes[$idx]}
+    ip=$(get_node_ip "$node_name" "$SLURM_JOB_ID" "$NETWORK_INTERFACE")
+    PREFILL_LEADER_IPS+=("$ip")
+done
+DECODE_LEADER_IPS=()
+for idx in "${decode_leaders[@]}"; do
+    node_name=${nodes[$idx]}
+    ip=$(get_node_ip "$node_name" "$SLURM_JOB_ID" "$NETWORK_INTERFACE")
+    DECODE_LEADER_IPS+=("$ip")
+done
+
+ROUTER_ARGS="--pd-disaggregation"
+for ip in "${PREFILL_LEADER_IPS[@]}"; do
+    ROUTER_ARGS="$ROUTER_ARGS --prefill http://${ip}:30000"
+done
+for ip in "${DECODE_LEADER_IPS[@]}"; do
+    ROUTER_ARGS="$ROUTER_ARGS --decode http://${ip}:30000"
+done
+
+ROUTER_NODE=${nodes[0]}
+cmd="srun --overlap $ENROOT_ARGS --nodes=1 --ntasks=1 --nodelist=$ROUTER_NODE --output=${LOG_DIR}/${ROUTER_NODE}_router.out python -m sglang_router.launch_router $ROUTER_ARGS --host 0.0.0.0 --port 8000"
+echo "$cmd"
+$cmd &
+{% endraw %}{% endif %}
+{% raw %}
+
 echo ""
 echo "Make sure to cancel the job at the end:"
 echo "scancel $SLURM_JOB_ID"
@@ -349,25 +387,35 @@ srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w ${nodes[0]} --ou
 
 {% if profiler != 'none' %}
 {% raw %}
-# Torch profiling mode: run profiling on prefill and decode workers separately
-echo "Starting profiler..."
+# Torch/NSYS profiling mode: run a single orchestrator that profiles all prefill and decode workers.
+echo "Starting unified profiler..."
+
+# Collect leader IPs for prefill and decode workers
+PREFILL_LEADER_IPS=()
+for idx in "${prefill_leaders[@]}"; do
+    node_name=${nodes[$idx]}
+    ip=$(get_node_ip "$node_name" "$SLURM_JOB_ID" "$NETWORK_INTERFACE")
+    PREFILL_LEADER_IPS+=("$ip")
+done
 
-# Get leader nodes for first prefill and decode workers
-PREFILL_LEADER_NODE=${nodes[${prefill_leaders[0]}]}
-DECODE_LEADER_NODE=${nodes[${decode_leaders[0]}]}
+DECODE_LEADER_IPS=()
+for idx in "${decode_leaders[@]}"; do
+    node_name=${nodes[$idx]}
+    ip=$(get_node_ip "$node_name" "$SLURM_JOB_ID" "$NETWORK_INTERFACE")
+    DECODE_LEADER_IPS+=("$ip")
+done
 
-echo "Prefill profiling will run on: $PREFILL_LEADER_NODE"
-echo "Decode profiling will run on: $DECODE_LEADER_NODE"
+PREFILL_LEADER_IPS_STR=$(IFS=,; echo "${PREFILL_LEADER_IPS[*]}")
+DECODE_LEADER_IPS_STR=$(IFS=,; echo "${DECODE_LEADER_IPS[*]}")
 
-# Run prefill profiling on first prefill worker's leader node
-srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w $PREFILL_LEADER_NODE \
-    --output=${LOG_DIR}/profile_prefill.out --overlap \
-    bash -c "PROFILING_MODE=prefill {% endraw %}{% if profiler == 'torch' %}SGLANG_TORCH_PROFILER_DIR=/logs/profiles/prefill {% endif %}{{ prefill_profile_env }}{% raw %} /scripts/profiling/profile.sh $PREFILL_WORKERS $DECODE_WORKERS $PREFILL_GPUS $DECODE_GPUS $TOTAL_GPUS" &
+# Use the first prefill leader as the orchestrator node
+PROFILE_ORCHESTRATOR_NODE=${nodes[${prefill_leaders[0]}]}
+echo "Unified profiling will run on orchestrator node: $PROFILE_ORCHESTRATOR_NODE"
 
-# Run decode profiling on first decode worker's leader node
-srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w $DECODE_LEADER_NODE \
-    --output=${LOG_DIR}/profile_decode.out --overlap \
-    bash -c "PROFILING_MODE=decode {% endraw %}{% if profiler == 'torch' %}SGLANG_TORCH_PROFILER_DIR=/logs/profiles/decode {% endif %}{{ decode_profile_env }}{% raw %} /scripts/profiling/profile.sh $PREFILL_WORKERS $DECODE_WORKERS $PREFILL_GPUS $DECODE_GPUS $TOTAL_GPUS" &
+# Run a single profiling orchestrator that coordinates profiling across all leaders
+srun --nodes=1 --ntasks=1 $ENROOT_ARGS --jobid $SLURM_JOB_ID -w $PROFILE_ORCHESTRATOR_NODE \
+    --output=${LOG_DIR}/profile_all.out --overlap \
+    bash -c "PROFILING_MODE=prefill PROFILE_PREFILL_IPS=${PREFILL_LEADER_IPS_STR} PROFILE_DECODE_IPS=${DECODE_LEADER_IPS_STR} {% endraw %}{% if profiler == 'torch' %}SGLANG_TORCH_PROFILER_DIR=/logs/profiles {% endif %}{{ prefill_profile_env }}{% raw %} /scripts/profiling/profile.sh $PREFILL_WORKERS $DECODE_WORKERS $PREFILL_GPUS $DECODE_GPUS $TOTAL_GPUS" &
 
 {% endraw %}
 {% endif %}
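
For a job with two prefill leaders and one decode leader, the router launch assembled in the template expands to roughly the following (IPs are illustrative placeholders; the srun wrapper is omitted):

```bash
# Illustrative expansion of $cmd above for example leader IPs
# 10.0.0.1 / 10.0.0.2 (prefill) and 10.0.0.3 (decode).
python -m sglang_router.launch_router \
    --pd-disaggregation \
    --prefill http://10.0.0.1:30000 \
    --prefill http://10.0.0.2:30000 \
    --decode http://10.0.0.3:30000 \
    --host 0.0.0.0 --port 8000
```
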
diff --git a/scripts/worker_setup.py b/scripts/worker_setup.py
index 8557634c..6a4fe9b8 100644
--- a/scripts/worker_setup.py
+++ b/scripts/worker_setup.py
@@ -95,6 +95,12 @@ def _parse_command_line_args(args: list[str] | None = None) -> argparse.Namespac
         help="Whether multiple frontend architecture is enabled (affects infrastructure setup)",
     )
 
+    parser.add_argument(
+        "--use-sglang-router",
+        action="store_true",
+        help="Whether this job uses sglang router (PD disaggregation); skips NATS/ETCD/frontend bootstrap in workers.",
+    )
+
     parser.add_argument(
         "--dump-config-path",
         type=str,
@@ -188,6 +194,7 @@ def main(input_args: list[str] | None = None):
             args.sglang_config_path,
             args.dump_config_path,
             args.setup_script,
+            args.use_sglang_router,
         )
     elif args.worker_type == "decode":
         setup_decode_worker(
@@ -201,6 +208,7 @@
             args.sglang_config_path,
             args.dump_config_path,
             args.setup_script,
+            args.use_sglang_router,
         )
     elif args.worker_type == "aggregated":
         setup_aggregated_worker(
@@ -215,6 +223,7 @@
             args.sglang_config_path,
             args.dump_config_path,
             args.setup_script,
+            args.use_sglang_router,
         )
 
     logging.info(f"{args.worker_type.capitalize()} worker setup complete")
diff --git a/scripts/worker_setup/command.py b/scripts/worker_setup/command.py
index 09579a90..6a4a5aa5 100644
--- a/scripts/worker_setup/command.py
+++ b/scripts/worker_setup/command.py
@@ -73,9 +73,6 @@ def build_sglang_command_from_yaml(
     # Add all SGLang flags from config
     for key, value in sorted(mode_config.items()):
         flag_name = key.replace("_", "-")
-        # Skip disaggregation-mode flag for profiling
-        if flag_name == "disaggregation-mode":
-            continue
         if isinstance(value, bool):
             if value:
                 cmd_parts.append(f"--{flag_name}")
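
The effect of dropping the skip is that profiled servers now launch with their PD role intact. A minimal sketch of the conversion loop follows; the non-boolean branch is an assumption for illustration, since only the boolean branch is visible in the hunk above:

```python
# Sketch of the config-to-flags loop after the change: disaggregation-mode
# is no longer filtered out, so profiling runs keep PD mode. The non-boolean
# branch below is assumed; only the boolean branch appears in the hunk.
mode_config = {
    "disaggregation_mode": "prefill",  # kept now instead of being skipped
    "enable_torch_compile": True,
    "tp_size": 8,
}

cmd_parts = []
for key, value in sorted(mode_config.items()):
    flag_name = key.replace("_", "-")
    if isinstance(value, bool):
        if value:
            cmd_parts.append(f"--{flag_name}")
    else:
        cmd_parts.append(f"--{flag_name} {value}")

print(" ".join(cmd_parts))
# --disaggregation-mode prefill --enable-torch-compile --tp-size 8
```
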
wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"): - raise RuntimeError("Failed to connect to etcd") - else: - logging.info(f"Setting up aggregated worker {worker_idx}, local rank {local_rank}") - if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"): - raise RuntimeError("Failed to connect to etcd") + if not use_sglang_router: + if need_frontend: + setup_head_prefill_node(master_ip) + if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"): + raise RuntimeError("Failed to connect to etcd") + else: + logging.info(f"Setting up aggregated worker {worker_idx}, local rank {local_rank}") + if not wait_for_etcd(f"http://{master_ip}:{ETCD_CLIENT_PORT}"): + raise RuntimeError("Failed to connect to etcd") # Install dynamo from PyPI install_dynamo_wheels(gpu_type) diff --git a/src/srtctl/backends/sglang.py b/src/srtctl/backends/sglang.py index bd793e85..f0a23705 100644 --- a/src/srtctl/backends/sglang.py +++ b/src/srtctl/backends/sglang.py @@ -142,9 +142,7 @@ def _config_to_flags(self, config: dict) -> list[str]: # Convert underscores to hyphens flag_name = key.replace("_", "-") - # Skip disaggregation-mode flag when profiling (sglang.launch_server doesn't accept it) - if profiling_type in ("torch", "nsys") and flag_name == "disaggregation-mode": - continue + # Always pass disaggregation-mode so profiling runs in PD mode if isinstance(value, bool): if value: @@ -281,10 +279,8 @@ def generate_slurm_script(self, config_path: Path = None, timestamp: str = None) config_dir_path = srtctl_root / "configs" log_dir_path = srtctl_root / "logs" - # Build profiling env injections + # Build profiling env injections profiling_cfg = self.config.get("profiling") or {} - prefill_cfg = profiling_cfg.get("prefill") or {} - decode_cfg = profiling_cfg.get("decode") or {} def build_env_str(cfg: dict) -> str: parts: list[str] = [] @@ -300,11 +296,12 @@ def build_env_str(cfg: dict) -> str: parts.append(f"PROFILE_STOP_STEP={cfg['stop_step']}") return " ".join(parts) - prefill_profile_env = build_env_str(prefill_cfg) - decode_profile_env = build_env_str(decode_cfg) + # Use the same profiling spec for both prefill and decode; in PD + # disaggregation mode this single spec drives both sides. + prefill_profile_env = build_env_str(profiling_cfg) + decode_profile_env = build_env_str(profiling_cfg) profiler_mode = profiling_cfg.get("type") or "none" - # Template variables template_vars = { "job_name": job_name, @@ -327,6 +324,7 @@ def build_env_str(cfg: dict) -> str: "partition": partition, "enable_multiple_frontends": self.backend_config.get("enable_multiple_frontends", True), "num_additional_frontends": self.backend_config.get("num_additional_frontends", 9), + "use_sglang_router": self.backend_config.get("use_sglang_router", False), "do_benchmark": do_benchmark, "benchmark_type": bench_type, "benchmark_arg": parsable_config, diff --git a/src/srtctl/core/schema.py b/src/srtctl/core/schema.py index be1cfdf9..ce1be111 100644 --- a/src/srtctl/core/schema.py +++ b/src/srtctl/core/schema.py @@ -170,9 +170,12 @@ class ProfilingType(str, Enum): NONE = "none" -class ProfilingPhaseConfig(BaseModel): - """Per-phase profiling parameters.""" +class ProfilingConfig(BaseModel): + """Profiling configuration.""" + type: ProfilingType = Field(ProfilingType.NONE, description="Profiling type") + # Unified profiling spec (used for both prefill and decode in PD + # disaggregation mode, or for aggregated mode). 
diff --git a/src/srtctl/core/schema.py b/src/srtctl/core/schema.py
index be1cfdf9..ce1be111 100644
--- a/src/srtctl/core/schema.py
+++ b/src/srtctl/core/schema.py
@@ -170,9 +170,12 @@ class ProfilingType(str, Enum):
     NONE = "none"
 
 
-class ProfilingPhaseConfig(BaseModel):
-    """Per-phase profiling parameters."""
+class ProfilingConfig(BaseModel):
+    """Profiling configuration."""
 
+    type: ProfilingType = Field(ProfilingType.NONE, description="Profiling type")
+    # Unified profiling spec (used for both prefill and decode in PD
+    # disaggregation mode, or for aggregated mode).
     isl: Optional[int] = Field(None, description="Input sequence length")
     osl: Optional[int] = Field(None, description="Output sequence length")
     concurrency: Optional[int] = Field(None, description="Batch size / concurrency")
@@ -180,14 +183,6 @@
     start_step: Optional[int] = Field(None, description="Profiling start step")
     stop_step: Optional[int] = Field(None, description="Profiling stop step")
 
 
-class ProfilingConfig(BaseModel):
-    """Profiling configuration."""
-
-    type: ProfilingType = Field(ProfilingType.NONE, description="Profiling type")
-    prefill: Optional[ProfilingPhaseConfig] = None
-    decode: Optional[ProfilingPhaseConfig] = None
-
-
 class SGLangPrefillConfig(BaseModel):
     """SGLang prefill worker configuration.
@@ -239,9 +234,12 @@ class BackendConfig(BaseModel):
     # SGLang-specific config
     sglang_config: Optional[SGLangConfig] = None
 
-    # Frontend settings
+    # Frontend / router settings
     enable_multiple_frontends: bool = True
     num_additional_frontends: int = 9
+    # Whether to launch sglang_router alongside the workers (PD disaggregation).
+    # This is user-configurable via backend.use_sglang_router in the recipe.
+    use_sglang_router: bool = False
 
 
 class JobConfig(BaseModel):
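
Taken together, a recipe exercising the router path with the unified profiling spec would look roughly like the fragment below. This is a hypothetical example: the key paths follow the updated schema, the values are illustrative, and `type: torch` assumes the enum defines a torch member alongside `none`, consistent with the `'torch'`/`'nsys'` checks elsewhere in this diff.

```yaml
# Hypothetical recipe fragment; key paths follow the updated schema,
# values are illustrative.
backend:
  use_sglang_router: true   # launch sglang_router; workers skip NATS/ETCD bootstrap
profiling:
  type: torch               # assumed enum value, per profiler checks above
  isl: 1024
  osl: 128
  concurrency: 32
  start_step: 10
  stop_step: 20
```
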