Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
de38596
Prep: abort_request dedup for chunked-resume dual-queue holding
fzyzcjy May 13, 2026
c79a73b
Prep: subtract prefix_indices from waiting_queue pending tokens sum
fzyzcjy May 13, 2026
a5915a1
Prep: document filter_batch chunked-exclusion invariant
fzyzcjy May 13, 2026
1c3bf8e
Bound cache_unfinished_req row read by kv_committed_len
fzyzcjy May 13, 2026
9b361ae
Drop is_chunked from req_to_token_pool alloc assert
fzyzcjy May 13, 2026
74f1d8b
Unify chunked admission via add_one_req reuse branch + add has_pendin…
fzyzcjy May 13, 2026
c445a82
Switch chunked-resume to waiting_queue holding; delete chunked_req fi…
fzyzcjy May 13, 2026
b9d5d6e
refactor: rename Req.is_chunked -> Req.pending_middle_outputs
fzyzcjy May 13, 2026
f038893
Fix retract_all passing List[Req] to filter_batch as keep_indices
fzyzcjy May 13, 2026
fd3dcca
Refactor filter_batch to use explicit exclude_chunked_req flag
fzyzcjy May 13, 2026
a79ba1b
Tighten add_one_req reuse gate to has_pending_chunk
fzyzcjy May 13, 2026
d7fa48b
Reset host_hit_length unconditionally in prepare_for_extend
fzyzcjy May 13, 2026
aaf3752
Skip chunked-resume reqs in calc_priority prefix matching
fzyzcjy May 13, 2026
359e5ed
Skip chunked-resume reqs in _abort_on_waiting_timeout
fzyzcjy May 13, 2026
5ed4faf
Bypass LoRA scheduling gate for chunked-resume reqs
fzyzcjy May 13, 2026
dbdcdde
Skip mamba_pool_idx cleanup for chunked-resume on NO_TOKEN
fzyzcjy May 13, 2026
36ec1d7
Widen merge_batch assert to match filter_batch predicate
fzyzcjy May 13, 2026
116584e
Bound streaming-session chunked stash by kv_committed_len
fzyzcjy May 13, 2026
96d4749
Release row + KV + lock_ref when aborting a chunked-resume req from w…
fzyzcjy May 13, 2026
bf5b4e9
Give chunked-resume reqs priority in LPM and DFS_WEIGHT sorts
fzyzcjy May 13, 2026
f38e69f
Extend pause(retract) to waiting chunked-resume reqs
fzyzcjy May 13, 2026
414efd4
Reset disagg send-side state on chunked-resume retract
fzyzcjy May 13, 2026
b433e1e
Count chunked-resume tail in runtime mem check (page_size > 1)
fzyzcjy May 13, 2026
f0af510
Document filter_batch(exclude_chunked_req=True) at every call site
fzyzcjy May 13, 2026
b823c16
Include PP microbatch reqs in abort_request batch_rids dedup
fzyzcjy May 13, 2026
678bba2
Document why Stage A chunk-stash runs at iter boundary instead of end…
fzyzcjy May 13, 2026
34c02d6
Filter chunked-resume reqs from split_prefill_batch before pdmux merge
fzyzcjy May 13, 2026
2868334
Apply black-jupyter formatting (CI lint fixup)
fzyzcjy May 13, 2026
daf9c42
Remove v1 SWA chunked-req stash gate test (gate was deleted in v2)
fzyzcjy May 13, 2026
a94e842
Drop v1 has_chunked_req kwarg + delete v1 add_chunked_req SWA tests
fzyzcjy May 13, 2026
02b1785
Guard _handle_finished_req against PP cross-microbatch double-finalize
fzyzcjy May 13, 2026
b0f2138
Seed has_pending_chunk/is_dllm/host_hit_length on test_prefill_adder …
fzyzcjy May 13, 2026
33f981c
Re-add ScheduleBatch.chunked_req marker for PP cross-mb filter exclusion
fzyzcjy May 14, 2026
11db3a4
Revert "Re-add ScheduleBatch.chunked_req marker for PP cross-mb filte…
fzyzcjy May 14, 2026
b3a7b9f
Bump pending_middle_outputs for last-chunk admits + decrement-first o…
fzyzcjy May 14, 2026
e875cd3
Revert "Bump pending_middle_outputs for last-chunk admits + decrement…
fzyzcjy May 14, 2026
5c52304
Exclude in-flight other-mb reqs in filter_batch (PP chunked-resume race)
fzyzcjy May 14, 2026
45347ca
Revert "Exclude in-flight other-mb reqs in filter_batch (PP chunked-r…
fzyzcjy May 14, 2026
69ef71e
Conditionally exclude in-flight other-mb chunked-resume reqs (PP, max…
fzyzcjy May 14, 2026
bdfb904
Merge upstream/main into feat/stateless_scheduler_b
fzyzcjy May 19, 2026
f5363d4
Merge branch 'main' into feat/stateless_scheduler_b
fzyzcjy May 19, 2026
14adb09
Rename inflight_middle_chunks -> pending_middle_outputs (revert upstr…
fzyzcjy May 19, 2026
be72b26
Fix Scheduler.pp_size refs: use ps.pp_size after ParallelState refactor
fzyzcjy May 19, 2026
5cc1a41
Merge branch 'main' into feat/stateless_scheduler_b
fzyzcjy May 19, 2026
f531ac9
Merge branch 'main' into feat/stateless_scheduler_b
fzyzcjy May 20, 2026
309b6dc
Merge branch 'main' into feat/stateless_scheduler_b
fzyzcjy May 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions python/sglang/srt/disaggregation/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ def alloc(self, reqs: List["Req"]) -> Optional[List[int]]:
len(reusing) <= 1
), "only one chunked request may reuse req_pool_idx in a batch"
assert all(
reqs[i].inflight_middle_chunks > 0 or reqs[i].kv_committed_len > 0
for i in reusing
), "reusing request must be chunked or have committed KV"
reqs[i].kv_committed_len > 0 for i in reusing
), "reusing request must have committed KV"

