From 4e12bd6cf7900a750f02914c600999ccbeb60651 Mon Sep 17 00:00:00 2001 From: Chanh Nguyen Date: Fri, 15 Aug 2025 06:22:36 +0000 Subject: [PATCH 1/4] Support GC Freeze to reduce latency jitter --- python/sglang/srt/entrypoints/engine.py | 17 +++++++++ python/sglang/srt/entrypoints/http_server.py | 11 ++++++ .../srt/managers/detokenizer_manager.py | 11 +++++- python/sglang/srt/managers/io_struct.py | 11 ++++++ python/sglang/srt/managers/scheduler.py | 11 ++++++ .../sglang/srt/managers/tokenizer_manager.py | 25 ++++++++++++- python/sglang/srt/server_args.py | 7 ++++ python/sglang/srt/utils.py | 36 +++++++++++++++++++ 8 files changed, 127 insertions(+), 2 deletions(-) diff --git a/python/sglang/srt/entrypoints/engine.py b/python/sglang/srt/entrypoints/engine.py index c09a128b53a0..acc2aa681c32 100644 --- a/python/sglang/srt/entrypoints/engine.py +++ b/python/sglang/srt/entrypoints/engine.py @@ -530,6 +530,23 @@ def resume_memory_occupation(self, tags: Optional[List[str]] = None): self.tokenizer_manager.resume_memory_occupation(obj, None) ) + def freeze_gc(self): + """ + To maintain a high performance server with low latency, we want to reduce the + stalls caused by the garbage collector scanning through a large number of objects. + + It is usually helpful to start the server and warm it up with real requests to + initialize many of the long-lived objects that do not need to be garbage collected. + + After sufficient warmup, we can call this function to freeze the garbage collector + so that all objects created before this point are considered out of scope for garbage + collection. + """ + + loop = asyncio.get_event_loop() + loop.run_until_complete(self.tokenizer_manager.freeze_gc()) + + """ Execute an RPC call on all scheduler processes. """ diff --git a/python/sglang/srt/entrypoints/http_server.py b/python/sglang/srt/entrypoints/http_server.py index c4d36088f394..ea4bde17c69c 100644 --- a/python/sglang/srt/entrypoints/http_server.py +++ b/python/sglang/srt/entrypoints/http_server.py @@ -502,6 +502,17 @@ async def stop_profile_async(): ) +@app.api_route("/freeze_gc", methods=["GET", "POST"]) +async def freeze_gc_async(): + """ + See engine.freeze_gc for more details. + """ + await _global_state.tokenizer_manager.freeze_gc() + return Response( + content="Garbage collection frozen.\n", + status_code=200, + ) + @app.api_route("/start_expert_distribution_record", methods=["GET", "POST"]) async def start_expert_distribution_record_async(): """Start recording the expert distribution. Clear the previous record if any.""" diff --git a/python/sglang/srt/managers/detokenizer_manager.py b/python/sglang/srt/managers/detokenizer_manager.py index 29757b4b295c..6970f93a88e5 100644 --- a/python/sglang/srt/managers/detokenizer_manager.py +++ b/python/sglang/srt/managers/detokenizer_manager.py @@ -31,10 +31,13 @@ BatchMultimodalOut, BatchStrOut, BatchTokenIDOut, + FreezeGCReq, + FreezeGCReqOutput, ) from sglang.srt.server_args import PortArgs, ServerArgs from sglang.srt.utils import ( configure_logger, + freeze_gc, get_zmq_socket, kill_itself_when_parent_died, ) @@ -100,6 +103,7 @@ def __init__( (BatchEmbeddingOut, self.handle_batch_embedding_out), (BatchTokenIDOut, self.handle_batch_token_id_out), (BatchMultimodalDecodeReq, self.handle_multimodal_decode_req), + (FreezeGCReq, self.handle_freeze_gc_req), ] ) @@ -108,7 +112,8 @@ def event_loop(self): while True: recv_obj = self.recv_from_scheduler.recv_pyobj() output = self._request_dispatcher(recv_obj) - self.send_to_tokenizer.send_pyobj(output) + if output is not None: + self.send_to_tokenizer.send_pyobj(output) def trim_matched_stop( self, output: Union[str, List[int]], finished_reason: Dict, no_stop_trim: bool @@ -247,6 +252,10 @@ def handle_multimodal_decode_req(self, recv_obj: BatchMultimodalDecodeReq): cached_tokens=recv_obj.cached_tokens, ) + def handle_freeze_gc_req(self, recv_req: FreezeGCReq): + freeze_gc("Detokenizer Manager") + return None + class LimitedCapacityDict(OrderedDict): def __init__(self, capacity: int, *args, **kwargs): diff --git a/python/sglang/srt/managers/io_struct.py b/python/sglang/srt/managers/io_struct.py index 5461282125dc..36ab301627f8 100644 --- a/python/sglang/srt/managers/io_struct.py +++ b/python/sglang/srt/managers/io_struct.py @@ -987,6 +987,17 @@ class ProfileReqOutput: message: str +@dataclass +class FreezeGCReq: + pass + + +@dataclass +class FreezeGCReqOutput: + success: bool + message: str + + @dataclass class ConfigureLoggingReq: log_requests: Optional[bool] = None diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index 6fd6ffe6437d..6cc3fd623a08 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -72,6 +72,8 @@ ExpertDistributionReqOutput, FlushCacheReqInput, FlushCacheReqOutput, + FreezeGCReq, + FreezeGCReqOutput, GetInternalStateReq, GetInternalStateReqOutput, GetWeightsByNameReqInput, @@ -143,6 +145,7 @@ configure_gc_logger, configure_logger, disable_request_logging, + freeze_gc, get_available_gpu_memory, get_bool_env_var, get_zmq_socket, @@ -514,6 +517,7 @@ def __init__( (ResumeMemoryOccupationReqInput, self.resume_memory_occupation), (SlowDownReqInput, self.slow_down), (ProfileReq, self.profile), + (FreezeGCReq, self.handle_freeze_gc), (GetInternalStateReq, self.get_internal_state), (SetInternalStateReq, self.set_internal_state), (RpcReqInput, self.handle_rpc_request), @@ -2447,6 +2451,13 @@ def current_scheduler_metrics_enabled(self): def maybe_sleep_on_idle(self): if self.idle_sleeper is not None: self.idle_sleeper.maybe_sleep() + def handle_freeze_gc(self, recv_req: FreezeGCReq): + """Handle freeze_gc request: freeze scheduler's GC and forward to detokenizer.""" + freeze_gc("Scheduler") + + self.send_to_detokenizer.send_pyobj(recv_req) + + return FreezeGCReqOutput(success=True, message="Scheduler GC frozen and request forwarded to detokenizer") class IdleSleeper: diff --git a/python/sglang/srt/managers/tokenizer_manager.py b/python/sglang/srt/managers/tokenizer_manager.py index 50ac39f8859e..f8a391b47ad5 100644 --- a/python/sglang/srt/managers/tokenizer_manager.py +++ b/python/sglang/srt/managers/tokenizer_manager.py @@ -78,6 +78,8 @@ ExpertDistributionReqOutput, FlushCacheReqInput, FlushCacheReqOutput, + FreezeGCReq, + FreezeGCReqOutput, GenerateReqInput, GetInternalStateReq, GetInternalStateReqOutput, @@ -126,6 +128,8 @@ get_bool_env_var, get_zmq_socket, kill_process_tree, + freeze_gc, + configure_gc_warning, ) from sglang.utils import TypeBasedDispatcher, get_exception_traceback @@ -133,7 +137,6 @@ logger = logging.getLogger(__name__) - @dataclasses.dataclass class ReqState: """Store the state a request.""" @@ -353,6 +356,10 @@ def __init__( collect_tokens_histogram=self.server_args.collect_tokens_histogram, ) + # Configure GC warning + if self.server_args.gc_warning_threshold_secs > 0.0: + configure_gc_warning(self.server_args.gc_warning_threshold_secs) + # Communicators self.init_weights_update_group_communicator = _Communicator( self.send_to_scheduler, server_args.dp_size @@ -381,6 +388,9 @@ def __init__( self.profile_communicator = _Communicator( self.send_to_scheduler, server_args.dp_size ) + self.freeze_gc_communicator = _Communicator( + self.send_to_scheduler, server_args.dp_size + ) self.get_internal_state_communicator = _Communicator( self.send_to_scheduler, server_args.dp_size ) @@ -447,6 +457,10 @@ def __init__( ProfileReqOutput, self.profile_communicator.handle_recv, ), + ( + FreezeGCReqOutput, + self.freeze_gc_communicator.handle_recv, + ), ( GetInternalStateReqOutput, self.get_internal_state_communicator.handle_recv, @@ -1338,6 +1352,15 @@ def configure_logging(self, obj: ConfigureLoggingReq): logging.info(f"Config logging: {obj=}") self.log_request_metadata = self.get_log_request_metadata() + async def freeze_gc(self): + """Send a freeze_gc message to the scheduler first, then freeze locally.""" + req = FreezeGCReq() + result = (await self.freeze_gc_communicator(req))[0] + + freeze_gc("Tokenizer Manager") + return result + + def create_abort_task(self, obj: GenerateReqInput): # Abort the request if the client is disconnected. async def abort_request(): diff --git a/python/sglang/srt/server_args.py b/python/sglang/srt/server_args.py index 8f8774f2a2fe..649f174683a5 100644 --- a/python/sglang/srt/server_args.py +++ b/python/sglang/srt/server_args.py @@ -120,6 +120,7 @@ class ServerArgs: decode_log_interval: int = 40 enable_request_time_stats_logging: bool = False kv_events_config: Optional[str] = None + gc_warning_threshold_secs: float = 0.0 # API related api_key: Optional[str] = None @@ -1144,6 +1145,12 @@ def add_cli_args(parser: argparse.ArgumentParser): default=ServerArgs.collect_tokens_histogram, help="Collect prompt/generation tokens histogram.", ) + parser.add_argument( + "--gc-warning-threshold-secs", + type=float, + default=ServerArgs.gc_warning_threshold_secs, + help="The threshold for long GC warning. If a GC takes longer than this, a warning will be logged. Set to 0 to disable.", + ) parser.add_argument( "--decode-log-interval", type=int, diff --git a/python/sglang/srt/utils.py b/python/sglang/srt/utils.py index 1e07a413630a..b22a64e8c98e 100644 --- a/python/sglang/srt/utils.py +++ b/python/sglang/srt/utils.py @@ -2598,6 +2598,42 @@ def dynamic_import(func_path: str): func = getattr(module, func_name) return func +def gc_object_counts(): + import gc + + g0 = len(gc.get_objects(0)) + g1 = len(gc.get_objects(1)) + g2 = len(gc.get_objects(2)) + return g0, g1, g2 + +def configure_gc_warning(warn_threshold_secs): + import gc + + gc_start_time = {} + + def gc_callback(phase, info): + gen = info.get("generation", "?") + if phase == "start": + gc_start_time[gen] = time.time() + elif phase == "stop": + duration = time.time() - gc_start_time.get(gen, time.time()) + if duration > warn_threshold_secs: + g0, g1, g2 = gc_object_counts() + logger.warn( + f"LONG GARBAGE COLLECTION DETECTED | Generation {gen} | Duration: {duration:.4f}s | # Objects: gen0={g0}, gen1={g1}, gen2={g2} | " + f"This may cause latency jitter. Consider calling the freeze_gc API after sending a few warmup requests." + ) + +def freeze_gc(context: str): + import gc + + g0_before, g1_before, g2_before = gc_object_counts() + gc.freeze() + g0_after, g1_after, g2_after = gc_object_counts() + logger.info(f"Freezing GC in {context} process. " + f"gen0: {g0_before}->{g0_after}, " + f"gen1: {g1_before}->{g1_after}, " + f"gen2: {g2_before}->{g2_after}") def configure_gc_logger(): logger.info("Enable GC Logger") From baf2d00c66d3d4df9c84bc6e5b141eb186f1cdae Mon Sep 17 00:00:00 2001 From: Chanh Nguyen Date: Mon, 18 Aug 2025 09:53:48 -0700 Subject: [PATCH 2/4] Fix Freeze GC when Tokenizer disabled (#123) --- python/sglang/srt/managers/detokenizer_manager.py | 1 - python/sglang/srt/managers/io_struct.py | 6 ------ python/sglang/srt/managers/scheduler.py | 5 +---- python/sglang/srt/managers/tokenizer_manager.py | 15 +++------------ 4 files changed, 4 insertions(+), 23 deletions(-) diff --git a/python/sglang/srt/managers/detokenizer_manager.py b/python/sglang/srt/managers/detokenizer_manager.py index 6970f93a88e5..395fd870fa51 100644 --- a/python/sglang/srt/managers/detokenizer_manager.py +++ b/python/sglang/srt/managers/detokenizer_manager.py @@ -32,7 +32,6 @@ BatchStrOut, BatchTokenIDOut, FreezeGCReq, - FreezeGCReqOutput, ) from sglang.srt.server_args import PortArgs, ServerArgs from sglang.srt.utils import ( diff --git a/python/sglang/srt/managers/io_struct.py b/python/sglang/srt/managers/io_struct.py index 36ab301627f8..01c77fbfa304 100644 --- a/python/sglang/srt/managers/io_struct.py +++ b/python/sglang/srt/managers/io_struct.py @@ -992,12 +992,6 @@ class FreezeGCReq: pass -@dataclass -class FreezeGCReqOutput: - success: bool - message: str - - @dataclass class ConfigureLoggingReq: log_requests: Optional[bool] = None diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index 6cc3fd623a08..68393d3d612d 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -73,7 +73,6 @@ FlushCacheReqInput, FlushCacheReqOutput, FreezeGCReq, - FreezeGCReqOutput, GetInternalStateReq, GetInternalStateReqOutput, GetWeightsByNameReqInput, @@ -2454,10 +2453,8 @@ def maybe_sleep_on_idle(self): def handle_freeze_gc(self, recv_req: FreezeGCReq): """Handle freeze_gc request: freeze scheduler's GC and forward to detokenizer.""" freeze_gc("Scheduler") - self.send_to_detokenizer.send_pyobj(recv_req) - - return FreezeGCReqOutput(success=True, message="Scheduler GC frozen and request forwarded to detokenizer") + return None class IdleSleeper: diff --git a/python/sglang/srt/managers/tokenizer_manager.py b/python/sglang/srt/managers/tokenizer_manager.py index f8a391b47ad5..61c3123f32a7 100644 --- a/python/sglang/srt/managers/tokenizer_manager.py +++ b/python/sglang/srt/managers/tokenizer_manager.py @@ -79,7 +79,6 @@ FlushCacheReqInput, FlushCacheReqOutput, FreezeGCReq, - FreezeGCReqOutput, GenerateReqInput, GetInternalStateReq, GetInternalStateReqOutput, @@ -388,9 +387,6 @@ def __init__( self.profile_communicator = _Communicator( self.send_to_scheduler, server_args.dp_size ) - self.freeze_gc_communicator = _Communicator( - self.send_to_scheduler, server_args.dp_size - ) self.get_internal_state_communicator = _Communicator( self.send_to_scheduler, server_args.dp_size ) @@ -457,10 +453,7 @@ def __init__( ProfileReqOutput, self.profile_communicator.handle_recv, ), - ( - FreezeGCReqOutput, - self.freeze_gc_communicator.handle_recv, - ), + (FreezeGCReq, lambda x: None), # For handling case when scheduler skips detokenizer and forwards back to the tokenizer manager, we ignore it. ( GetInternalStateReqOutput, self.get_internal_state_communicator.handle_recv, @@ -1354,11 +1347,9 @@ def configure_logging(self, obj: ConfigureLoggingReq): async def freeze_gc(self): """Send a freeze_gc message to the scheduler first, then freeze locally.""" - req = FreezeGCReq() - result = (await self.freeze_gc_communicator(req))[0] - + self.send_to_scheduler.send_pyobj(FreezeGCReq()) freeze_gc("Tokenizer Manager") - return result + return None def create_abort_task(self, obj: GenerateReqInput): From 9d36e3adf3c8bbc10dd13ec756dd612d19c95cba Mon Sep 17 00:00:00 2001 From: Chanh Nguyen Date: Thu, 21 Aug 2025 22:14:59 +0000 Subject: [PATCH 3/4] register callback --- python/sglang/srt/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/sglang/srt/utils.py b/python/sglang/srt/utils.py index 042fdf8108ff..6317e1f11f0d 100644 --- a/python/sglang/srt/utils.py +++ b/python/sglang/srt/utils.py @@ -2566,6 +2566,8 @@ def gc_callback(phase, info): f"This may cause latency jitter. Consider calling the freeze_gc API after sending a few warmup requests." ) + gc.callbacks.append(gc_callback) + def freeze_gc(context: str): import gc From fb1b52799cc358a51922e4b3a8e3cf8941ea5b5a Mon Sep 17 00:00:00 2001 From: Liangsheng Yin Date: Sat, 23 Aug 2025 11:22:20 +0800 Subject: [PATCH 4/4] fix lint --- python/sglang/srt/entrypoints/engine.py | 5 ++--- python/sglang/srt/entrypoints/http_server.py | 1 + python/sglang/srt/managers/scheduler.py | 1 + python/sglang/srt/managers/tokenizer_manager.py | 11 +++++++---- python/sglang/srt/utils.py | 16 +++++++++++----- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/python/sglang/srt/entrypoints/engine.py b/python/sglang/srt/entrypoints/engine.py index 7f87e253717d..90c167432885 100644 --- a/python/sglang/srt/entrypoints/engine.py +++ b/python/sglang/srt/entrypoints/engine.py @@ -540,10 +540,10 @@ def freeze_gc(self): """ To maintain a high performance server with low latency, we want to reduce the stalls caused by the garbage collector scanning through a large number of objects. - + It is usually helpful to start the server and warm it up with real requests to initialize many of the long-lived objects that do not need to be garbage collected. - + After sufficient warmup, we can call this function to freeze the garbage collector so that all objects created before this point are considered out of scope for garbage collection. @@ -552,7 +552,6 @@ def freeze_gc(self): loop = asyncio.get_event_loop() loop.run_until_complete(self.tokenizer_manager.freeze_gc()) - """ Execute an RPC call on all scheduler processes. """ diff --git a/python/sglang/srt/entrypoints/http_server.py b/python/sglang/srt/entrypoints/http_server.py index e5b35c8ae153..aa496b7544fb 100644 --- a/python/sglang/srt/entrypoints/http_server.py +++ b/python/sglang/srt/entrypoints/http_server.py @@ -522,6 +522,7 @@ async def freeze_gc_async(): status_code=200, ) + @app.api_route("/start_expert_distribution_record", methods=["GET", "POST"]) async def start_expert_distribution_record_async(): """Start recording the expert distribution. Clear the previous record if any.""" diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index 99805b229176..1a82010a23a0 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -2471,6 +2471,7 @@ def current_scheduler_metrics_enabled(self): def maybe_sleep_on_idle(self): if self.idle_sleeper is not None: self.idle_sleeper.maybe_sleep() + def handle_freeze_gc(self, recv_req: FreezeGCReq): """Handle freeze_gc request: freeze scheduler's GC and forward to detokenizer.""" freeze_gc("Scheduler") diff --git a/python/sglang/srt/managers/tokenizer_manager.py b/python/sglang/srt/managers/tokenizer_manager.py index 2694e40dadfb..1161cdf1a512 100644 --- a/python/sglang/srt/managers/tokenizer_manager.py +++ b/python/sglang/srt/managers/tokenizer_manager.py @@ -123,12 +123,12 @@ from sglang.srt.sampling.sampling_params import SamplingParams from sglang.srt.server_args import PortArgs, ServerArgs from sglang.srt.utils import ( + configure_gc_warning, dataclass_to_string_truncated, + freeze_gc, get_bool_env_var, get_zmq_socket, kill_process_tree, - freeze_gc, - configure_gc_warning, ) from sglang.utils import TypeBasedDispatcher, get_exception_traceback @@ -136,6 +136,7 @@ logger = logging.getLogger(__name__) + @dataclasses.dataclass class ReqState: """Store the state a request.""" @@ -452,7 +453,10 @@ def __init__( ProfileReqOutput, self.profile_communicator.handle_recv, ), - (FreezeGCReq, lambda x: None), # For handling case when scheduler skips detokenizer and forwards back to the tokenizer manager, we ignore it. + ( + FreezeGCReq, + lambda x: None, + ), # For handling case when scheduler skips detokenizer and forwards back to the tokenizer manager, we ignore it. ( GetInternalStateReqOutput, self.get_internal_state_communicator.handle_recv, @@ -1371,7 +1375,6 @@ async def freeze_gc(self): self.send_to_scheduler.send_pyobj(FreezeGCReq()) freeze_gc("Tokenizer Manager") return None - def create_abort_task(self, obj: GenerateReqInput): # Abort the request if the client is disconnected. diff --git a/python/sglang/srt/utils.py b/python/sglang/srt/utils.py index 6317e1f11f0d..6979be0d429b 100644 --- a/python/sglang/srt/utils.py +++ b/python/sglang/srt/utils.py @@ -2540,7 +2540,8 @@ def dynamic_import(func_path: str): func = getattr(module, func_name) return func -def gc_object_counts(): + +def gc_object_counts(): import gc g0 = len(gc.get_objects(0)) @@ -2548,6 +2549,7 @@ def gc_object_counts(): g2 = len(gc.get_objects(2)) return g0, g1, g2 + def configure_gc_warning(warn_threshold_secs): import gc @@ -2568,16 +2570,20 @@ def gc_callback(phase, info): gc.callbacks.append(gc_callback) + def freeze_gc(context: str): import gc g0_before, g1_before, g2_before = gc_object_counts() gc.freeze() g0_after, g1_after, g2_after = gc_object_counts() - logger.info(f"Freezing GC in {context} process. " - f"gen0: {g0_before}->{g0_after}, " - f"gen1: {g1_before}->{g1_after}, " - f"gen2: {g2_before}->{g2_after}") + logger.info( + f"Freezing GC in {context} process. " + f"gen0: {g0_before}->{g0_after}, " + f"gen1: {g1_before}->{g1_after}, " + f"gen2: {g2_before}->{g2_after}" + ) + def configure_gc_logger(): logger.info("Enable GC Logger")