Skip to content
14 changes: 14 additions & 0 deletions python/sglang/srt/entrypoints/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
ResumeMemoryOccupationReqInput,
SeparateReasoningReqInput,
SetInternalStateReq,
SlowDownReqInput,
UpdateWeightFromDiskReqInput,
UpdateWeightsFromDistributedReqInput,
UpdateWeightsFromTensorReqInput,
Expand Down Expand Up @@ -494,6 +495,19 @@ async def resume_memory_occupation(
return _create_error_response(e)


@app.api_route("/slow_down", methods=["GET", "POST"])
async def slow_down(obj: SlowDownReqInput, request: Request):
"""Slow down the system deliberately. Only for testing. Example scenario:
when we want to test performance of D in large-scale PD disaggregation and have no enough nodes for P,
we can use this to slow down D to let it have enough running sequences, and then disable slowdown
to let it run in full batch size.
"""
try:
await _global_state.tokenizer_manager.slow_down(obj, request)
except Exception as e:
return _create_error_response(e)


@app.api_route("/open_session", methods=["GET", "POST"])
async def open_session(obj: OpenSessionReqInput, request: Request):
"""Open a session, and return its unique session id."""
Expand Down
10 changes: 10 additions & 0 deletions python/sglang/srt/managers/io_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,16 @@ class ResumeMemoryOccupationReqOutput:
pass


@dataclass
class SlowDownReqInput:
forward_sleep_time: Optional[float]


@dataclass
class SlowDownReqOutput:
pass


@dataclass
class AbortReq:
# The request id
Expand Down
16 changes: 16 additions & 0 deletions python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
RpcReqOutput,
SetInternalStateReq,
SetInternalStateReqOutput,
SlowDownReqInput,
SlowDownReqOutput,
TokenizedEmbeddingReqInput,
TokenizedGenerateReqInput,
UpdateWeightFromDiskReqInput,
Expand Down Expand Up @@ -417,6 +419,8 @@ def __init__(
self.profiler_id: Optional[str] = None
self.profiler_target_forward_ct: Optional[int] = None

self.forward_sleep_time = None

# Init metrics stats
self.init_metrics()

Expand All @@ -439,6 +443,7 @@ def __init__(
(GetWeightsByNameReqInput, self.get_weights_by_name),
(ReleaseMemoryOccupationReqInput, self.release_memory_occupation),
(ResumeMemoryOccupationReqInput, self.resume_memory_occupation),
(SlowDownReqInput, self.slow_down),
(ProfileReq, self.profile),
(GetInternalStateReq, self.get_internal_state),
(SetInternalStateReq, self.set_internal_state),
Expand Down Expand Up @@ -1536,6 +1541,10 @@ def run_batch(
):
self.stop_profile()

if self.forward_sleep_time is not None:
logger.info(f"Scheduler.run_batch sleep {self.forward_sleep_time}s")
time.sleep(self.forward_sleep_time)

# Run forward
if self.is_generation:
if self.spec_algorithm.is_none():
Expand Down Expand Up @@ -2011,6 +2020,13 @@ def resume_memory_occupation(self, recv_req: ResumeMemoryOccupationReqInput):
del self.stashed_model_static_state
return ResumeMemoryOccupationReqOutput()

def slow_down(self, recv_req: SlowDownReqInput):
t = recv_req.forward_sleep_time
if t is not None and t <= 0:
t = None
self.forward_sleep_time = t
return SlowDownReqOutput()

def profile(self, recv_req: ProfileReq):
if recv_req.type == ProfileReqType.START_PROFILE:
return self.start_profile(
Expand Down
17 changes: 17 additions & 0 deletions python/sglang/srt/managers/tokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
ResumeMemoryOccupationReqInput,
ResumeMemoryOccupationReqOutput,
SessionParams,
SlowDownReqInput,
SlowDownReqOutput,
TokenizedEmbeddingReqInput,
TokenizedGenerateReqInput,
UpdateWeightFromDiskReqInput,
Expand Down Expand Up @@ -269,6 +271,9 @@ def __init__(
self.resume_memory_occupation_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
self.slow_down_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
self.flush_cache_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
Expand Down Expand Up @@ -322,6 +327,10 @@ def __init__(
ResumeMemoryOccupationReqOutput,
self.resume_memory_occupation_communicator.handle_recv,
),
(
SlowDownReqOutput,
self.slow_down_communicator.handle_recv,
),
(
FlushCacheReqOutput,
self.flush_cache_communicator.handle_recv,
Expand Down Expand Up @@ -880,6 +889,14 @@ async def resume_memory_occupation(
self.auto_create_handle_loop()
await self.resume_memory_occupation_communicator(obj)

async def slow_down(
self,
obj: SlowDownReqInput,
request: Optional[fastapi.Request] = None,
):
self.auto_create_handle_loop()
await self.slow_down_communicator(obj)

async def open_session(
self, obj: OpenSessionReqInput, request: Optional[fastapi.Request] = None
):
Expand Down
Loading