Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions python/sglang/srt/disaggregation/nixl/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,21 +688,24 @@ def transfer_worker(self, queue: FastQueue, staging_buffer=None):

handles.append(kv_xfer_handle)

if kv_chunk.is_last and kv_chunk.state_indices:
if kv_chunk.is_last:
dst_info = self.decode_kv_args_table[req.agent_name]
state_xfer_handles = self.maybe_send_extra(
req.agent_name,
kv_chunk.state_indices,
dst_info.dst_state_data_ptrs,
req.dst_state_indices,
dst_info.gpu_id,
f"{req.room}_state_{self.kv_args.engine_rank}",
decode_tp_size,
decode_tp_rank=dst_info.decode_tp_rank,
dst_state_item_lens=dst_info.dst_state_item_lens,
dst_state_dim_per_tensor=dst_info.dst_state_dim_per_tensor,
)
handles.extend(h for h in state_xfer_handles if h is not None)
if kv_chunk.state_indices:
state_xfer_handles = self.maybe_send_extra(
req.agent_name,
kv_chunk.state_indices,
dst_info.dst_state_data_ptrs,
req.dst_state_indices,
dst_info.gpu_id,
f"{req.room}_state_{self.kv_args.engine_rank}",
decode_tp_size,
decode_tp_rank=dst_info.decode_tp_rank,
dst_state_item_lens=dst_info.dst_state_item_lens,
dst_state_dim_per_tensor=dst_info.dst_state_dim_per_tensor,
)
handles.extend(
h for h in state_xfer_handles if h is not None
)

if kv_chunk.prefill_aux_index is None:
raise RuntimeError("Missing aux index for last chunk")
Expand Down Expand Up @@ -1974,8 +1977,11 @@ def send_metadata(
]
)

# Mark that we expect state data if state_indices was provided
if state_indices is not None:
# Mark that we expect state data if state_indices was provided.
# Match the prefill-side truthy check: an empty list means the
# model has no state types (e.g. dense LLaMA/Qwen), and prefill
# won't send state notifs, so we must not expect them.
if state_indices:
self.kv_mgr.transfer_statuses[self.bootstrap_room].expects_state = True

self.started_transfer = True
Expand Down
Loading