
Gateway supports dp rank scheduling and scheduling with the minimum number of tokens #20435

Open
jiashaokun-1 wants to merge 1 commit into sgl-project:main from jiashaokun-1:main

Conversation

@jiashaokun-1
Contributor

Motivation

#19268 introduced routed_dp_rank and disagg_prefill_dp_rank to support external DP dispatch with PD-disaggregation mode. This change adds gateway support for that feature and schedules requests to the dp_rank with the minimum number of tokens.

In the current PD-Disaggregation mode, prefill instances only support the round_robin scheduling policy. Under this policy, the DP group a request is routed to is determined by bootstrap_room mod dp_size. This can lead to load imbalance among DP groups, and the imbalance becomes more pronounced when the input requests are variable-length sequences.
To address this, I have added a feature that selects the DP group with the lightest load to process each request, achieving DP load balance for PD-Disaggregation mode. The current load is measured by the number of tokens, which can be adjusted as needed in the future. The feature is enabled or disabled via the dp_minimum_tokens_scheduler parameter.

Key Changes

1. Load Collection

The /metrics interface is used instead of /get_load to obtain the instance load. A new WorkerLoadManager class has been added to manage the engine's load, storing the number of used tokens (num_used_tokens) for each DP group as the load.
The load query interface is modified, and an extract_gauge_metrics method is added to parse the Prometheus-format response.

        let load_url = format!("{}/metrics", url);
        let mut req = client.get(&load_url).timeout(REQUEST_TIMEOUT);
        if let Some(key) = api_key {
            req = req.bearer_auth(key);
        }

        match req.send().await {
            Ok(r) if r.status().is_success() => {
                if let Ok(text) = r.text().await {
                    return crate::core::metrics_manager::extract_gauge_metrics(text, "sglang_num_used_tokens");
                }
                HashMap::new()
            },
            _ => HashMap::new(),
        }

2. Load Management

A new WorkerLoadManager struct has been added to manage loads, providing three methods: update_dp_loads, get_lowest_dp_load, and load_increment.

#[derive(Debug, Default)]
pub struct WorkerLoadManager {
    // <worker, <dp_rank, loads>>
    dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
}
  1. The update_dp_loads method updates the load. After periodically collecting load data, the LoadMonitor calls this method to perform the update.
  2. The get_lowest_dp_load method takes a worker as input and returns the dp_rank with the lowest load among that worker's DP groups.
  3. The load_increment method is used to add an increment to the load of a specific dp_rank for a worker. This is done to prevent all requests from being scheduled to the same DP group during the interval between two load reports.
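Taken together, the three methods behave roughly like this standalone sketch (it keys workers by a plain URL string for simplicity; the real code operates on Worker instances):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Simplified sketch of the behaviour described above; an assumption,
// not the gateway's actual WorkerLoadManager.
#[derive(Debug, Default)]
pub struct WorkerLoadManager {
    // <worker, <dp_rank, load in tokens>>
    dp_cached_loads: RwLock<HashMap<String, HashMap<isize, isize>>>,
}

impl WorkerLoadManager {
    // 1. Replace the cached loads for a worker with a fresh metrics report.
    pub fn update_dp_loads(&self, worker: &str, loads: HashMap<isize, isize>) {
        self.dp_cached_loads.write().unwrap().insert(worker.to_string(), loads);
    }

    // 2. Return the dp_rank with the smallest cached load for this worker.
    pub fn get_lowest_dp_load(&self, worker: &str) -> Option<isize> {
        self.dp_cached_loads
            .read()
            .unwrap()
            .get(worker)?
            .iter()
            .min_by_key(|&(_, load)| load)
            .map(|(&dp_rank, _)| dp_rank)
    }

    // 3. Charge a rank for a just-scheduled request so requests arriving
    //    between two metric scrapes do not all pile onto the same group.
    pub fn load_increment(&self, worker: &str, dp_rank: isize, tokens: isize) {
        if let Some(loads) = self.dp_cached_loads.write().unwrap().get_mut(worker) {
            *loads.entry(dp_rank).or_insert(0) += tokens;
        }
    }
}

fn main() {
    let manager = WorkerLoadManager::default();
    manager.update_dp_loads("http://prefill-0", HashMap::from([(0, 100), (1, 400)]));
    println!("{:?}", manager.get_lowest_dp_load("http://prefill-0")); // prints Some(0)
}
```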

3. New Interfaces

The DPRankLoadPolicy trait and the MinimumTokensPolicy implementation are added. Given a worker, they select the dp_rank with the minimum number of tokens.

#[async_trait]
pub trait DPRankLoadPolicy: Send + Sync + Debug {
    async fn select_dp_rank(&self, worker: &dyn Worker, text_str: isize) -> Option<isize>;
}

#[async_trait]
impl DPRankLoadPolicy for MinimumTokensPolicy {
    async fn select_dp_rank(&self, worker: &dyn Worker, text_str: isize) -> Option<isize> {
        if let Some(worker_load) = self.worker_load_manager.as_ref() {
            let lowest_tokens_dp_rank = worker_load.get_lowest_dp_load(worker);
            if let Some(dp_rank) = lowest_tokens_dp_rank {
                worker_load.load_increment(worker, dp_rank, text_str);
            }
            return lowest_tokens_dp_rank;
        }
        None
    }
}
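To see why the load_increment call inside select_dp_rank matters, here is a standalone simulation with a hypothetical pick_and_charge helper (not gateway code). Without charging the picked rank, every request arriving between two metric scrapes would land on the same DP group:

```rust
use std::collections::HashMap;

// Hypothetical helper mirroring select_dp_rank: pick the least-loaded
// rank, then immediately charge it the request's token count, as
// load_increment does in the policy above.
fn pick_and_charge(loads: &mut HashMap<isize, isize>, tokens: isize) -> Option<isize> {
    let rank = loads.iter().min_by_key(|&(_, load)| load).map(|(&r, _)| r)?;
    *loads.get_mut(&rank).unwrap() += tokens;
    Some(rank)
}

fn main() {
    // Two DP groups; the last metric scrape reported 100 and 400 tokens.
    let mut loads = HashMap::from([(0, 100), (1, 400)]);
    let picks: Vec<_> = (0..3).filter_map(|_| pick_and_charge(&mut loads, 200)).collect();
    // Charging spreads the burst across ranks instead of herding onto rank 0.
    println!("{:?}", picks); // prints [0, 0, 1]
}
```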

4. Configuration Parameters

  1. A new configuration parameter, dp-minimum-tokens-scheduler, enables scheduling to the minimum-load DP group when the flag is set.
  2. A new configuration parameter, worker-load-check-interval, specifies the interval for load collection. Previously, load collection reused the worker-startup-check-interval setting; it is now configured separately from the startup check.

Benchmarking and Profiling

Comparing several scheduling strategies on variable-length datasets before and after enabling dp-minimum-tokens-scheduler: Mean TPOT improved by approximately 9%, and Mean TTFT (measured with max_out_len=1 to eliminate the impact of decoding on prefill) improved by about 15%. The specific data is as follows:

| Policy | TTFT baseline (ms) | TTFT w/ dp min tokens (ms) | TTFT gain | TPOT baseline (ms) | TPOT w/ dp min tokens (ms) | TPOT gain |
|---|---|---|---|---|---|---|
| Random | 2631 | 2249 | +14.52% | 53.41 | 48.15 | +9.85% |
| Round_robin | 2646 | 2164 | +18.22% | 55.42 | 49.02 | +11.55% |
| Cache_aware | 2601 | 2216 | +14.80% | 52.67 | 48.48 | +7.96% |

TTFT was measured with max_out_len=1.

router

python -m sglang_router.launch_router   --pd-disaggregation   --prefill-policy cache_aware --dp-minimum-tokens-scheduler --worker-load-check-interval 1   --prefill grpc://141.61.29.204:6699   --decode grpc://127.0.0.1:7699   --model-path /home/weights/DeepSeek-R1_w8a8   --tokenizer-path /home/weights/DeepSeek-R1_w8a8   --host 0.0.0.0 --port 4567

prefill

python3 -m sglang.launch_server --model-path ${MODEL_PATH} --tp-size 1 --dp-size 2 --base-gpu-id 8 --disaggregation-mode prefill --trust-remote-code --attention-backend ascend --disaggregation-transfer-backend ascend --device npu --quantization modelslim --watchdog-timeout 9000 --host 141.61.29.204 --grpc-mode --metrics-port 10000 --port 6699 --cuda-graph-bs 8 16 24 28 32 36 --mem-fraction-static 0.71 --max-running-requests 144 --chunked-prefill-size -1  --dtype bfloat16 --load-balance-method round_robin --disable-overlap-schedule --enable-metrics

decode

python3 -m sglang.launch_server --model-path ${MODEL_PATH} --tp-size 1 --dp-size 2 --base-gpu-id 12 --disaggregation-mode decode --trust-remote-code --attention-backend ascend --disaggregation-transfer-backend ascend --device npu --quantization modelslim --watchdog-timeout 9000 --host 141.61.29.204 --grpc-mode --metrics-port 10001 --port 7699 --cuda-graph-bs 8 16 24 28 32 36 --mem-fraction-static 0.71 --max-running-requests 144 --chunked-prefill-size -1 --dtype bfloat16 --load-balance-method round_robin --disable-overlap-schedule --prefill-round-robin-balance --enable-metrics

benchmark
The model evaluation tool I used is AISBench. Variable-length datasets are prone to load-imbalance scenarios, so I constructed a variable-length dataset for testing. When the benchmark's --dataset-name random option was used to specify the dataset, the input prompts were being split; therefore, I used AISBench to substitute a variable-length dataset during the test runs.

ais_bench --models vllm_api_stream_chat --datasets gsm8k_gen_0_shot_cot_str_perf  --debug --summarizer default_perf --mode perf

╒══════════════════════════╤═════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤═════════════════╤══════╕
│ Performance Parameters   │ Stage   │ Average         │ Min             │ Max             │ Median          │ P75             │ P90             │ P99             │  N   │
╞══════════════════════════╪═════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪═════════════════╪══════╡
│ E2EL                     │ total   │ 55044.4573 ms   │ 11143.2514 ms   │ 195472.4726 ms  │ 35057.9889 ms   │ 69464.8019 ms   │ 132295.1279 ms  │ 185509.4833 ms  │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ TTFT                     │ total   │ 893.5725 ms     │ 348.2403 ms     │ 1549.0416 ms    │ 901.9457 ms     │ 1084.3782 ms    │ 1215.7292 ms    │ 1469.2704 ms    │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ TPOT                     │ total   │ 48.4854 ms      │ 33.7401 ms      │ 63.8094 ms      │ 48.7747 ms      │ 52.4151 ms      │ 55.4097 ms      │ 59.9007 ms      │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ ITL                      │ total   │ 114.1014 ms     │ 0.0068 ms       │ 594.2452 ms     │ 115.7806 ms     │ 119.678 ms      │ 123.1585 ms     │ 251.8668 ms     │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ InputTokens              │ total   │ 2560.5243       │ 1481.0          │ 4120.0          │ 2216.0          │ 3402.25         │ 3943.3          │ 4107.49         │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ OutputTokens             │ total   │ 1159.8058       │ 187.0           │ 4068.0          │ 715.5           │ 1358.0          │ 2769.4          │ 4068.0          │ 1000 │
├──────────────────────────┼─────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼──────┤
│ OutputTokenThroughput    │ total   │ 20.3425 token/s │ 14.7171 token/s │ 28.1505 token/s │ 19.9678 token/s │ 22.0362 token/s │ 23.6102 token/s │ 26.6782 token/s │ 1000 │
╘══════════════════════════╧═════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧═════════════════╧══════╛
╒══════════════════════════╤═════════╤═══════════════════╕
│ Common Metric            │ Stage   │ Value             │
╞══════════════════════════╪═════════╪═══════════════════╡
│ Benchmark Duration       │ total   │ 392829.9277 ms    │
├──────────────────────────┼─────────┼───────────────────┤
│ Total Requests           │ total   │ 1000              │
├──────────────────────────┼─────────┼───────────────────┤
│ Failed Requests          │ total   │ 0                 │
├──────────────────────────┼─────────┼───────────────────┤
│ Success Requests         │ total   │ 1000              │
├──────────────────────────┼─────────┼───────────────────┤
│ Concurrency              │ total   │ 86.5959           │
├──────────────────────────┼─────────┼───────────────────┤
│ Max Concurrency          │ total   │ 270               │
├──────────────────────────┼─────────┼───────────────────┤
│ Request Throughput       │ total   │ 2.5456 req/s      │
├──────────────────────────┼─────────┼───────────────────┤
│ Total Input Tokens       │ total   │ 1582404           │
├──────────────────────────┼─────────┼───────────────────┤
│ Prefill Token Throughput │ total   │ 2865.4913 token/s │
├──────────────────────────┼─────────┼───────────────────┤
│ Total generated tokens   │ total   │ 716760            │
├──────────────────────────┼─────────┼───────────────────┤
│ Input Token Throughput   │ total   │ 4028.2165 token/s │
├──────────────────────────┼─────────┼───────────────────┤
│ Output Token Throughput  │ total   │ 1824.6064 token/s │
├──────────────────────────┼─────────┼───────────────────┤
│ Total Token Throughput   │ total   │ 5852.8229 token/s │
╘══════════════════════════╧═════════╧═══════════════════╛

Checklist

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the gateway's ability to manage load distribution in a distributed inference setup, particularly for models operating in PD-Disaggregation mode. By introducing a token-based load balancing strategy, it aims to mitigate load imbalances that arise with variable-length input sequences, leading to more efficient resource utilization and improved overall performance. The changes involve a new load monitoring and management system, integration with Prometheus metrics, and new configuration options to activate and fine-tune this feature.

Highlights

  • DP Rank Scheduling: Implemented a new scheduling policy for Data Parallel (DP) groups that selects the DP rank with the minimum number of tokens, addressing load imbalance issues in PD-Disaggregation mode, especially for variable-length sequences.
  • Load Management System: Introduced a WorkerLoadManager to track and manage the load (number of used tokens) for each DP group within workers. This includes methods to update loads, retrieve the lowest load DP rank, and increment loads to prevent over-scheduling.
  • Metrics Integration: Switched from a custom /get_load endpoint to using the /metrics (Prometheus) interface for collecting worker load data, specifically parsing sglang_num_used_tokens gauge metrics.
  • Configuration and Performance: Added new configuration parameters (dp-minimum-tokens-scheduler and worker-load-check-interval) to enable and control the new scheduling behavior. Benchmarking shows performance improvements of approximately 9% in Mean TPOT and 15% in Mean TTFT on variable-length datasets.


Changelog
  • sgl-model-gateway/bindings/python/src/lib.rs
    • Added worker_load_check_interval and dp_minimum_tokens_scheduler fields to the Router struct.
    • Updated Router constructor and builder methods to include the new configuration parameters.
    • Set default values for worker_load_check_interval and dp_minimum_tokens_scheduler.
  • sgl-model-gateway/bindings/python/src/sglang_router/router.py
    • Updated the Router class docstring to describe the new worker_load_check_interval and dp_minimum_tokens_scheduler parameters.
  • sgl-model-gateway/bindings/python/src/sglang_router/router_args.py
    • Added worker_load_check_interval and dp_minimum_tokens_scheduler to RouterArgs with default values.
    • Introduced new CLI arguments --dp-minimum-tokens-scheduler and --worker-load-check-interval.
  • sgl-model-gateway/src/app_context.rs
    • Modified the AppContextBuilder to use config.worker_load_check_interval_secs when initializing the LoadMonitor.
  • sgl-model-gateway/src/config/builder.rs
    • Added builder methods worker_load_check_interval_secs and dp_minimum_tokens_scheduler to RouterConfigBuilder.
  • sgl-model-gateway/src/config/types.rs
    • Added worker_load_check_interval_secs and dp_minimum_tokens_scheduler fields to RouterConfig.
    • Set default values for the new fields in the RouterConfig default implementation.
  • sgl-model-gateway/src/core/metrics_manager.rs
    • Renamed from metrics_aggregator.rs.
    • Added HashMap and MetricNumber imports.
    • Implemented extract_gauge_metrics to parse Prometheus text for specific gauge metrics like sglang_num_used_tokens.
    • Added a generic extract_metrics helper function.
  • sgl-model-gateway/src/core/mod.rs
    • Updated module import from metrics_aggregator to metrics_manager.
    • Added and exported the worker_load module and WorkerLoadManager.
  • sgl-model-gateway/src/core/worker_load.rs
    • Added a new file defining the WorkerLoadManager struct to cache and manage DP group loads.
    • Implemented new, update_dp_loads, get_lowest_dp_load, and load_increment methods for load management.
    • Included unit tests for the WorkerLoadManager functionality.
  • sgl-model-gateway/src/core/worker_manager.rs
    • Updated imports to reflect the metrics_manager rename and the new WorkerLoadManager.
    • Modified get_all_worker_loads to return a tuple containing WorkerLoadsResult and a HashMap of DP rank loads.
    • Changed parse_load_response to query the /metrics endpoint and use extract_gauge_metrics to retrieve DP rank loads.
    • Added worker_load_manager to the LoadMonitor struct and initialized it.
    • Updated the monitor_loop to pass and utilize the worker_load_manager for updating DP rank loads.
  • sgl-model-gateway/src/main.rs
    • Added CLI arguments dp_minimum_tokens_scheduler and worker_load_check_interval.
    • Passed the new CLI argument values to the RouterConfigBuilder.
  • sgl-model-gateway/src/policies/dp_min_token.rs
    • Added a new file defining the MinimumTokensPolicy struct.
    • Implemented the DPRankLoadPolicy trait for MinimumTokensPolicy, including the select_dp_rank method to choose the DP rank with the lowest token load.
  • sgl-model-gateway/src/policies/mod.rs
    • Added the dp_min_token module and exported MinimumTokensPolicy.
    • Defined the new DPRankLoadPolicy trait.
  • sgl-model-gateway/src/policies/registry.rs
    • Imported DPRankLoadPolicy.
    • Added dp_rank_policy to PolicyRegistry and initialized it.
    • Implemented set_dp_rank_policy and get_dp_rank_policy methods for managing the DP rank policy.
  • sgl-model-gateway/src/routers/factory.rs
    • Imported DPRankLoadPolicy and MinimumTokensPolicy.
    • Modified build_pd_router to conditionally create and set a MinimumTokensPolicy based on the dp_minimum_tokens_scheduler configuration.
  • sgl-model-gateway/src/routers/http/pd_router.rs
    • Added inject_dp_rank_to_json helper function to modify JSON requests with DP rank information.
    • Updated execute_dual_dispatch to incorporate DP rank selection logic using DPRankLoadPolicy and inject routed_dp_rank and disagg_prefill_dp_rank into prefill and decode requests.
    • Modified execute_dual_dispatch_internal to accept separate JSON requests for prefill and decode.
  • sgl-model-gateway/src/server.rs
    • Adjusted the get_loads function to correctly extract and return WorkerLoadsResult from the new tuple return type of WorkerManager::get_all_worker_loads.
  • sgl-model-gateway/tests/metrics_manager_test.rs
    • Renamed from metrics_aggregator_test.rs.
    • Updated imports to metrics_manager.
    • Added new unit tests for the extract_gauge_metrics function, covering cases with and without dp_rank labels.
Activity
  • The author, jiashaokun-1, has completed the initial development of the DP rank scheduling feature.
  • Unit tests have been added for the new WorkerLoadManager and metrics_manager functionality.
  • Performance benchmarks were conducted, demonstrating significant improvements in Mean TPOT and Mean TTFT with the new scheduling policy.
  • Code formatting has been applied according to project guidelines.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a new scheduling policy for DP-enabled workers to balance load based on the minimum number of tokens, which is a great addition for handling variable-length inputs more efficiently. The implementation is well-structured, including a new WorkerLoadManager and MinimumTokensPolicy. My review includes suggestions to improve a docstring, a log message, and a variable name for better clarity. I also identified a potential feature gap where the new scheduling policy is not applied to the gRPC PD router, which should be addressed for consistency.

Comment on lines 123 to 124
ctx.policy_registry.set_prefill_policy(prefill_policy);
ctx.policy_registry.set_decode_policy(decode_policy);


Severity: high

The new DP scheduling policy (MinimumTokensPolicy) is configured for the HTTP PD router (PDRouter) but not for the gRPC PD router (GrpcPDRouter). This seems like an omission that makes the feature available only for HTTP requests. For consistency, this should probably be added to create_grpc_pd_router as well.

        ctx.policy_registry.set_prefill_policy(prefill_policy);
        ctx.policy_registry.set_decode_policy(decode_policy);

        let config = ctx.router_config.clone();
        if config.dp_minimum_tokens_scheduler {
            let mini_tokens_policy = MinimumTokensPolicy::new(
                ctx.load_monitor
                    .as_ref()
                    .map(|load_monitor_arc| load_monitor_arc.worker_load_manager.clone()),
            );
            let dp_rank_policy: Arc<dyn DPRankLoadPolicy> = Arc::new(mini_tokens_policy);
            ctx.policy_registry.set_dp_rank_policy(dp_rank_policy);
        }

port: Port number to bind the router server. Default: 3001
worker_startup_timeout_secs: Timeout in seconds for worker startup and registration. Large models can take significant time to load into GPU memory. Default: 1800 (30 minutes)
worker_startup_check_interval: Interval in seconds between checks for worker initialization. Default: 10
worker_load_check_interval: Interval in seconds between get loads for worker initialization. Default: 10


Severity: medium

The docstring for worker_load_check_interval is a bit misleading. It says "...for worker initialization", which suggests it's only used during startup. This interval is for periodic load checking during the router's lifetime. A clearer description would be better.

Suggested change
worker_load_check_interval: Interval in seconds between get loads for worker initialization. Default: 10
worker_load_check_interval: Interval in seconds between getting loads for workers. Default: 10

Comment on lines +130 to +133
Err(e) => {
warn!("The metric is missing the dp_rank label{}, skipping.", e);
continue;
}


Severity: medium

The warning message here seems to be incorrect. The error e comes from sample.get_labelset(), which can fail if there's no label set at all, not specifically because dp_rank is missing. The check for dp_rank happens later. A more generic error message would be more accurate.

Suggested change
Err(e) => {
warn!("The metric is missing the dp_rank label{}, skipping.", e);
continue;
}
Err(e) => {
warn!("Failed to get label set for metric sample: {}, skipping.", e);
continue;
}

return loads
.iter()
.min_by_key(|&(_, load)| load)
.map(|(&rand_id, _)| rand_id);


Severity: medium

The variable name rand_id is a bit confusing here. Based on the context, it represents the dp_rank. Renaming it would improve clarity and maintainability.

Suggested change
.map(|(&rand_id, _)| rand_id);
.map(|(&dp_rank, _)| dp_rank);

@ping1jing2 ping1jing2 self-assigned this Mar 17, 2026
@slin1237 slin1237 assigned slin1237 and unassigned slin1237 Mar 17, 2026
