From 45069fda0a9d4eae594019e0f2cf957e765e1482 Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 16:56:18 -0700 Subject: [PATCH 01/12] unify radix cache check for idle and busy --- .../scheduler_runtime_checker_mixin.py | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 8fdd4391e10e..35ca64ec031e 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -299,16 +299,24 @@ def _check_mamba_memory(self: Scheduler): ) return memory_leak, token_msg - def _check_radix_cache_memory(self: Scheduler): - pool_stats = self._get_token_info() - available_size = pool_stats.full_available_size - evictable_size = pool_stats.full_evictable_size + def _check_radix_cache_memory( + self: Scheduler, ps: PoolStats, uncached_size: int = 0 + ): protected_size = self.tree_cache.protected_size() session_held = self._session_held_tokens() - memory_leak = (available_size + evictable_size) != ( - self.max_total_num_tokens - protected_size - session_held + total_accounted = ( + ps.full_available_size + + ps.full_evictable_size + + protected_size + + session_held + + uncached_size + ) + memory_leak = total_accounted != self.max_total_num_tokens + token_msg = ( + f"{self.max_total_num_tokens=}, available_size={ps.full_available_size}, " + f"evictable_size={ps.full_evictable_size}, {protected_size=}, " + f"{session_held=}, {uncached_size=}\n" ) - token_msg = f"{self.max_total_num_tokens=}, {available_size=}, {evictable_size=}, {protected_size=}, {session_held=}\n" return memory_leak, token_msg def _get_batch_uncached_size(self: Scheduler, batch: ScheduleBatch) -> int: @@ -341,10 +349,6 @@ def self_check_during_busy(self: Scheduler): return pool_stats = self._get_token_info() - available_size = pool_stats.full_available_size - evictable_size = pool_stats.full_evictable_size - protected_size = self.tree_cache.protected_size() - uncached_size = self._get_batch_uncached_size(current_batch) if ( @@ -355,20 +359,17 @@ def self_check_during_busy(self: Scheduler): uncached_size += self._get_batch_uncached_size(self.running_batch) if envs.SGLANG_ENABLE_STRICT_MEM_CHECK_DURING_BUSY.get() > 1: - log_msg = f"[Mem Check (BUSY)] {available_size=}, {evictable_size=}, {protected_size=}, {uncached_size=}" + log_msg = ( + f"[Mem Check (BUSY)] available_size={pool_stats.full_available_size}, " + f"evictable_size={pool_stats.full_evictable_size}, " + f"protected_size={self.tree_cache.protected_size()}, {uncached_size=}" + ) logger.info(log_msg) - session_held = self._session_held_tokens() - total_tokens = ( - available_size - + evictable_size - + protected_size - + uncached_size - + session_held + memory_leak, token_msg = self._check_radix_cache_memory( + pool_stats, uncached_size=uncached_size ) - assert ( - total_tokens == self.max_total_num_tokens - ), f"Mem Leak Detected! {total_tokens=} vs {self.max_total_num_tokens=}" + assert not memory_leak, f"Mem Leak Detected! {token_msg}" def _check_req_pool(self: Scheduler): if self.disaggregation_mode == DisaggregationMode.DECODE: @@ -399,7 +400,9 @@ def check_memory(self: Scheduler): elif self.is_hybrid_ssm and self.tree_cache.supports_mamba(): memory_leak, token_msg = self._check_mamba_memory() else: - memory_leak, token_msg = self._check_radix_cache_memory() + memory_leak, token_msg = self._check_radix_cache_memory( + self._get_token_info() + ) if memory_leak: msg = "token_to_kv_pool_allocator memory leak detected! " f"{token_msg}" @@ -487,7 +490,9 @@ def dump_info() -> str: elif scheduler.is_hybrid_ssm and scheduler.tree_cache.supports_mamba(): _, info_msg = scheduler._check_mamba_memory() else: - _, info_msg = scheduler._check_radix_cache_memory() + _, info_msg = scheduler._check_radix_cache_memory( + scheduler._get_token_info() + ) return ( f"{scheduler.cur_batch.batch_size()=}\n" f"{scheduler.cur_batch.reqs=}\n" From 84a9d63ffbafab5ff9b681e0626d47aabe2512a8 Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 17:08:36 -0700 Subject: [PATCH 02/12] extract _get_total_uncached_size; simplify busy check --- .../scheduler_runtime_checker_mixin.py | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 35ca64ec031e..ea07fd4741ed 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -335,40 +335,35 @@ def _get_batch_uncached_size(self: Scheduler, batch: ScheduleBatch) -> int: return ret - def self_check_during_busy(self: Scheduler): + def _get_total_uncached_size(self: Scheduler) -> int: + """Sum uncached tokens across the current and running batches.""" current_batch: ScheduleBatch = self.last_batch - - if current_batch is None: - return - - spec_topk = self.server_args.speculative_eagle_topk or 1 - if spec_topk > 1: - warnings.warn( - "Runtime memory check (busy) is not supported when speculation topk > 1." - ) - return - - pool_stats = self._get_token_info() uncached_size = self._get_batch_uncached_size(current_batch) - if ( current_batch.forward_mode.is_extend() and self.running_batch is not None and not self.running_batch.is_empty() ): uncached_size += self._get_batch_uncached_size(self.running_batch) + return uncached_size - if envs.SGLANG_ENABLE_STRICT_MEM_CHECK_DURING_BUSY.get() > 1: - log_msg = ( - f"[Mem Check (BUSY)] available_size={pool_stats.full_available_size}, " - f"evictable_size={pool_stats.full_evictable_size}, " - f"protected_size={self.tree_cache.protected_size()}, {uncached_size=}" + def self_check_during_busy(self: Scheduler): + if self.last_batch is None: + return + spec_topk = self.server_args.speculative_eagle_topk or 1 + if spec_topk > 1: + warnings.warn( + "Runtime memory check (busy) is not supported when speculation topk > 1." ) - logger.info(log_msg) + return + uncached_size = self._get_total_uncached_size() memory_leak, token_msg = self._check_radix_cache_memory( - pool_stats, uncached_size=uncached_size + self._get_token_info(), uncached_size=uncached_size ) + + if envs.SGLANG_ENABLE_STRICT_MEM_CHECK_DURING_BUSY.get() > 1: + logger.info(f"[Mem Check (BUSY)] {token_msg}") assert not memory_leak, f"Mem Leak Detected! {token_msg}" def _check_req_pool(self: Scheduler): From c5a5d394d798f1087a1d9ef453d07d428906edf5 Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 17:21:35 -0700 Subject: [PATCH 03/12] flatten checkers: _check_full_pool + _check_swa_pool + _check_mamba_pool --- .../scheduler_runtime_checker_mixin.py | 206 ++++++++++-------- 1 file changed, 110 insertions(+), 96 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index ea07fd4741ed..2338a21178be 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -228,45 +228,76 @@ def _get_swa_token_info(self: Scheduler) -> PoolStats: swa_evictable_size=swa_evictable_size, ) - def _check_hybrid_memory(self: Scheduler): - pool_stats = self._get_swa_token_info() - full_num_used = pool_stats.full_num_used - swa_num_used = pool_stats.swa_num_used - full_available_size = pool_stats.full_available_size - full_evictable_size = pool_stats.full_evictable_size - swa_available_size = pool_stats.swa_available_size - swa_evictable_size = pool_stats.swa_evictable_size - session_held_full = self._session_held_full_tokens() - session_held_swa = self._session_held_swa_tokens() - - # Streaming sessions hold tree locks during idle, so tree-protected - # tokens must be accounted for alongside session-held tokens. - full_protected = self.tree_cache.full_protected_size() - swa_protected = self.tree_cache.swa_protected_size() - full_leaked = full_num_used - full_protected - session_held_full - swa_leaked = swa_num_used - swa_protected - session_held_swa - memory_leak = full_leaked != 0 or swa_leaked != 0 - token_msg = ( - f"{full_leaked=}, {swa_leaked=}\n" - f"{self.full_tokens_per_layer=}, {full_available_size=}, {full_evictable_size=}, {full_protected=}, {session_held_full=}\n" - f"{self.swa_tokens_per_layer=}, {swa_available_size=}, {swa_evictable_size=}, {swa_protected=}, {session_held_swa=}\n" + @staticmethod + def _check_pool_invariant( + available: int, + evictable: int, + protected: int, + session_held: int, + total: int, + uncached: int = 0, + ) -> Tuple[bool, str]: + """Check: available + evictable + protected + session_held + uncached == total.""" + total_accounted = available + evictable + protected + session_held + uncached + leak = total_accounted != total + msg = ( + f"{total=}, {available=}, {evictable=}, " + f"{protected=}, {session_held=}, {uncached=}" ) - return memory_leak, token_msg - - def _check_mamba_memory(self: Scheduler): - pool_stats = self._get_mamba_token_info() - full_num_used = pool_stats.full_num_used - mamba_num_used = pool_stats.mamba_num_used - full_available_size = pool_stats.full_available_size - full_evictable_size = pool_stats.full_evictable_size - mamba_available_size = pool_stats.mamba_available_size - mamba_evictable_size = pool_stats.mamba_evictable_size - session_held = self._session_held_tokens() - memory_leak = ( - full_num_used != self.tree_cache.full_protected_size() + session_held - or mamba_num_used != self.tree_cache.mamba_protected_size() + return leak, msg + + def _check_full_pool( + self: Scheduler, ps: PoolStats, uncached: int = 0 + ) -> Tuple[bool, str]: + if self.is_hybrid_swa or ( + self.is_hybrid_ssm and self.tree_cache.supports_mamba() + ): + protected = self.tree_cache.full_protected_size() + session_held = ( + self._session_held_full_tokens() + if self.is_hybrid_swa + else self._session_held_tokens() + ) + total = ( + self.full_tokens_per_layer + if self.is_hybrid_swa + else self.token_to_kv_pool_allocator.size + ) + else: + protected = self.tree_cache.protected_size() + session_held = self._session_held_tokens() + total = self.max_total_num_tokens + return self._check_pool_invariant( + ps.full_available_size, + ps.full_evictable_size, + protected, + session_held, + total, + uncached, + ) + + def _check_swa_pool( + self: Scheduler, ps: PoolStats, uncached: int = 0 + ) -> Tuple[bool, str]: + return self._check_pool_invariant( + ps.swa_available_size, + ps.swa_evictable_size, + self.tree_cache.swa_protected_size(), + self._session_held_swa_tokens(), + self.swa_tokens_per_layer, + uncached, ) - if memory_leak: + + def _check_mamba_pool(self: Scheduler, ps: PoolStats) -> Tuple[bool, str]: + leak, msg = self._check_pool_invariant( + ps.mamba_available_size, + ps.mamba_evictable_size, + self.tree_cache.mamba_protected_size(), + 0, + self.req_to_token_pool.mamba_pool.size, + ) + if leak: + # Page-level leak diagnosis for mamba free_full_pages = set( self.token_to_kv_pool_allocator.free_pages.tolist() + self.token_to_kv_pool_allocator.release_pages.tolist() @@ -288,36 +319,11 @@ def _check_mamba_memory(self: Scheduler): leaked_mamba_pages = ( expected_mamba_pages - free_mamba_pages - cached_mamba_pages ) - token_msg = ( - f"{full_available_size=}, {full_evictable_size=}, {self.token_to_kv_pool_allocator.size=}, {self.tree_cache.full_protected_size()=}\n" - f"{mamba_available_size=}, {mamba_evictable_size=}, {self.req_to_token_pool.mamba_pool.size=}, {self.tree_cache.mamba_protected_size()=}, leaked_full_pages={leaked_full_pages if len(leaked_full_pages) > 0 else None}, leaked_mamba_pages={leaked_mamba_pages if len(leaked_mamba_pages) > 0 else None}\n" - ) - else: - token_msg = ( - f"{full_available_size=}, {full_evictable_size=}, {self.token_to_kv_pool_allocator.size=}, {self.tree_cache.full_protected_size()=}\n" - f"{mamba_available_size=}, {mamba_evictable_size=}, {self.req_to_token_pool.mamba_pool.size=}, {self.tree_cache.mamba_protected_size()=}\n" + msg += ( + f", leaked_full_pages={leaked_full_pages or None}" + f", leaked_mamba_pages={leaked_mamba_pages or None}" ) - return memory_leak, token_msg - - def _check_radix_cache_memory( - self: Scheduler, ps: PoolStats, uncached_size: int = 0 - ): - protected_size = self.tree_cache.protected_size() - session_held = self._session_held_tokens() - total_accounted = ( - ps.full_available_size - + ps.full_evictable_size - + protected_size - + session_held - + uncached_size - ) - memory_leak = total_accounted != self.max_total_num_tokens - token_msg = ( - f"{self.max_total_num_tokens=}, available_size={ps.full_available_size}, " - f"evictable_size={ps.full_evictable_size}, {protected_size=}, " - f"{session_held=}, {uncached_size=}\n" - ) - return memory_leak, token_msg + return leak, msg def _get_batch_uncached_size(self: Scheduler, batch: ScheduleBatch) -> int: ret = 0 @@ -357,14 +363,12 @@ def self_check_during_busy(self: Scheduler): ) return - uncached_size = self._get_total_uncached_size() - memory_leak, token_msg = self._check_radix_cache_memory( - self._get_token_info(), uncached_size=uncached_size - ) + uncached = self._get_total_uncached_size() + leak, msg = self._check_full_pool(self.get_pool_stats(), uncached=uncached) if envs.SGLANG_ENABLE_STRICT_MEM_CHECK_DURING_BUSY.get() > 1: - logger.info(f"[Mem Check (BUSY)] {token_msg}") - assert not memory_leak, f"Mem Leak Detected! {token_msg}" + logger.info(f"[Mem Check (BUSY)] {msg}") + assert not leak, f"Mem Leak Detected! {msg}" def _check_req_pool(self: Scheduler): if self.disaggregation_mode == DisaggregationMode.DECODE: @@ -389,24 +393,34 @@ def _check_req_pool(self: Scheduler): msg, ) + def _report_leak(self: Scheduler, pool_name: str, token_msg: str): + msg = f"{pool_name} memory leak detected! {token_msg}" + raise_error_or_warn( + self, + envs.SGLANG_ENABLE_STRICT_MEM_CHECK_DURING_IDLE.get(), + "count_memory_leak_warnings", + msg, + ) + def check_memory(self: Scheduler): + ps = self.get_pool_stats() + + # Always check full/KV pool + full_leak, full_msg = self._check_full_pool(ps) + if full_leak: + self._report_leak("full_pool", full_msg) + + # Check SWA sub-pool if self.is_hybrid_swa: - memory_leak, token_msg = self._check_hybrid_memory() - elif self.is_hybrid_ssm and self.tree_cache.supports_mamba(): - memory_leak, token_msg = self._check_mamba_memory() - else: - memory_leak, token_msg = self._check_radix_cache_memory( - self._get_token_info() - ) + swa_leak, swa_msg = self._check_swa_pool(ps) + if swa_leak: + self._report_leak("swa_pool", swa_msg) - if memory_leak: - msg = "token_to_kv_pool_allocator memory leak detected! " f"{token_msg}" - raise_error_or_warn( - self, - envs.SGLANG_ENABLE_STRICT_MEM_CHECK_DURING_IDLE.get(), - "count_memory_leak_warnings", - msg, - ) + # Check mamba sub-pool + if self.is_hybrid_ssm and self.tree_cache.supports_mamba(): + mamba_leak, mamba_msg = self._check_mamba_pool(ps) + if mamba_leak: + self._report_leak("mamba_pool", mamba_msg) self._check_req_pool() @@ -480,18 +494,18 @@ def create_scheduler_watchdog( def dump_info() -> str: if scheduler.is_initializing or disable_request_logging(): return "" + ps = scheduler.get_pool_stats() + _, full_msg = scheduler._check_full_pool(ps) + info_parts = [full_msg] if scheduler.is_hybrid_swa: - _, info_msg = scheduler._check_hybrid_memory() - elif scheduler.is_hybrid_ssm and scheduler.tree_cache.supports_mamba(): - _, info_msg = scheduler._check_mamba_memory() - else: - _, info_msg = scheduler._check_radix_cache_memory( - scheduler._get_token_info() - ) + _, swa_msg = scheduler._check_swa_pool(ps) + info_parts.append(swa_msg) + if scheduler.is_hybrid_ssm and scheduler.tree_cache.supports_mamba(): + _, mamba_msg = scheduler._check_mamba_pool(ps) + info_parts.append(mamba_msg) return ( f"{scheduler.cur_batch.batch_size()=}\n" - f"{scheduler.cur_batch.reqs=}\n" - f"{info_msg}" + f"{scheduler.cur_batch.reqs=}\n" + "\n".join(info_parts) ) return WatchdogRaw( From bf52b1e6bfe38fc29d0a2e5f61d8ebaff40e74c9 Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 22:22:04 -0700 Subject: [PATCH 04/12] use is_fully_idle() as idle check guard --- .../managers/scheduler_runtime_checker_mixin.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 2338a21178be..0666b4cdd1ed 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -466,21 +466,8 @@ def check_tree_cache(self: Scheduler): self.tree_cache.sanity_check() def self_check_during_idle(self: Scheduler): - if self.enable_hisparse and self.hisparse_coordinator.has_ongoing_staging(): + if not self.is_fully_idle(): return - if self.disaggregation_mode == DisaggregationMode.PREFILL: - if len(self.disagg_prefill_inflight_queue) > 0: - return - elif self.disaggregation_mode == DisaggregationMode.DECODE: - queue_size = ( - len(self.waiting_queue) - + len(self.disagg_decode_transfer_queue.queue) - + len(self.disagg_decode_prealloc_queue.queue) - ) - if self.server_args.disaggregation_decode_enable_offload_kvcache: - queue_size += len(self.decode_offload_manager.ongoing_offload) - if queue_size: - return self.check_memory() self.check_tree_cache() From a2be961555ac5dcae049cf18f85a6721e6fa991f Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 22:35:53 -0700 Subject: [PATCH 05/12] rename on_idle/self_check_during_idle/busy; extract _maybe_log_idle_metrics --- python/sglang/srt/disaggregation/decode.py | 4 +- python/sglang/srt/disaggregation/prefill.py | 4 +- python/sglang/srt/managers/scheduler.py | 4 +- .../sglang/srt/managers/scheduler_pp_mixin.py | 6 +- .../scheduler_runtime_checker_mixin.py | 77 +++++++++++-------- .../srt/multiplex/multiplexing_mixin.py | 4 +- 6 files changed, 54 insertions(+), 45 deletions(-) diff --git a/python/sglang/srt/disaggregation/decode.py b/python/sglang/srt/disaggregation/decode.py index ccf842ac0067..b7e1f97c347d 100644 --- a/python/sglang/srt/disaggregation/decode.py +++ b/python/sglang/srt/disaggregation/decode.py @@ -1165,7 +1165,7 @@ def event_loop_normal_disagg_decode(self: Scheduler): self.process_batch_result(batch, result) else: # When the server is idle, do self-check and re-init some states - self.self_check_during_idle() + self.on_idle() # Update last_batch self.last_batch = batch @@ -1198,7 +1198,7 @@ def event_loop_overlap_disagg_decode(self: Scheduler): tmp_batch, tmp_result = self.result_queue.popleft() self.process_batch_result(tmp_batch, tmp_result) elif batch is None: - self.self_check_during_idle() + self.on_idle() # Run sample of the current batch # It depends on the result of the last batch (e.g., grammar), so we run it after the last batch is processed. diff --git a/python/sglang/srt/disaggregation/prefill.py b/python/sglang/srt/disaggregation/prefill.py index 8e3da245b9e8..0e0a653fdaa0 100644 --- a/python/sglang/srt/disaggregation/prefill.py +++ b/python/sglang/srt/disaggregation/prefill.py @@ -409,7 +409,7 @@ def event_loop_normal_disagg_prefill(self: Scheduler) -> None: result = self.run_batch(batch) self.process_batch_result(batch, result) else: - self.self_check_during_idle() + self.on_idle() self.process_disagg_prefill_inflight_queue() @@ -448,7 +448,7 @@ def event_loop_overlap_disagg_prefill(self: Scheduler) -> None: self.process_batch_result(tmp_batch, tmp_result) elif batch is None: # When the server is idle, do self-check and re-init some states - self.self_check_during_idle() + self.on_idle() self.process_disagg_prefill_inflight_queue() diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index 5493c0aa456c..d366046b743a 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -1370,7 +1370,7 @@ def event_loop_normal(self): self.process_batch_result(batch, result) else: # When the server is idle, do self-check and re-init some states. - self.self_check_during_idle() + self.on_idle() # Update last_batch self.last_batch = batch @@ -1420,7 +1420,7 @@ def pop_and_process(): pop_and_process() elif batch is None: # When the server is idle, do self-check and re-init some states - self.self_check_during_idle() + self.on_idle() # Run sample of the current batch # It depends on the result of the last batch (e.g., grammar), so we run it after the last batch is processed. diff --git a/python/sglang/srt/managers/scheduler_pp_mixin.py b/python/sglang/srt/managers/scheduler_pp_mixin.py index ba9cc0ac2342..9c0edc315173 100644 --- a/python/sglang/srt/managers/scheduler_pp_mixin.py +++ b/python/sglang/srt/managers/scheduler_pp_mixin.py @@ -142,7 +142,7 @@ def event_loop_pp(self: Scheduler): # When the server is idle, self-check and re-init some states if server_is_idle: - self.self_check_during_idle() + self.on_idle() @DynamicGradMode() def event_loop_pp_disagg_prefill(self: Scheduler): @@ -318,7 +318,7 @@ def event_loop_pp_disagg_prefill(self: Scheduler): # When the server is idle, self-check and re-init some states if server_is_idle and len(self.disagg_prefill_inflight_queue) == 0: - self.self_check_during_idle() + self.on_idle() @DynamicGradMode() def event_loop_pp_disagg_decode(self: Scheduler): @@ -508,7 +508,7 @@ def event_loop_pp_disagg_decode(self: Scheduler): queue_size += len(self.decode_offload_manager.ongoing_offload) if server_is_idle and queue_size == 0: - self.self_check_during_idle() + self.on_idle() def init_pp_loop_state(self: Scheduler): self.pp_loop_size: int = self.pp_size + self.server_args.pp_async_batch_depth diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 0666b4cdd1ed..82e4ffab6fb6 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -402,11 +402,15 @@ def _report_leak(self: Scheduler, pool_name: str, token_msg: str): msg, ) - def check_memory(self: Scheduler): - ps = self.get_pool_stats() + def self_check_during_idle( + self: Scheduler, ps: Optional[PoolStats] = None, uncached: int = 0 + ): + """Check memory invariant across all pools. Used by both idle and busy paths.""" + if ps is None: + ps = self.get_pool_stats() # Always check full/KV pool - full_leak, full_msg = self._check_full_pool(ps) + full_leak, full_msg = self._check_full_pool(ps, uncached=uncached) if full_leak: self._report_leak("full_pool", full_msg) @@ -424,40 +428,42 @@ def check_memory(self: Scheduler): self._check_req_pool() + def _maybe_log_idle_metrics(self: Scheduler): + """Collect and log metrics every 30 seconds during idle.""" if ( - self.current_scheduler_metrics_enabled - and time.perf_counter() > self.metrics_collector.last_log_time + 30 + not self.current_scheduler_metrics_enabled + or time.perf_counter() <= self.metrics_collector.last_log_time + 30 ): - # During idle time, also collect metrics every 30 seconds. - self.get_pool_stats().update_scheduler_stats(self.stats) + return + + self.get_pool_stats().update_scheduler_stats(self.stats) - priority_enabled = self.enable_priority_scheduling - self.stats.num_running_reqs = QueueCount.from_reqs( - self.running_batch.reqs, priority_enabled + priority_enabled = self.enable_priority_scheduling + self.stats.num_running_reqs = QueueCount.from_reqs( + self.running_batch.reqs, priority_enabled + ) + self.stats.gen_throughput = 0 + self.stats.num_queue_reqs = QueueCount.from_reqs( + self.waiting_queue, priority_enabled + ) + self.stats.num_grammar_queue_reqs = len(self.grammar_manager) + if self.disaggregation_mode == DisaggregationMode.PREFILL: + self.stats.num_prefill_prealloc_queue_reqs = QueueCount.from_reqs( + self.disagg_prefill_bootstrap_queue.queue, priority_enabled ) - self.stats.gen_throughput = 0 - self.stats.num_queue_reqs = QueueCount.from_reqs( - self.waiting_queue, priority_enabled + self.stats.num_prefill_inflight_queue_reqs = QueueCount.from_reqs( + self.disagg_prefill_inflight_queue, priority_enabled ) - self.stats.num_grammar_queue_reqs = len(self.grammar_manager) - if self.disaggregation_mode == DisaggregationMode.PREFILL: - self.stats.num_prefill_prealloc_queue_reqs = QueueCount.from_reqs( - self.disagg_prefill_bootstrap_queue.queue, priority_enabled - ) - self.stats.num_prefill_inflight_queue_reqs = QueueCount.from_reqs( - self.disagg_prefill_inflight_queue, priority_enabled - ) - if self.disaggregation_mode == DisaggregationMode.DECODE: - self.stats.num_decode_prealloc_queue_reqs = QueueCount.from_reqs( - self.disagg_decode_prealloc_queue.queue, priority_enabled - ) - self.stats.num_decode_transfer_queue_reqs = QueueCount.from_reqs( - self.disagg_decode_transfer_queue.queue, priority_enabled - ) - self.metrics_collector.log_stats(self.stats) - self._publish_kv_events() + if self.disaggregation_mode == DisaggregationMode.DECODE: + self.stats.num_decode_prealloc_queue_reqs = QueueCount.from_reqs( + self.disagg_decode_prealloc_queue.queue, priority_enabled + ) + self.stats.num_decode_transfer_queue_reqs = QueueCount.from_reqs( + self.disagg_decode_transfer_queue.queue, priority_enabled + ) + self.metrics_collector.log_stats(self.stats) - def check_tree_cache(self: Scheduler): + def _check_tree_cache(self: Scheduler): if ( self.tree_cache.is_tree_cache() and (self.is_hybrid_swa and self.tree_cache.supports_swa()) @@ -465,12 +471,15 @@ def check_tree_cache(self: Scheduler): ): self.tree_cache.sanity_check() - def self_check_during_idle(self: Scheduler): + def on_idle(self: Scheduler): + """Idle housekeeping: guard, check, metrics, reset, sleep.""" if not self.is_fully_idle(): return - self.check_memory() - self.check_tree_cache() + self.self_check_during_idle() + self._check_tree_cache() + self._maybe_log_idle_metrics() + self._publish_kv_events() self.new_token_ratio = self.init_new_token_ratio self.maybe_sleep_on_idle() diff --git a/python/sglang/srt/multiplex/multiplexing_mixin.py b/python/sglang/srt/multiplex/multiplexing_mixin.py index 1e1e858aefb6..22e3b4d9b6f8 100644 --- a/python/sglang/srt/multiplex/multiplexing_mixin.py +++ b/python/sglang/srt/multiplex/multiplexing_mixin.py @@ -128,8 +128,8 @@ def event_loop_pdmux(self: Scheduler): stream_idx > 0 and self.running_batch.is_empty() ) if self.running_batch.is_empty() and self.split_prefill_batch is None: - self.check_memory() - self.check_tree_cache() + self.self_check_during_idle() + self._check_tree_cache() self.new_token_ratio = self.init_new_token_ratio self.maybe_sleep_on_idle() From 20d171985b1240e92e9cdc570faac66d1ae0400d Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 22:38:37 -0700 Subject: [PATCH 06/12] add inline comments for on_idle steps --- .../srt/managers/scheduler_runtime_checker_mixin.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 82e4ffab6fb6..a451d6a57176 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -476,12 +476,12 @@ def on_idle(self: Scheduler): if not self.is_fully_idle(): return - self.self_check_during_idle() - self._check_tree_cache() - self._maybe_log_idle_metrics() - self._publish_kv_events() - self.new_token_ratio = self.init_new_token_ratio - self.maybe_sleep_on_idle() + self.self_check_during_idle() # memory leak check + self._check_tree_cache() # tree cache sanity check + self._maybe_log_idle_metrics() # metrics every 30s + self._publish_kv_events() # kv event publishing + self.new_token_ratio = self.init_new_token_ratio # reset token ratio + self.maybe_sleep_on_idle() # sleep until next event def create_scheduler_watchdog( From 4c5f13a8c5266169f1e4ccdc6cbdb7652a613e6b Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 22:40:05 -0700 Subject: [PATCH 07/12] fix on_idle ordering: reset token_ratio before sleep --- .../scheduler_runtime_checker_mixin.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index a451d6a57176..445ce5a994c1 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -476,12 +476,23 @@ def on_idle(self: Scheduler): if not self.is_fully_idle(): return - self.self_check_during_idle() # memory leak check - self._check_tree_cache() # tree cache sanity check - self._maybe_log_idle_metrics() # metrics every 30s - self._publish_kv_events() # kv event publishing - self.new_token_ratio = self.init_new_token_ratio # reset token ratio - self.maybe_sleep_on_idle() # sleep until next event + # memory leak check + self.self_check_during_idle() + + # tree cache sanity check + self._check_tree_cache() + + # metrics every 30s + self._maybe_log_idle_metrics() + + # kv event publishing + self._publish_kv_events() + + # reset token ratio + self.new_token_ratio = self.init_new_token_ratio + + # sleep until next event + self.maybe_sleep_on_idle() def create_scheduler_watchdog( From c4110a882e1ed163f3f156381af2a3847fe98f34 Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 22:41:40 -0700 Subject: [PATCH 08/12] extract _check_all_pools; reuse in idle check and watchdog --- .../scheduler_runtime_checker_mixin.py | 48 +++++++++---------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 445ce5a994c1..aeef08c741a8 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -402,30 +402,34 @@ def _report_leak(self: Scheduler, pool_name: str, token_msg: str): msg, ) - def self_check_during_idle( - self: Scheduler, ps: Optional[PoolStats] = None, uncached: int = 0 - ): - """Check memory invariant across all pools. Used by both idle and busy paths.""" - if ps is None: - ps = self.get_pool_stats() - - # Always check full/KV pool + def _check_all_pools( + self: Scheduler, ps: PoolStats, uncached: int = 0 + ) -> Tuple[bool, List[str]]: + """Check memory invariant across all pools. Returns (has_leak, messages).""" + has_leak = False + messages = [] + full_leak, full_msg = self._check_full_pool(ps, uncached=uncached) - if full_leak: - self._report_leak("full_pool", full_msg) + has_leak |= full_leak + messages.append(full_msg) - # Check SWA sub-pool if self.is_hybrid_swa: swa_leak, swa_msg = self._check_swa_pool(ps) - if swa_leak: - self._report_leak("swa_pool", swa_msg) + has_leak |= swa_leak + messages.append(swa_msg) - # Check mamba sub-pool if self.is_hybrid_ssm and self.tree_cache.supports_mamba(): mamba_leak, mamba_msg = self._check_mamba_pool(ps) - if mamba_leak: - self._report_leak("mamba_pool", mamba_msg) + has_leak |= mamba_leak + messages.append(mamba_msg) + + return has_leak, messages + def self_check_during_idle(self: Scheduler): + """Idle memory check: all pools + req pool.""" + has_leak, messages = self._check_all_pools(self.get_pool_stats()) + if has_leak: + self._report_leak("pool", "\n".join(messages)) self._check_req_pool() def _maybe_log_idle_metrics(self: Scheduler): @@ -501,18 +505,10 @@ def create_scheduler_watchdog( def dump_info() -> str: if scheduler.is_initializing or disable_request_logging(): return "" - ps = scheduler.get_pool_stats() - _, full_msg = scheduler._check_full_pool(ps) - info_parts = [full_msg] - if scheduler.is_hybrid_swa: - _, swa_msg = scheduler._check_swa_pool(ps) - info_parts.append(swa_msg) - if scheduler.is_hybrid_ssm and scheduler.tree_cache.supports_mamba(): - _, mamba_msg = scheduler._check_mamba_pool(ps) - info_parts.append(mamba_msg) + _, messages = scheduler._check_all_pools(scheduler.get_pool_stats()) return ( f"{scheduler.cur_batch.batch_size()=}\n" - f"{scheduler.cur_batch.reqs=}\n" + "\n".join(info_parts) + f"{scheduler.cur_batch.reqs=}\n" + "\n".join(messages) ) return WatchdogRaw( From e0fb811ddcbdfefdca7aa49175fa07b4775d12fc Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 22:47:24 -0700 Subject: [PATCH 09/12] move _check_req_pool to on_idle --- .../sglang/srt/managers/scheduler_runtime_checker_mixin.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index aeef08c741a8..6fe806964197 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -356,6 +356,7 @@ def _get_total_uncached_size(self: Scheduler) -> int: def self_check_during_busy(self: Scheduler): if self.last_batch is None: return + spec_topk = self.server_args.speculative_eagle_topk or 1 if spec_topk > 1: warnings.warn( @@ -426,11 +427,10 @@ def _check_all_pools( return has_leak, messages def self_check_during_idle(self: Scheduler): - """Idle memory check: all pools + req pool.""" + """Idle memory check: all pools.""" has_leak, messages = self._check_all_pools(self.get_pool_stats()) if has_leak: self._report_leak("pool", "\n".join(messages)) - self._check_req_pool() def _maybe_log_idle_metrics(self: Scheduler): """Collect and log metrics every 30 seconds during idle.""" @@ -482,6 +482,7 @@ def on_idle(self: Scheduler): # memory leak check self.self_check_during_idle() + self._check_req_pool() # tree cache sanity check self._check_tree_cache() From 2257c865e0576c587e503fee48ebf8dff40fe332 Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Fri, 10 Apr 2026 22:48:27 -0700 Subject: [PATCH 10/12] inline self_check_during_idle into on_idle; unify multiplexing caller --- .../srt/managers/scheduler_runtime_checker_mixin.py | 10 +++------- python/sglang/srt/multiplex/multiplexing_mixin.py | 5 +---- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 6fe806964197..06841ca98f91 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -426,12 +426,6 @@ def _check_all_pools( return has_leak, messages - def self_check_during_idle(self: Scheduler): - """Idle memory check: all pools.""" - has_leak, messages = self._check_all_pools(self.get_pool_stats()) - if has_leak: - self._report_leak("pool", "\n".join(messages)) - def _maybe_log_idle_metrics(self: Scheduler): """Collect and log metrics every 30 seconds during idle.""" if ( @@ -481,7 +475,9 @@ def on_idle(self: Scheduler): return # memory leak check - self.self_check_during_idle() + has_leak, messages = self._check_all_pools(self.get_pool_stats()) + if has_leak: + self._report_leak("pool", "\n".join(messages)) self._check_req_pool() # tree cache sanity check diff --git a/python/sglang/srt/multiplex/multiplexing_mixin.py b/python/sglang/srt/multiplex/multiplexing_mixin.py index 22e3b4d9b6f8..9902afe5c16f 100644 --- a/python/sglang/srt/multiplex/multiplexing_mixin.py +++ b/python/sglang/srt/multiplex/multiplexing_mixin.py @@ -128,10 +128,7 @@ def event_loop_pdmux(self: Scheduler): stream_idx > 0 and self.running_batch.is_empty() ) if self.running_batch.is_empty() and self.split_prefill_batch is None: - self.self_check_during_idle() - self._check_tree_cache() - self.new_token_ratio = self.init_new_token_ratio - self.maybe_sleep_on_idle() + self.on_idle() if adjust_stream_group: prefill_stream.synchronize() From 144f48f57c93b14ce4b4e6079f3f06e66158a42e Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Sat, 11 Apr 2026 00:22:00 -0700 Subject: [PATCH 11/12] add pool_name to _check_pool_invariant msg --- .../sglang/srt/managers/scheduler_runtime_checker_mixin.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 06841ca98f91..948707fa551d 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -230,6 +230,7 @@ def _get_swa_token_info(self: Scheduler) -> PoolStats: @staticmethod def _check_pool_invariant( + pool_name: str, available: int, evictable: int, protected: int, @@ -241,7 +242,7 @@ def _check_pool_invariant( total_accounted = available + evictable + protected + session_held + uncached leak = total_accounted != total msg = ( - f"{total=}, {available=}, {evictable=}, " + f"[{pool_name}] {total=}, {available=}, {evictable=}, " f"{protected=}, {session_held=}, {uncached=}" ) return leak, msg @@ -268,6 +269,7 @@ def _check_full_pool( session_held = self._session_held_tokens() total = self.max_total_num_tokens return self._check_pool_invariant( + "full", ps.full_available_size, ps.full_evictable_size, protected, @@ -280,6 +282,7 @@ def _check_swa_pool( self: Scheduler, ps: PoolStats, uncached: int = 0 ) -> Tuple[bool, str]: return self._check_pool_invariant( + "swa", ps.swa_available_size, ps.swa_evictable_size, self.tree_cache.swa_protected_size(), @@ -290,6 +293,7 @@ def _check_swa_pool( def _check_mamba_pool(self: Scheduler, ps: PoolStats) -> Tuple[bool, str]: leak, msg = self._check_pool_invariant( + "mamba", ps.mamba_available_size, ps.mamba_evictable_size, self.tree_cache.mamba_protected_size(), From c07ee9c62d5073808e0d9dae160d4082030de83c Mon Sep 17 00:00:00 2001 From: hnyls2002 Date: Sat, 11 Apr 2026 00:22:39 -0700 Subject: [PATCH 12/12] flatten _check_full_pool into if/elif/else --- .../scheduler_runtime_checker_mixin.py | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py index 948707fa551d..b5a0e4a3086c 100644 --- a/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py +++ b/python/sglang/srt/managers/scheduler_runtime_checker_mixin.py @@ -250,20 +250,14 @@ def _check_pool_invariant( def _check_full_pool( self: Scheduler, ps: PoolStats, uncached: int = 0 ) -> Tuple[bool, str]: - if self.is_hybrid_swa or ( - self.is_hybrid_ssm and self.tree_cache.supports_mamba() - ): + if self.is_hybrid_swa: protected = self.tree_cache.full_protected_size() - session_held = ( - self._session_held_full_tokens() - if self.is_hybrid_swa - else self._session_held_tokens() - ) - total = ( - self.full_tokens_per_layer - if self.is_hybrid_swa - else self.token_to_kv_pool_allocator.size - ) + session_held = self._session_held_full_tokens() + total = self.full_tokens_per_layer + elif self.is_hybrid_ssm and self.tree_cache.supports_mamba(): + protected = self.tree_cache.full_protected_size() + session_held = self._session_held_tokens() + total = self.token_to_kv_pool_allocator.size else: protected = self.tree_cache.protected_size() session_held = self._session_held_tokens()