need_size = len(reqs) - len(reusing)
if need_size > len(self.free_slots):
Expand Down Expand Up @@ -1655,11 +1654,16 @@ def get_next_disagg_decode_batch_to_run(
# Process pending prebuilt batch: output processing + filter + merge
new_prebuilt_batch = self.get_new_prebuilt_batch()
if new_prebuilt_batch:
assert self.chunked_req is None
assert not any(r.has_pending_chunk for r in self.waiting_queue)
self.batch_result_processor.process_batch_result_prebuilt(
new_prebuilt_batch
)
new_prebuilt_batch.filter_batch()
# Defensive: chunked prefill is a prefill-side concept; decode-side
# prebuilt batches shouldn't carry has_pending_chunk reqs. The
# assert above already guards waiting_queue; this flag protects
# against any future code that would route a chunked req through
# the disagg decode path.
new_prebuilt_batch.filter_batch(exclude_chunked_req=True)
if not new_prebuilt_batch.is_empty():
if self.running_batch.is_empty():
self.running_batch = new_prebuilt_batch
Expand Down
46 changes: 23 additions & 23 deletions python/sglang/srt/disaggregation/prefill.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def process_batch_result_disagg_prefill(
for i, (req, next_token_id) in enumerate(
zip(batch.reqs, next_token_ids, strict=True)
):
if req.inflight_middle_chunks <= 0:
if req.pending_middle_outputs <= 0:
req.time_stats.set_prefill_finished_time()

# There is no output_ids for prefill
Expand Down Expand Up @@ -564,7 +564,7 @@ def process_batch_result_disagg_prefill(
req.grammar.finished = req.finished()
else:
# A req that is still being chunked has not finished its prefill
req.inflight_middle_chunks -= 1
req.pending_middle_outputs -= 1

if req.return_logprob:
extend_logprob_start_len = extend_logprob_start_len_per_req[i]
Expand Down Expand Up @@ -725,30 +725,30 @@ def get_transferred_rids(self: Scheduler) -> List[str]:
return transferred_rids

def process_prefill_chunk(self: Scheduler) -> None:
chunked_req_to_exclude = set()
if self.chunked_req:
chunked_req_to_exclude.add(self.chunked_req)
maybe_cache_unfinished_req(self.chunked_req, self.tree_cache, chunked=True)
if self.enable_overlap:
# Delay KV transfer to process_batch_result_disagg_prefill when overlap is enabled to ensure results are resolved
self.chunked_req.tmp_end_idx = min(
len(self.chunked_req.fill_ids),
len(self.chunked_req.origin_input_ids),
)
else:
self.send_kv_chunk(self.chunked_req)
self.running_batch.batch_is_full = False
# Per-req stash for any in-flight chunked-resume reqs (now sitting in
# the waiting_queue with has_pending_chunk == True).
for req in self.waiting_queue:
if req.has_pending_chunk and not req.is_dllm():
maybe_cache_unfinished_req(req, self.tree_cache, chunked=True)
if self.enable_overlap:
# Delay KV transfer to process_batch_result_disagg_prefill
# when overlap is enabled to ensure results are resolved.
req.tmp_end_idx = min(
len(req.fill_ids),
len(req.origin_input_ids),
)
else:
self.send_kv_chunk(req)
self.running_batch.batch_is_full = False

if self.last_batch and self.last_batch.forward_mode.is_extend():
if self.last_batch.chunked_req:
# In the context of pipeline parallelism, after the last chunk, the current microbatch still tracks an outdated chunked_req.
# We need to discard it.
chunked_req_to_exclude.add(self.last_batch.chunked_req)

last_bs = self.last_batch.batch_size()
self.last_batch.filter_batch(
chunked_req_to_exclude=list(chunked_req_to_exclude)
)
# Drop chunked-resume reqs from last_batch — running_batch runs
# decode forward and admitting a mid-prefill req there breaks
# shape + KV accounting. The dropped reqs stay in
# self.waiting_queue (chunked-resume retention) and re-enter via
# the next iter's Stage A stash + admission cycle.
self.last_batch.filter_batch(exclude_chunked_req=True)
if self.last_batch.batch_size() < last_bs:
self.running_batch.batch_is_full = False

Expand Down
7 changes: 3 additions & 4 deletions python/sglang/srt/dllm/mixin/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def _update_state_for_batch(

if can_run_list:
self.dllm_manager.add_staging_reqs(can_run_list)
self.dllm_manager.increment_inflight_middle_chunks()
self.dllm_manager.increment_pending_middle_outputs()

self.adder = adder
self.can_run_list = can_run_list
Expand Down Expand Up @@ -259,7 +259,6 @@ def process_dllm_incoming_reqs(
req.init_next_round_input(self.tree_cache)
res = adder.add_one_req(
req,
has_chunked_req=True,
truncation_align_size=self.truncation_align_size,
)

Expand Down Expand Up @@ -339,10 +338,10 @@ def is_empty(self) -> bool:
return True
return len(self.waiting_queue) == 0

def increment_inflight_middle_chunks(self) -> None:
def increment_pending_middle_outputs(self) -> None:
"""Increment chunked count for all staging requests."""
for req in self.staging_queue:
req.inflight_middle_chunks += 1
req.pending_middle_outputs += 1

def filter_finished_reqs(self) -> None:
"""Remove finished requests from both queues."""
Expand Down
73 changes: 56 additions & 17 deletions python/sglang/srt/managers/schedule_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,10 +823,22 @@ def __init__(
# The prefix length that is inserted into the tree cache
self.cache_protected_len: int = 0

# Whether the request is chunked. It is incremented whenever
# the request is chunked, and decremented whenever a chunked request is
# processed.
self.inflight_middle_chunks = 0
# Counter of middle-block prefill forwards that have been admitted
# but not yet output-processed for this req. Increments at admission
# for non-last chunks; decrements at output_processor. In PP, can
# exceed 1 because multiple microbatches may hold the same chunked
# req in flight concurrently. In non-PP, oscillates 0/1 within each
# iter. Used by output_processor to know whether this forward's
# sample is real (==0) or garbage (>0).
self.pending_middle_outputs = 0

# Persistent (cross-iter) flag set by admission when this req's
# current admission was truncated (more chunks remain). Cleared
# when last chunk is admitted (truncated=False) or on retract.
# Used by Stage A stash detection, filter_batch exclusion, and
# add_one_req's reuse-vs-fresh branch. Independent of pending_middle_outputs
# counter (transient) and kv_committed_len (derived).
self.has_pending_chunk = False

# For retraction
self.is_retracted = False
Expand Down Expand Up @@ -1319,7 +1331,8 @@ def reset_for_retract(self):
self.temp_input_top_logprobs_val = None
self.temp_input_top_logprobs_idx = None
self.extend_logprob_start_len = 0
self.inflight_middle_chunks = 0
self.pending_middle_outputs = 0
self.has_pending_chunk = False
self.mamba_pool_idx = None
self.mamba_ping_pong_track_buffer = None
self.mamba_next_track_idx = None
Expand All @@ -1335,6 +1348,14 @@ def reset_for_retract(self):
self.swa_evicted_seqlen = 0
self.extend_batch_idx = 0
self.decode_batch_idx = 0
# Disagg-prefill send-side bookkeeping. The pre-v2 retract path never
# ran against a req that had started sending (retract only touched
# running_batch), so these stayed at init values. After v2 added
# pause(retract) coverage for waiting chunked-resume reqs, a retracted
# disagg-prefill req's stale start_send_idx would index garbage in the
# new row on re-prefill.
self.start_send_idx = 0
self.tmp_end_idx = -1

# When using input_embeds, we cannot easily mix the original input embeddings
# with the newly generated output token IDs during re-prefill of retracted request.
Expand Down Expand Up @@ -1485,9 +1506,6 @@ class ScheduleBatch(ScheduleBatchDisaggregationDecodeMixin):
# This is an optimization to reduce the overhead of the prefill check.
batch_is_full: bool = False

# For chunked prefill in PP
chunked_req: Optional[Req] = None

# Sampling info
sampling_info: SamplingBatchInfo = None

Expand Down Expand Up @@ -1628,7 +1646,6 @@ def init_new(
model_config: ModelConfig,
enable_overlap: bool,
spec_algorithm: SpeculativeAlgorithm,
chunked_req: Optional[Req] = None,
dllm_config: Optional[DllmConfig] = None,
):
return_logprob = any(req.return_logprob for req in reqs)
Expand All @@ -1654,7 +1671,6 @@ def init_new(
return_routed_experts=any(req.return_routed_experts for req in reqs),
return_indexer_topk=any(req.return_indexer_topk for req in reqs),
is_prefill_only=all(req.is_prefill_only for req in reqs),
chunked_req=chunked_req,
dllm_config=dllm_config,
)
return batch
Expand Down Expand Up @@ -1931,6 +1947,13 @@ def prepare_for_extend(self):
req._cache_breakdown_computed = True

req.already_computed = seq_len
# Reset host_hit_length after init_load_back consumed it so that
# subsequent chunks' admissions skip init_load_back (host KV
# already loaded). Runs unconditionally: post-retract reqs have
# retracted_stain=True (skipping the outer block) but still
# match_prefix + init_load_back on their re-admission, so the
# reset must apply to them too.
req.host_hit_length = 0
req.is_retracted = False

if get_global_server_args().enable_mamba_extra_buffer():
Expand Down Expand Up @@ -2278,7 +2301,7 @@ def retract_all(self, server_args: ServerArgs):
for idx in range(len(self.reqs)):
self.release_req(idx, len(self.reqs) - idx, server_args)

self.filter_batch(retracted_reqs)
self.filter_batch(keep_indices=[])
return retracted_reqs

def retract_decode(
Expand Down Expand Up @@ -2486,21 +2509,27 @@ def prepare_for_decode(self):

def filter_batch(
self,
chunked_req_to_exclude: Optional[Union[Req, List[Req]]] = None,
keep_indices: Optional[List[int]] = None,
# FIXME(lsyin): deprecate this API after spec v1 is deprecated
v1_spec_info_filtered: Optional[bool] = False,
exclude_chunked_req: bool = False,
exclude_in_flight_other_mb: Optional[set] = None,
):
if keep_indices is None:
if isinstance(chunked_req_to_exclude, Req):
chunked_req_to_exclude = [chunked_req_to_exclude]
elif chunked_req_to_exclude is None:
chunked_req_to_exclude = []
in_flight_rids = exclude_in_flight_other_mb or set()
keep_indices = [
i
for i in range(len(self.reqs))
if not self.reqs[i].finished()
and self.reqs[i] not in chunked_req_to_exclude
and not (
exclude_chunked_req
and (
self.reqs[i].has_pending_chunk
or self.reqs[i].pending_middle_outputs > 0
or self.reqs[i].is_dllm()
)
)
and self.reqs[i].rid not in in_flight_rids
]

if keep_indices is None or len(keep_indices) == 0:
Expand Down Expand Up @@ -2566,6 +2595,16 @@ def filter_batch(
)

def merge_batch(self, other: "ScheduleBatch"):
# Caller must filter_batch(exclude_chunked_req=True) on the other batch
# before merging — running_batch runs decode forward and admitting a
# prefill-in-progress req there breaks shape + KV accounting. Mirror
# the full exclude_chunked_req predicate so PP middle-chunk and DLLM
# staging reqs are also caught here.
assert not any(
r.has_pending_chunk or r.pending_middle_outputs > 0 or r.is_dllm()
for r in other.reqs
)

# Penalizer orchestrator must be merged before Batch.reqs is merged. This is because
# orchestrator.merge() depends on Batch.reqs during preparation of each penalizers, so it
# needs to be called with pre-merged Batch.reqs.
Expand Down
Loading
Loading