Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 15 additions & 31 deletions vllm/v1/worker/gpu/model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,15 +712,26 @@ def add_requests(self, scheduler_output: SchedulerOutput) -> None:
self.sampler.apply_staged_writes()

def update_requests(self, scheduler_output: SchedulerOutput) -> None:
# Add new blocks for the existing requests.
# Add new blocks and update num_computed_tokens for the existing requests.
reqs = scheduler_output.scheduled_cached_reqs
for req_new_block_ids, req_id in zip(reqs.new_block_ids, reqs.req_ids):
num_computed_tokens_np = self.req_states.num_computed_tokens_np
for req_id, num_computed_tokens, req_new_block_ids in zip(
reqs.req_ids, reqs.num_computed_tokens, reqs.new_block_ids
):
req_index = self.req_states.req_id_to_index[req_id]
num_computed_tokens_np[req_index] = num_computed_tokens
Comment on lines +718 to +722

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The loop in update_requests now performs a dictionary lookup (req_id_to_index) and a scalar assignment to num_computed_tokens_np for every cached request. While this is necessary to refresh the CPU mirror from the scheduler's state, it could be a performance bottleneck if the number of cached requests is very large. Consider if this can be vectorized in the future, although the current implementation is correct for fixing the divergence issue.

if req_new_block_ids is not None:
req_index = self.req_states.req_id_to_index[req_id]
self.block_tables.append_block_ids(
req_index, req_new_block_ids, overwrite=False
)

# Update num_computed_prefill_tokens.
np.minimum(
self.req_states.num_computed_tokens_np,
self.req_states.prefill_len.np,
out=self.req_states.num_computed_prefill_tokens,
)

def prepare_inputs(
self, scheduler_output: SchedulerOutput, batch_desc: BatchExecutionDescriptor
) -> InputBatch:
Expand Down Expand Up @@ -787,10 +798,7 @@ def prepare_inputs(
async_copy_to_gpu(query_start_loc_np, out=self.input_buffers.query_start_loc)
query_start_loc_np = query_start_loc_np[: num_reqs_padded + 1]
query_start_loc = self.input_buffers.query_start_loc[: num_reqs_padded + 1]
is_prefilling_np = (
self.req_states.num_computed_prefill_tokens[idx_mapping_np]
< self.req_states.prefill_len.np[idx_mapping_np]
)
is_prefilling_np = self.req_states.is_prefilling(idx_mapping_np)

# Get prefill tokens if any.
if np.any(is_prefilling_np):
Expand Down Expand Up @@ -973,18 +981,6 @@ def postprocess(
self.req_states.total_len.gpu,
)

# Update the number of computed prefill tokens.
idx_mapping_np = input_batch.idx_mapping_np
computed_prefill = self.req_states.num_computed_prefill_tokens
computed_prefill[idx_mapping_np] += input_batch.num_scheduled_tokens
np.minimum(
computed_prefill, self.req_states.prefill_len.np, out=computed_prefill
)
# Advance the CPU mirror optimistically (assume all scheduled accepted).
self.req_states.num_computed_tokens_np[idx_mapping_np] += (
input_batch.num_scheduled_tokens
)

self.model_state.postprocess_state(input_batch, num_sampled)

@torch.inference_mode()
Expand Down Expand Up @@ -1357,18 +1353,6 @@ def postprocess_pool(self, input_batch: InputBatch) -> None:
input_batch.query_start_loc,
)

# Update the number of computed prefill tokens.
idx_mapping_np = input_batch.idx_mapping_np
computed_prefill = self.req_states.num_computed_prefill_tokens
computed_prefill[idx_mapping_np] += input_batch.num_scheduled_tokens
np.minimum(
computed_prefill, self.req_states.prefill_len.np, out=computed_prefill
)
# Advance the CPU mirror optimistically (assume all scheduled accepted).
self.req_states.num_computed_tokens_np[idx_mapping_np] += (
input_batch.num_scheduled_tokens
)

def shutdown(self) -> None:
"""Release GPU tensors (model weights, KV caches, workspace) so that
memory is reclaimable when running in the same process."""
Expand Down
7 changes: 3 additions & 4 deletions vllm/v1/worker/gpu/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ def add_request(
self.num_computed_prefill_tokens[req_idx] = num_computed_tokens
self.num_computed_tokens_np[req_idx] = num_computed_tokens
self.num_computed_tokens.stage_write_elem(req_idx, num_computed_tokens)
self.num_computed_tokens_np[req_idx] = num_computed_tokens

if num_computed_tokens > 0 and num_computed_tokens <= prefill_len:
if 0 < num_computed_tokens <= prefill_len:
# For PD disagg or resumed requests: set last_sampled to the last
# computed token so the first decode step gets the right input_id.
# For fresh prefill requests (num_computed_tokens == 0) the tensor
Expand Down Expand Up @@ -134,8 +133,8 @@ def remove_request(self, req_id: str) -> bool:
self.free_indices.append(req_idx)
return True

def any_prefills(self, idx_mapping_np: np.ndarray) -> bool:
return np.any(
def is_prefilling(self, idx_mapping_np: np.ndarray) -> np.ndarray:
return (
self.num_computed_prefill_tokens[idx_mapping_np]
< self.prefill_len.np[idx_mapping_np]
)
Loading