Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7d66ba9
feat: Add metrics based on priority-based scheduling.
Jan 13, 2026
8676199
[fix] Incorrect parameter naming
Jan 13, 2026
999d4df
fix lint
huangtingwei9988 Jan 15, 2026
e1ef8c3
Merge branch 'main' into priority_metrics_dev
huangtingwei9988 Jan 15, 2026
bfad739
Merge branch 'main' into priority_metrics_dev
huangtingwei9988 Jan 20, 2026
a582950
[fix] Logic issue in the num_running_reqs_by_priority tracking record.
Jan 22, 2026
f9b834a
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Jan 22, 2026
53785e5
Merge branch 'main' into priority_metrics_dev
huangtingwei9988 Jan 26, 2026
bfe854a
[fix] Optimize priority scheduling logic.
Feb 4, 2026
311721f
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 4, 2026
934758c
[fix] Optimize priority scheduling logic.
Feb 4, 2026
e5dc82b
[fix] Code formatting issues
Feb 8, 2026
38c8494
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 8, 2026
6922329
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 11, 2026
16eb3cc
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 12, 2026
a5f2c62
Merge branch 'main' into priority_metrics_dev
JustinTong0323 Feb 12, 2026
683b39b
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 21, 2026
347ade2
[fix] Optimize the logic of the log_prefill_stats interface
Feb 22, 2026
046b3b8
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 22, 2026
168a589
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 23, 2026
eb75cca
Merge branch 'main' into priority_metrics_dev
zhuxinjie-nz Feb 23, 2026
878ddaa
[PD-Disagg] Support query dp rank from bootstrap server. (#19168)
hnyls2002 Feb 23, 2026
5e1202e
[CI] fix the teardown output of disaggregation test (#19193)
hnyls2002 Feb 23, 2026
c9a5157
add new ci user (#19133)
narutolhy Feb 23, 2026
96516d5
[CI] Tiny enhance the dp attention load blance benchmark (#19194)
hnyls2002 Feb 23, 2026
2fada5b
[PD-Disagg] Unify prefill info data transition flow, all with `Prefil…
hnyls2002 Feb 24, 2026
3c5de3f
fix: patch docker image fixes (#19100)
dougyster Feb 24, 2026
55c7122
Whisper model support & `/v1/audio/transcriptions` endpoint & benchma…
JustinTong0323 Feb 24, 2026
95eb0c7
fix: add missing blank line after docstring in serving_transcription.…
Kangyan-Zhou Feb 24, 2026
1191c05
[PD-Disagg] Deduplicate common KVManager methods into CommonKVManager…
hnyls2002 Feb 24, 2026
c30d115
fix(docker): migrate ROCm Dockerfiles from setuptools-rust to maturin…
slin1237 Feb 24, 2026
11dc631
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 24, 2026
6078c7c
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 24, 2026
06813e7
[fix] Optimize the preempt logic
Feb 24, 2026
5d8dd95
[fix] Optimize the preemption logic
Feb 24, 2026
b5c0a8d
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 25, 2026
bf7cdf5
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Feb 25, 2026
cc90291
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Mar 1, 2026
1f8cebe
Merge remote-tracking branch 'origin/main' into priority_metrics_dev
Mar 4, 2026
6f3e93d
fix wrong server args naming
hnyls2002 Mar 4, 2026
81ccd78
rename confusing name
hnyls2002 Mar 4, 2026
8491531
fix wrong comments
hnyls2002 Mar 4, 2026
56e9b44
simplify the code
hnyls2002 Mar 4, 2026
0724cbe
use QueueCount
hnyls2002 Mar 4, 2026
cdbaf59
fix import error
hnyls2002 Mar 4, 2026
f9d66ad
add comments
hnyls2002 Mar 4, 2026
b14152c
fix dict copy & label type
hnyls2002 Mar 4, 2026
75251d9
Add unit test for priority scheduling metrics
hnyls2002 Mar 4, 2026
e7257b7
fix future
hnyls2002 Mar 4, 2026
6f957cb
fix
hnyls2002 Mar 4, 2026
4504bcf
tiny fix
hnyls2002 Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions python/sglang/srt/dllm/mixin/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def init_diffusion_llm(self: Scheduler):

def get_new_batch_dllm(self: Scheduler) -> Optional[ScheduleBatch]:
"""Generate a new batch for DLLM (Diffusion LLM) scheduling."""
if self.try_preemption:
if self.enable_priority_preemption:
self.running_batch.batch_is_full = False

# Early exit if batch is full or no requests available
Expand Down Expand Up @@ -82,7 +82,7 @@ def _should_skip_prefill(self: Scheduler) -> bool:
if (
self.get_num_allocatable_reqs(running_bs) <= 0
and self.dllm_manager.is_empty()
and not self.try_preemption
and not self.enable_priority_preemption
):
self.running_batch.batch_is_full = True
return True
Expand Down Expand Up @@ -186,12 +186,8 @@ def _create_dllm_batch(
# Record prefill stats for logging after forward
from sglang.srt.observability.scheduler_metrics_mixin import PrefillStats

new_batch.prefill_stats = PrefillStats(
log_input_tokens=self.adder.log_input_tokens,
log_hit_tokens=self.adder.log_hit_tokens,
new_token_ratio=self.adder.new_token_ratio,
running_bs=len(self.running_batch.reqs),
num_new_seqs=len(can_run_list),
new_batch.prefill_stats = PrefillStats.from_adder(
self.adder, self.running_batch.reqs, self.enable_priority_scheduling
)

return new_batch
Expand All @@ -209,8 +205,9 @@ def process_dllm_incoming_reqs(

# Try preemption if batch is full
if self.running_batch.batch_is_full:
if not self.try_preemption or not adder.preempt_to_schedule(
req, self.server_args
if (
not self.enable_priority_preemption
or not adder.preempt_to_schedule(req, self.server_args)
):
break

Expand Down
27 changes: 15 additions & 12 deletions python/sglang/srt/managers/scheduler.py
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to make one or two helper functions that calculate per-priority request counts in scheduler.py and scheduler_metrics_mixin.py and consolidate the usage?

Perhaps that a) takes in the list of requests and return xxx_reqs_by_priority dictionary or b) additionally taking the dictionary and updating in place.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much — I’ll work on fixing these issues.

Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,13 @@ def init_schedule_policy(self):
else "cpu"
),
)
# Enable preemption for priority scheduling.
self.try_preemption = self.enable_priority_scheduling

# NOTE: preemption is enabled by default for priority scheduling.
self.enable_priority_preemption = (
self.enable_priority_scheduling
and not self.server_args.disable_priority_preemption
)

self.init_new_token_ratio = min(
envs.SGLANG_INIT_NEW_TOKEN_RATIO.get()
* self.server_args.schedule_conservativeness,
Expand Down Expand Up @@ -1994,7 +1999,7 @@ def _get_new_batch_prefill_raw(
for req in ready_grammar_requests:
self._add_request_to_queue(req)

if self.try_preemption:
if self.enable_priority_preemption:
# Reset batch_is_full to try preemption with a prefill adder.
self.running_batch.batch_is_full = False

Expand All @@ -2004,6 +2009,7 @@ def _get_new_batch_prefill_raw(
return None

running_bs = len(self.running_batch.reqs)

Comment thread
stmatengss marked this conversation as resolved.
# Ignore the check if self.chunked_req is not None.
# In the non-PP case, when self.chunked_req is not None, num_allocatable_reqs should always be greater than 0,
# as the space for the chunked requests has just been released.
Expand All @@ -2012,7 +2018,7 @@ def _get_new_batch_prefill_raw(
if (
self.get_num_allocatable_reqs(running_bs) <= 0
and self.chunked_req is not None
and not self.try_preemption
and not self.enable_priority_preemption
):
self.running_batch.batch_is_full = True
return None
Expand Down Expand Up @@ -2090,8 +2096,9 @@ def _get_new_batch_prefill_raw(
self.running_batch.batch_is_full = True

if self.running_batch.batch_is_full:
if not self.try_preemption or not adder.preempt_to_schedule(
req, self.server_args
if (
not self.enable_priority_preemption
or not adder.preempt_to_schedule(req, self.server_args)
):
break

Expand Down Expand Up @@ -2174,12 +2181,8 @@ def _get_new_batch_prefill_raw(
new_batch.prepare_for_extend()

# Record prefill stats for logging after forward
new_batch.prefill_stats = PrefillStats(
log_input_tokens=adder.log_input_tokens,
log_hit_tokens=adder.log_hit_tokens,
new_token_ratio=adder.new_token_ratio,
running_bs=len(self.running_batch.reqs),
num_new_seqs=len(can_run_list),
new_batch.prefill_stats = PrefillStats.from_adder(
adder, self.running_batch.reqs, self.enable_priority_scheduling
)

# Mixed-style chunked prefill
Expand Down
28 changes: 17 additions & 11 deletions python/sglang/srt/managers/scheduler_runtime_checker_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sglang.srt.environ import envs
from sglang.srt.managers.schedule_batch import ScheduleBatch
from sglang.srt.mem_cache.session_aware_cache import SessionAwareCache
from sglang.srt.observability.metrics_collector import QueueCount
from sglang.srt.utils.common import ceil_align, raise_error_or_warn
from sglang.srt.utils.request_logger import disable_request_logging
from sglang.srt.utils.watchdog import WatchdogRaw
Expand Down Expand Up @@ -301,26 +302,31 @@ def check_memory(self: Scheduler):
) = self._get_mamba_token_info()
else:
num_used, token_usage, _, _ = self._get_token_info()
num_running_reqs = len(self.running_batch.reqs)
self.stats.num_running_reqs = num_running_reqs

_enable_ps = self.enable_priority_scheduling
self.stats.num_running_reqs = QueueCount.from_reqs(
self.running_batch.reqs, _enable_ps
)
self.stats.num_used_tokens = num_used
self.stats.token_usage = round(token_usage, 2)
self.stats.gen_throughput = 0
self.stats.num_queue_reqs = len(self.waiting_queue)
self.stats.num_queue_reqs = QueueCount.from_reqs(
self.waiting_queue, _enable_ps
)
self.stats.num_grammar_queue_reqs = len(self.grammar_manager)
if self.disaggregation_mode == DisaggregationMode.PREFILL:
self.stats.num_prefill_prealloc_queue_reqs = len(
self.disagg_prefill_bootstrap_queue.queue
self.stats.num_prefill_prealloc_queue_reqs = QueueCount.from_reqs(
self.disagg_prefill_bootstrap_queue.queue, _enable_ps
)
self.stats.num_prefill_inflight_queue_reqs = len(
self.disagg_prefill_inflight_queue
self.stats.num_prefill_inflight_queue_reqs = QueueCount.from_reqs(
self.disagg_prefill_inflight_queue, _enable_ps
)
if self.disaggregation_mode == DisaggregationMode.DECODE:
self.stats.num_decode_prealloc_queue_reqs = len(
self.disagg_decode_prealloc_queue.queue
self.stats.num_decode_prealloc_queue_reqs = QueueCount.from_reqs(
self.disagg_decode_prealloc_queue.queue, _enable_ps
)
self.stats.num_decode_transfer_queue_reqs = len(
self.disagg_decode_transfer_queue.queue
self.stats.num_decode_transfer_queue_reqs = QueueCount.from_reqs(
self.disagg_decode_transfer_queue.queue, _enable_ps
)
self.metrics_collector.log_stats(self.stats)
self._publish_kv_events()
Expand Down
26 changes: 21 additions & 5 deletions python/sglang/srt/managers/tokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ def init_model_config(self):
self.context_len = self.model_config.context_len
self.image_token_id = self.model_config.image_token_id
self.max_req_input_len = None # Will be set later in engine.py
self.enable_priority_scheduling = server_args.enable_priority_scheduling
self.default_priority_value = server_args.default_priority_value
speculative_algorithm = SpeculativeAlgorithm.from_string(
server_args.speculative_algorithm
)
Expand Down Expand Up @@ -421,6 +423,8 @@ def init_metric_collector_watchdog(self):
"model_name": self.server_args.served_model_name,
# TODO: Add lora name/path in the future,
}
if self.enable_priority_scheduling:
labels["priority"] = ""
if self.server_args.tokenizer_metrics_allowed_custom_labels:
for label in self.server_args.tokenizer_metrics_allowed_custom_labels:
labels[label] = ""
Expand Down Expand Up @@ -482,6 +486,7 @@ async def generate_request(

# Normalize the request
obj.normalize_batch_and_arguments()
self._set_default_priority(obj)
self._validate_rid(obj)

if isinstance(obj, GenerateReqInput) and obj.routed_dp_rank is not None:
Expand Down Expand Up @@ -1894,11 +1899,13 @@ def collect_metrics(self, state: ReqState, recv_obj: BatchStrOutput, i: int):
)

custom_labels = getattr(state.obj, "custom_labels", None)
labels = (
{**self.metrics_collector.labels, **custom_labels}
if custom_labels
else self.metrics_collector.labels
)
labels = dict(self.metrics_collector.labels)
if custom_labels:
labels.update(custom_labels)
if self.enable_priority_scheduling:
priority = getattr(state.obj, "priority", None)
if priority is not None:
labels["priority"] = str(priority)
if (
state.time_stats.first_token_time == 0.0
and self.disaggregation_mode != DisaggregationMode.PREFILL
Expand Down Expand Up @@ -2367,6 +2374,15 @@ def convert_to_span_attrs(

return span_attrs

def _set_default_priority(self, obj: Union[GenerateReqInput, EmbeddingReqInput]):
    """Fill in the configured default priority when the request has none.

    No-op unless priority scheduling is enabled, the request did not
    specify a priority, and a default priority value is configured.
    """
    if not self.enable_priority_scheduling:
        return
    if obj.priority is not None:
        return
    if self.default_priority_value is None:
        return
    obj.priority = self.default_priority_value


class ServerStatus(Enum):
Up = "Up"
Expand Down
61 changes: 51 additions & 10 deletions python/sglang/srt/observability/metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
# ==============================================================================
"""Utilities for Prometheus Metrics Collection."""

from __future__ import annotations

import dataclasses
import logging
import os
import time
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union

from sglang.srt.environ import envs
from sglang.srt.model_executor.forward_batch_info import ForwardMode
Expand All @@ -27,8 +30,12 @@
from sglang.srt.utils import get_bool_env_var
from sglang.srt.utils.gauge_histogram import GaugeHistogram

SGLANG_TEST_REQUEST_TIME_STATS = get_bool_env_var("SGLANG_TEST_REQUEST_TIME_STATS")
if TYPE_CHECKING:
from prometheus_client import Gauge

from sglang.srt.managers.schedule_batch import Req

SGLANG_TEST_REQUEST_TIME_STATS = get_bool_env_var("SGLANG_TEST_REQUEST_TIME_STATS")

logger = logging.getLogger(__name__)

Expand All @@ -47,18 +54,38 @@ def get_histogram_conf_from_env(env_var_name: str) -> Optional[List[float]]:
return [float(x) for x in env_var_value.split(",")]


@dataclass
class QueueCount:
    """Size of a request queue: a total plus an optional per-priority tally."""

    # Number of requests currently in the queue.
    total: int = 0
    # Mapping of priority value -> request count; None when priority
    # scheduling is disabled.
    by_priority: Optional[Dict[int, int]] = None

    @classmethod
    def from_reqs(cls, reqs: List[Req], enable_priority_scheduling: bool = False):
        """Build a QueueCount from a list of requests.

        When ``enable_priority_scheduling`` is True, also tally how many
        requests carry each priority value.
        """
        # NOTE: If requests have priority=None (no --default-priority-value set),
        # the tally will contain {None: N}, which surfaces as priority="None"
        # Prometheus labels. Set --default-priority-value when enabling
        # priority scheduling to avoid this.
        if not enable_priority_scheduling:
            return cls(total=len(reqs), by_priority=None)
        tally: Dict[int, int] = {}
        for req in reqs:
            tally[req.priority] = tally.get(req.priority, 0) + 1
        return cls(total=len(reqs), by_priority=tally)


@dataclass
class SchedulerStats:
# Basics
num_running_reqs: int = 0
num_running_reqs: QueueCount = field(default_factory=QueueCount)
num_used_tokens: int = 0
token_usage: float = 0.0
pending_prealloc_token_usage: float = 0.0
swa_token_usage: float = 0.0
mamba_usage: float = 0.0
decode_sum_seq_lens: int = 0
gen_throughput: float = 0.0
num_queue_reqs: int = 0
num_queue_reqs: QueueCount = field(default_factory=QueueCount)
num_grammar_queue_reqs: int = 0
num_running_reqs_offline_batch: int = 0
cache_hit_rate: float = 0.0
Expand All @@ -74,10 +101,10 @@ class SchedulerStats:
num_paused_reqs: int = 0

# PD disaggregation
num_prefill_prealloc_queue_reqs: int = 0
num_prefill_inflight_queue_reqs: int = 0
num_decode_prealloc_queue_reqs: int = 0
num_decode_transfer_queue_reqs: int = 0
num_prefill_prealloc_queue_reqs: QueueCount = field(default_factory=QueueCount)
num_prefill_inflight_queue_reqs: QueueCount = field(default_factory=QueueCount)
num_decode_prealloc_queue_reqs: QueueCount = field(default_factory=QueueCount)
num_decode_transfer_queue_reqs: QueueCount = field(default_factory=QueueCount)
kv_transfer_speed_gb_s: float = 0.0
kv_transfer_latency_ms: float = 0.0
kv_transfer_bootstrap_ms: float = 0.0
Expand Down Expand Up @@ -158,6 +185,7 @@ def __init__(
self.enable_lora = enable_lora
self.enable_hierarchical_cache = enable_hierarchical_cache
self.last_log_time = time.perf_counter()
self._known_priorities: Set[int] = set()

self.num_running_reqs = Gauge(
name="sglang:num_running_reqs",
Expand Down Expand Up @@ -733,9 +761,22 @@ def __init__(
multiprocess_mode="mostrecent",
)

def _log_gauge(self, gauge, data: Union[int, float]) -> None:
def _log_gauge(self, gauge: Gauge, data: Union[int, float, QueueCount]) -> None:
    """Convenience helper: write a value (or QueueCount) to a Prometheus gauge."""
    # Plain scalar: record under the collector's default labels.
    if not isinstance(data, QueueCount):
        gauge.labels(**self.labels).set(data)
        return
    # NOTE: When priority scheduling is enabled, the total is recorded under
    # priority="" (the default label value). Per-priority breakdowns are recorded
    # with priority="<int>". Grafana queries should use priority="" for totals.
    gauge.labels(**self.labels).set(data.total)
    if data.by_priority is None:
        return
    # Track every priority ever seen so that a priority whose queue drained
    # is explicitly reset to 0 instead of keeping its stale last value.
    self._known_priorities.update(data.by_priority)
    for prio in self._known_priorities:
        per_priority_labels = {**self.labels, "priority": str(prio)}
        gauge.labels(**per_priority_labels).set(data.by_priority.get(prio, 0))

def _log_histogram(self, histogram, data: Union[int, float]) -> None:
    # Convenience helper: record one observation on `histogram`, tagged with
    # this collector's default label set.
    histogram.labels(**self.labels).observe(data)
Expand Down
Loading
Loading