Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions python/sglang/srt/entrypoints/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,23 @@ def resume_memory_occupation(self, tags: Optional[List[str]] = None):
self.tokenizer_manager.resume_memory_occupation(obj, None)
)

def freeze_gc(self):
"""
To maintain a high performance server with low latency, we want to reduce the
stalls caused by the garbage collector scanning through a large number of objects.

It is usually helpful to start the server and warm it up with real requests to
initialize many of the long-lived objects that do not need to be garbage collected.

After sufficient warmup, we can call this function to freeze the garbage collector
so that all objects created before this point are considered out of scope for garbage
collection.
"""

loop = asyncio.get_event_loop()
loop.run_until_complete(self.tokenizer_manager.freeze_gc())


"""
Execute an RPC call on all scheduler processes.
"""
Expand Down
11 changes: 11 additions & 0 deletions python/sglang/srt/entrypoints/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,17 @@ async def stop_profile_async():
)


@app.api_route("/freeze_gc", methods=["GET", "POST"])
async def freeze_gc_async():
"""
See engine.freeze_gc for more details.
"""
await _global_state.tokenizer_manager.freeze_gc()
return Response(
content="Garbage collection frozen.\n",
status_code=200,
)

@app.api_route("/start_expert_distribution_record", methods=["GET", "POST"])
async def start_expert_distribution_record_async():
"""Start recording the expert distribution. Clear the previous record if any."""
Expand Down
11 changes: 10 additions & 1 deletion python/sglang/srt/managers/detokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
BatchMultimodalOut,
BatchStrOut,
BatchTokenIDOut,
FreezeGCReq,
FreezeGCReqOutput,
)
from sglang.srt.server_args import PortArgs, ServerArgs
from sglang.srt.utils import (
configure_logger,
freeze_gc,
get_zmq_socket,
kill_itself_when_parent_died,
)
Expand Down Expand Up @@ -100,6 +103,7 @@ def __init__(
(BatchEmbeddingOut, self.handle_batch_embedding_out),
(BatchTokenIDOut, self.handle_batch_token_id_out),
(BatchMultimodalDecodeReq, self.handle_multimodal_decode_req),
(FreezeGCReq, self.handle_freeze_gc_req),
]
)

Expand All @@ -108,7 +112,8 @@ def event_loop(self):
while True:
recv_obj = self.recv_from_scheduler.recv_pyobj()
output = self._request_dispatcher(recv_obj)
self.send_to_tokenizer.send_pyobj(output)
if output is not None:
self.send_to_tokenizer.send_pyobj(output)

def trim_matched_stop(
self, output: Union[str, List[int]], finished_reason: Dict, no_stop_trim: bool
Expand Down Expand Up @@ -247,6 +252,10 @@ def handle_multimodal_decode_req(self, recv_obj: BatchMultimodalDecodeReq):
cached_tokens=recv_obj.cached_tokens,
)

def handle_freeze_gc_req(self, recv_req: FreezeGCReq):
freeze_gc("Detokenizer Manager")
return None


class LimitedCapacityDict(OrderedDict):
def __init__(self, capacity: int, *args, **kwargs):
Expand Down
11 changes: 11 additions & 0 deletions python/sglang/srt/managers/io_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,17 @@ class ProfileReqOutput:
message: str


@dataclass
class FreezeGCReq:
pass


@dataclass
class FreezeGCReqOutput:
success: bool
message: str


@dataclass
class ConfigureLoggingReq:
log_requests: Optional[bool] = None
Expand Down
11 changes: 11 additions & 0 deletions python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
ExpertDistributionReqOutput,
FlushCacheReqInput,
FlushCacheReqOutput,
FreezeGCReq,
FreezeGCReqOutput,
GetInternalStateReq,
GetInternalStateReqOutput,
GetWeightsByNameReqInput,
Expand Down Expand Up @@ -143,6 +145,7 @@
configure_gc_logger,
configure_logger,
disable_request_logging,
freeze_gc,
get_available_gpu_memory,
get_bool_env_var,
get_zmq_socket,
Expand Down Expand Up @@ -514,6 +517,7 @@ def __init__(
(ResumeMemoryOccupationReqInput, self.resume_memory_occupation),
(SlowDownReqInput, self.slow_down),
(ProfileReq, self.profile),
(FreezeGCReq, self.handle_freeze_gc),
(GetInternalStateReq, self.get_internal_state),
(SetInternalStateReq, self.set_internal_state),
(RpcReqInput, self.handle_rpc_request),
Expand Down Expand Up @@ -2447,6 +2451,13 @@ def current_scheduler_metrics_enabled(self):
def maybe_sleep_on_idle(self):
if self.idle_sleeper is not None:
self.idle_sleeper.maybe_sleep()
def handle_freeze_gc(self, recv_req: FreezeGCReq):
"""Handle freeze_gc request: freeze scheduler's GC and forward to detokenizer."""
freeze_gc("Scheduler")

self.send_to_detokenizer.send_pyobj(recv_req)

return FreezeGCReqOutput(success=True, message="Scheduler GC frozen and request forwarded to detokenizer")


class IdleSleeper:
Expand Down
25 changes: 24 additions & 1 deletion python/sglang/srt/managers/tokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
ExpertDistributionReqOutput,
FlushCacheReqInput,
FlushCacheReqOutput,
FreezeGCReq,
FreezeGCReqOutput,
GenerateReqInput,
GetInternalStateReq,
GetInternalStateReqOutput,
Expand Down Expand Up @@ -126,14 +128,15 @@
get_bool_env_var,
get_zmq_socket,
kill_process_tree,
freeze_gc,
configure_gc_warning,
)
from sglang.utils import TypeBasedDispatcher, get_exception_traceback

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class ReqState:
"""Store the state a request."""
Expand Down Expand Up @@ -353,6 +356,10 @@ def __init__(
collect_tokens_histogram=self.server_args.collect_tokens_histogram,
)

