Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sgl-model-gateway/bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ struct Router {
policy: PolicyType,
worker_startup_timeout_secs: u64,
worker_startup_check_interval: u64,
worker_load_check_interval: u64,
cache_threshold: f32,
balance_abs_threshold: usize,
balance_rel_threshold: f32,
Expand All @@ -351,6 +352,7 @@ struct Router {
assignment_mode: String,
max_payload_size: usize,
dp_aware: bool,
dp_minimum_tokens_scheduler: bool,
api_key: Option<String>,
log_dir: Option<String>,
log_level: Option<String>,
Expand Down Expand Up @@ -570,6 +572,7 @@ impl Router {
.request_timeout_secs(self.request_timeout_secs)
.worker_startup_timeout_secs(self.worker_startup_timeout_secs)
.worker_startup_check_interval_secs(self.worker_startup_check_interval)
.worker_load_check_interval_secs(self.worker_load_check_interval)
.max_concurrent_requests(self.max_concurrent_requests)
.queue_size(self.queue_size)
.queue_timeout_secs(self.queue_timeout_secs)
Expand Down Expand Up @@ -632,6 +635,7 @@ impl Router {
self.server_cert_path.as_ref(),
self.server_key_path.as_ref(),
)
.dp_minimum_tokens_scheduler(self.dp_minimum_tokens_scheduler)
.build()
}
}
Expand All @@ -646,6 +650,7 @@ impl Router {
port = 3001,
worker_startup_timeout_secs = 600,
worker_startup_check_interval = 30,
worker_load_check_interval = 10,
cache_threshold = 0.3,
balance_abs_threshold = 64,
balance_rel_threshold = 1.5,
Expand All @@ -655,6 +660,7 @@ impl Router {
assignment_mode = String::from("random"),
max_payload_size = 512 * 1024 * 1024,
dp_aware = false,
dp_minimum_tokens_scheduler = false,
api_key = None,
log_dir = None,
log_level = None,
Expand Down Expand Up @@ -733,6 +739,7 @@ impl Router {
port: u16,
worker_startup_timeout_secs: u64,
worker_startup_check_interval: u64,
worker_load_check_interval: u64,
cache_threshold: f32,
balance_abs_threshold: usize,
balance_rel_threshold: f32,
Expand All @@ -742,6 +749,7 @@ impl Router {
assignment_mode: String,
max_payload_size: usize,
dp_aware: bool,
dp_minimum_tokens_scheduler: bool,
api_key: Option<String>,
log_dir: Option<String>,
log_level: Option<String>,
Expand Down Expand Up @@ -833,6 +841,7 @@ impl Router {
policy,
worker_startup_timeout_secs,
worker_startup_check_interval,
worker_load_check_interval,
cache_threshold,
balance_abs_threshold,
balance_rel_threshold,
Expand All @@ -842,6 +851,7 @@ impl Router {
assignment_mode,
max_payload_size,
dp_aware,
dp_minimum_tokens_scheduler,
api_key,
log_dir,
log_level,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class Router:
port: Port number to bind the router server. Default: 3001
worker_startup_timeout_secs: Timeout in seconds for worker startup and registration. Large models can take significant time to load into GPU memory. Default: 1800 (30 minutes)
worker_startup_check_interval: Interval in seconds between checks for worker initialization. Default: 10
worker_load_check_interval: Interval in seconds between get loads for worker initialization. Default: 10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The docstring for worker_load_check_interval is a bit misleading. It says "...for worker initialization", which suggests it's only used during startup. This interval is for periodic load checking during the router's lifetime. A clearer description would be better.

Suggested change
worker_load_check_interval: Interval in seconds between get loads for worker initialization. Default: 10
worker_load_check_interval: Interval in seconds between getting loads for workers. Default: 10

cache_threshold: Cache threshold (0.0-1.0) for cache-aware routing. Routes to cached worker
if the match rate exceeds threshold, otherwise routes to the worker with the smallest
tree. Default: 0.5
Expand All @@ -152,6 +153,7 @@ class Router:
max_payload_size: Maximum payload size in bytes. Default: 256MB
max_tree_size: Maximum size of the approximation tree for cache-aware routing. Default: 2^24
dp_aware: Enable data parallelism aware schedule. Default: False
dp_minimum_tokens_scheduler: Enable minimum tokens scheduler for data parallel group. Default: False
enable_igw: Enable IGW (Inference-Gateway) mode for multi-model support. When enabled,
the router can manage multiple models simultaneously with per-model load balancing
policies. Default: False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class RouterArgs:
decode_policy: Optional[str] = None # Specific policy for decode nodes in PD mode
worker_startup_timeout_secs: int = 1800
worker_startup_check_interval: int = 30
worker_load_check_interval: int = 10
cache_threshold: float = 0.3
balance_abs_threshold: int = 64
balance_rel_threshold: float = 1.5
Expand All @@ -41,6 +42,7 @@ class RouterArgs:
max_payload_size: int = 512 * 1024 * 1024 # 512MB default for large batches
bucket_adjust_interval_secs: int = 5
dp_aware: bool = False
dp_minimum_tokens_scheduler: bool = False
enable_igw: bool = False # Enable IGW (Inter-Gateway) mode for multi-model support
api_key: Optional[str] = None
log_dir: Optional[str] = None
Expand Down Expand Up @@ -350,6 +352,11 @@ def add_cli_args(
action="store_true",
help="Enable data parallelism aware schedule",
)
routing_group.add_argument(
f"--{prefix}dp-minimum-tokens-scheduler",
action="store_true",
help="Enable minimum tokens scheduler for data parallel group",
)
routing_group.add_argument(
f"--{prefix}enable-igw",
action="store_true",
Expand Down Expand Up @@ -399,6 +406,12 @@ def add_cli_args(
default=RouterArgs.worker_startup_check_interval,
help="Interval in seconds between checks for worker startup",
)
pd_group.add_argument(
f"--{prefix}worker-load-check-interval",
type=int,
default=RouterArgs.worker_load_check_interval,
help="Interval in seconds between checks for worker load",
)

# Logging configuration
logging_group.add_argument(
Expand Down
2 changes: 1 addition & 1 deletion sgl-model-gateway/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl AppContextBuilder {
.expect("policy_registry must be set")
.clone(),
client.clone(),
config.worker_startup_check_interval_secs,
config.worker_load_check_interval_secs,
)));
self
}
Expand Down
9 changes: 9 additions & 0 deletions sgl-model-gateway/src/config/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ impl RouterConfigBuilder {
self
}

pub fn worker_load_check_interval_secs(mut self, interval: u64) -> Self {
self.config.worker_load_check_interval_secs = interval;
self
}
// ==================== Rate Limiting ====================

pub fn max_concurrent_requests(mut self, max: i32) -> Self {
Expand Down Expand Up @@ -471,6 +475,11 @@ impl RouterConfigBuilder {
self
}

pub fn dp_minimum_tokens_scheduler(mut self, enable: bool) -> Self {
self.config.dp_minimum_tokens_scheduler = enable;
self
}

// ==================== Option Setters ====================
// Accept Option<T> and only set if Some

Expand Down
4 changes: 4 additions & 0 deletions sgl-model-gateway/src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub struct RouterConfig {
pub request_timeout_secs: u64,
pub worker_startup_timeout_secs: u64,
pub worker_startup_check_interval_secs: u64,
pub worker_load_check_interval_secs: u64,
pub dp_aware: bool,
pub dp_minimum_tokens_scheduler: bool,
pub api_key: Option<String>,
pub discovery: Option<DiscoveryConfig>,
pub metrics: Option<MetricsConfig>,
Expand Down Expand Up @@ -484,7 +486,9 @@ impl Default for RouterConfig {
request_timeout_secs: 1800, // 30 minutes
worker_startup_timeout_secs: 1800, // 30 minutes for large model loading
worker_startup_check_interval_secs: 30,
worker_load_check_interval_secs: 10,
dp_aware: false,
dp_minimum_tokens_scheduler: false,
api_key: None,
discovery: None,
metrics: None,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::collections::HashMap;

use anyhow::ensure;
use openmetrics_parser::{MetricFamily, MetricsExposition, PrometheusType, PrometheusValue};
use openmetrics_parser::{
MetricFamily, MetricNumber, MetricsExposition, PrometheusType, PrometheusValue,
};
use tracing::warn;

#[derive(Debug)]
Expand Down Expand Up @@ -89,3 +93,78 @@ where

Ok(Some(it.try_fold(first, f)?))
}

pub fn extract_gauge_metrics(text: String, target_metric_family: &str) -> HashMap<isize, isize> {
let metrics_text = text.replace(":", "_");
let exposition = match openmetrics_parser::prometheus::parse_prometheus(&metrics_text) {
Ok(x) => x,
Err(err) => {
warn!(
"parse_load_response error when parsing text: pack={:?} err={:?}",
metrics_text, err
);
return HashMap::new();
}
};
extract_metrics(
&exposition,
target_metric_family,
&PrometheusValue::Gauge(MetricNumber::Float(0.0)),
)
}

pub fn extract_metrics(
exposition: &PrometheusExposition,
target_metric_family: &str,
target_value_type: &PrometheusValue,
) -> HashMap<isize, isize> {
let mut result = HashMap::new();
let Some(target_families) = exposition.families.get(target_metric_family) else {
warn!("{} don't exist!", target_metric_family);
return result;
};

for sample in target_families.iter_samples() {
let label_set = match sample.get_labelset() {
Ok(l_set) => l_set,
Err(e) => {
warn!("The metric is missing the dp_rank label{}, skipping.", e);
continue;
}
Comment on lines +130 to +133
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The warning message here seems to be incorrect. The error e comes from sample.get_labelset(), which can fail if there's no label set at all, not specifically because dp_rank is missing. The check for dp_rank happens later. A more generic error message would be more accurate.

Suggested change
Err(e) => {
warn!("The metric is missing the dp_rank label{}, skipping.", e);
continue;
}
Err(e) => {
warn!("Failed to get label set for metric sample: {}, skipping.", e);
continue;
}

};

let dp_rank_str = match label_set.get_label_value("dp_rank") {
Some(val) => val,
None => {
warn!("Don't find dp_rank");
"0"
}
};

let dp_rank = match dp_rank_str.parse::<isize>() {
Ok(rank_num) => rank_num,
Err(e) => {
warn!(
"Failed to parse dp_rank value {} as number: {}",
dp_rank_str, e
);
0
}
};

let metric_value = match (&target_value_type, &sample.value) {
(PrometheusValue::Gauge(_), PrometheusValue::Gauge(val)) => val.as_f64(),
(target_type, actual_type) => {
warn!(
"Unadapted PrometheusValue. Expected:{:?}, Actual:{:?}.",
target_type, actual_type
);
continue;
}
};

let value = metric_value as isize;
result.insert(dp_rank, value);
}
result
}
4 changes: 3 additions & 1 deletion sgl-model-gateway/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ pub use crate::protocols::UNKNOWN_MODEL_ID;
pub mod circuit_breaker;
pub mod error;
pub mod job_queue;
pub mod metrics_aggregator;
pub mod metrics_manager;
pub mod model_card;
pub mod model_type;
pub mod retry;
pub mod steps;
pub mod token_bucket;
pub mod worker;
pub mod worker_builder;
pub mod worker_load;
pub mod worker_manager;
pub mod worker_registry;
pub mod worker_service;
Expand All @@ -38,6 +39,7 @@ pub use worker::{
WorkerType,
};
pub use worker_builder::{BasicWorkerBuilder, DPAwareWorkerBuilder};
pub use worker_load::WorkerLoadManager;
pub use worker_manager::{LoadMonitor, WorkerManager};
pub use worker_registry::{HashRing, WorkerRegistry};
pub use worker_service::WorkerService;
Loading
Loading