diff --git a/vllm/v1/worker/gpu/model_runner.py b/vllm/v1/worker/gpu/model_runner.py index b81bd0dc59fc..6485444e6a2f 100644 --- a/vllm/v1/worker/gpu/model_runner.py +++ b/vllm/v1/worker/gpu/model_runner.py @@ -712,15 +712,26 @@ def add_requests(self, scheduler_output: SchedulerOutput) -> None: self.sampler.apply_staged_writes() def update_requests(self, scheduler_output: SchedulerOutput) -> None: - # Add new blocks for the existing requests. + # Add new blocks and update num_computed_tokens for the existing requests. reqs = scheduler_output.scheduled_cached_reqs - for req_new_block_ids, req_id in zip(reqs.new_block_ids, reqs.req_ids): + num_computed_tokens_np = self.req_states.num_computed_tokens_np + for req_id, num_computed_tokens, req_new_block_ids in zip( + reqs.req_ids, reqs.num_computed_tokens, reqs.new_block_ids + ): + req_index = self.req_states.req_id_to_index[req_id] + num_computed_tokens_np[req_index] = num_computed_tokens if req_new_block_ids is not None: - req_index = self.req_states.req_id_to_index[req_id] self.block_tables.append_block_ids( req_index, req_new_block_ids, overwrite=False ) + # Update num_computed_prefill_tokens. + np.minimum( + self.req_states.num_computed_tokens_np, + self.req_states.prefill_len.np, + out=self.req_states.num_computed_prefill_tokens, + ) + def prepare_inputs( self, scheduler_output: SchedulerOutput, batch_desc: BatchExecutionDescriptor ) -> InputBatch: @@ -787,10 +798,7 @@ def prepare_inputs( async_copy_to_gpu(query_start_loc_np, out=self.input_buffers.query_start_loc) query_start_loc_np = query_start_loc_np[: num_reqs_padded + 1] query_start_loc = self.input_buffers.query_start_loc[: num_reqs_padded + 1] - is_prefilling_np = ( - self.req_states.num_computed_prefill_tokens[idx_mapping_np] - < self.req_states.prefill_len.np[idx_mapping_np] - ) + is_prefilling_np = self.req_states.is_prefilling(idx_mapping_np) # Get prefill tokens if any. if np.any(is_prefilling_np): @@ -973,18 +981,6 @@ def postprocess( self.req_states.total_len.gpu, ) - # Update the number of computed prefill tokens. - idx_mapping_np = input_batch.idx_mapping_np - computed_prefill = self.req_states.num_computed_prefill_tokens - computed_prefill[idx_mapping_np] += input_batch.num_scheduled_tokens - np.minimum( - computed_prefill, self.req_states.prefill_len.np, out=computed_prefill - ) - # Advance the CPU mirror optimistically (assume all scheduled accepted). - self.req_states.num_computed_tokens_np[idx_mapping_np] += ( - input_batch.num_scheduled_tokens - ) - self.model_state.postprocess_state(input_batch, num_sampled) @torch.inference_mode() @@ -1357,18 +1353,6 @@ def postprocess_pool(self, input_batch: InputBatch) -> None: input_batch.query_start_loc, ) - # Update the number of computed prefill tokens. - idx_mapping_np = input_batch.idx_mapping_np - computed_prefill = self.req_states.num_computed_prefill_tokens - computed_prefill[idx_mapping_np] += input_batch.num_scheduled_tokens - np.minimum( - computed_prefill, self.req_states.prefill_len.np, out=computed_prefill - ) - # Advance the CPU mirror optimistically (assume all scheduled accepted). - self.req_states.num_computed_tokens_np[idx_mapping_np] += ( - input_batch.num_scheduled_tokens - ) - def shutdown(self) -> None: """Release GPU tensors (model weights, KV caches, workspace) so that memory is reclaimable when running in the same process.""" diff --git a/vllm/v1/worker/gpu/states.py b/vllm/v1/worker/gpu/states.py index 6268ea0ba673..cdd7286fa56e 100644 --- a/vllm/v1/worker/gpu/states.py +++ b/vllm/v1/worker/gpu/states.py @@ -104,9 +104,8 @@ def add_request( self.num_computed_prefill_tokens[req_idx] = num_computed_tokens self.num_computed_tokens_np[req_idx] = num_computed_tokens self.num_computed_tokens.stage_write_elem(req_idx, num_computed_tokens) - self.num_computed_tokens_np[req_idx] = num_computed_tokens - if num_computed_tokens > 0 and num_computed_tokens <= prefill_len: + if 0 < num_computed_tokens <= prefill_len: # For PD disagg or resumed requests: set last_sampled to the last # computed token so the first decode step gets the right input_id. # For fresh prefill requests (num_computed_tokens == 0) the tensor @@ -134,8 +133,8 @@ def remove_request(self, req_id: str) -> bool: self.free_indices.append(req_idx) return True - def any_prefills(self, idx_mapping_np: np.ndarray) -> bool: - return np.any( + def is_prefilling(self, idx_mapping_np: np.ndarray) -> np.ndarray: + return ( self.num_computed_prefill_tokens[idx_mapping_np] < self.prefill_len.np[idx_mapping_np] )