# Configure GC warning
if self.server_args.gc_warning_threshold_secs > 0.0:
configure_gc_warning(self.server_args.gc_warning_threshold_secs)

# Communicators
self.init_weights_update_group_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
Expand Down Expand Up @@ -381,6 +388,9 @@ def __init__(
self.profile_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
self.freeze_gc_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
self.get_internal_state_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
Expand Down Expand Up @@ -447,6 +457,10 @@ def __init__(
ProfileReqOutput,
self.profile_communicator.handle_recv,
),
(
FreezeGCReqOutput,
self.freeze_gc_communicator.handle_recv,
),
(
GetInternalStateReqOutput,
self.get_internal_state_communicator.handle_recv,
Expand Down Expand Up @@ -1338,6 +1352,15 @@ def configure_logging(self, obj: ConfigureLoggingReq):
logging.info(f"Config logging: {obj=}")
self.log_request_metadata = self.get_log_request_metadata()

async def freeze_gc(self):
"""Send a freeze_gc message to the scheduler first, then freeze locally."""
req = FreezeGCReq()
result = (await self.freeze_gc_communicator(req))[0]

freeze_gc("Tokenizer Manager")
return result


def create_abort_task(self, obj: GenerateReqInput):
# Abort the request if the client is disconnected.
async def abort_request():
Expand Down
7 changes: 7 additions & 0 deletions python/sglang/srt/server_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class ServerArgs:
decode_log_interval: int = 40
enable_request_time_stats_logging: bool = False
kv_events_config: Optional[str] = None
gc_warning_threshold_secs: float = 0.0

# API related
api_key: Optional[str] = None
Expand Down Expand Up @@ -1144,6 +1145,12 @@ def add_cli_args(parser: argparse.ArgumentParser):
default=ServerArgs.collect_tokens_histogram,
help="Collect prompt/generation tokens histogram.",
)
parser.add_argument(
"--gc-warning-threshold-secs",
type=float,
default=ServerArgs.gc_warning_threshold_secs,
help="The threshold for long GC warning. If a GC takes longer than this, a warning will be logged. Set to 0 to disable.",
)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest using a negative number to disable it.

parser.add_argument(
"--decode-log-interval",
type=int,
Expand Down
36 changes: 36 additions & 0 deletions python/sglang/srt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2598,6 +2598,42 @@ def dynamic_import(func_path: str):
func = getattr(module, func_name)
return func

def gc_object_counts():
import gc

g0 = len(gc.get_objects(0))
g1 = len(gc.get_objects(1))
g2 = len(gc.get_objects(2))
return g0, g1, g2

def configure_gc_warning(warn_threshold_secs):
import gc

gc_start_time = {}

def gc_callback(phase, info):
gen = info.get("generation", "?")
if phase == "start":
gc_start_time[gen] = time.time()
elif phase == "stop":
duration = time.time() - gc_start_time.get(gen, time.time())
if duration > warn_threshold_secs:
g0, g1, g2 = gc_object_counts()
logger.warn(
f"LONG GARBAGE COLLECTION DETECTED | Generation {gen} | Duration: {duration:.4f}s | # Objects: gen0={g0}, gen1={g1}, gen2={g2} | "
f"This may cause latency jitter. Consider calling the freeze_gc API after sending a few warmup requests."
)
Comment thread
chanh marked this conversation as resolved.

def freeze_gc(context: str):
import gc

g0_before, g1_before, g2_before = gc_object_counts()
gc.freeze()
g0_after, g1_after, g2_after = gc_object_counts()
logger.info(f"Freezing GC in {context} process. "
f"gen0: {g0_before}->{g0_after}, "
f"gen1: {g1_before}->{g1_after}, "
f"gen2: {g2_before}->{g2_after}")

def configure_gc_logger():
logger.info("Enable GC Logger")
Expand Down