diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index 0cb65bd0f779..50cffdaf16e2 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -1197,28 +1197,7 @@ def update_from_output( routed_experts = None if stopped: - if self.vllm_config.model_config.enable_return_routed_experts: - kv_blocks = self.kv_cache_manager.get_blocks(request.request_id) - block_ids = kv_blocks.get_block_ids()[0] - num_tokens = request.num_tokens - 1 - - # compute slot mapping - block_ids_array = np.array(block_ids, dtype=np.int32) - num_blocks = len(block_ids) - block_size = self.block_size - - # generate block offsets - block_offsets = np.arange(0, block_size) - - # compute slot mapping: slot = block_id * block_size + offset - slot_mapping = ( - block_offsets.reshape((1, block_size)) - + block_ids_array.reshape((num_blocks, 1)) * block_size - ).flatten()[:num_tokens] - - routed_experts = self.routed_experts_reader.get_routed_experts( - indices=slot_mapping - ) + routed_experts = self._get_routed_experts(request) kv_transfer_params = self._free_request(request) if status_before_stop == RequestStatus.RUNNING: stopped_running_reqs.add(request) @@ -1250,7 +1229,12 @@ def update_from_output( # Get prompt logprobs for this request. prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id) - if new_token_ids or pooler_output is not None or kv_transfer_params: + if ( + new_token_ids + or pooler_output is not None + or kv_transfer_params + or stopped + ): # Add EngineCoreOutput for this Request. outputs[request.client_index].append( EngineCoreOutput( @@ -1351,6 +1335,30 @@ def update_from_output( return engine_core_outputs + def _get_routed_experts(self, request: Request) -> np.ndarray | None: + if not self.vllm_config.model_config.enable_return_routed_experts: + return None + + kv_blocks = self.kv_cache_manager.get_blocks(request.request_id) + block_ids = kv_blocks.get_block_ids()[0] + num_tokens = request.num_tokens - 1 + + # compute slot mapping + block_ids_array = np.array(block_ids, dtype=np.int32) + num_blocks = len(block_ids) + block_size = self.block_size + + # generate block offsets + block_offsets = np.arange(0, block_size) + + # compute slot mapping: slot = block_id * block_size + offset + slot_mapping = ( + block_offsets.reshape((1, block_size)) + + block_ids_array.reshape((num_blocks, 1)) * block_size + ).flatten()[:num_tokens] + + return self.routed_experts_reader.get_routed_experts(indices=slot_mapping) + def _update_request_with_output( self, request: Request, new_token_ids: list[int] ) -> tuple[list[int], bool]: