diff --git a/vllm/v1/worker/gpu/model_runner.py b/vllm/v1/worker/gpu/model_runner.py index 7dcdaf1d2029..a3d387a69260 100644 --- a/vllm/v1/worker/gpu/model_runner.py +++ b/vllm/v1/worker/gpu/model_runner.py @@ -259,7 +259,7 @@ def load_model(self, *args, **kwargs) -> None: prepare_communication_buffer_for_model(self.model) if self.speculator is not None: - prepare_communication_buffer_for_model(self.speculator) + prepare_communication_buffer_for_model(self.speculator.model) # Initialize the components that require the model. self.model_state = ModelState(self.vllm_config, self.model, self.device) @@ -358,7 +358,7 @@ def _dummy_run( return None, None assert self.execute_model_state is not None - input_batch, _, _, _, hidden_states, _, _ = self.execute_model_state + input_batch, _, _, _, hidden_states, _, _, _ = self.execute_model_state self.execute_model_state = None assert hidden_states is not None # Last PP rank always has hidden_states sample_hidden_states = hidden_states[input_batch.logits_indices] @@ -974,6 +974,7 @@ def execute_model( hidden_states, aux_hidden_states, kv_connector_output, + num_tokens_across_dp, ) if not self.is_last_pp_rank: @@ -1000,6 +1001,7 @@ def sample_tokens( hidden_states, aux_hidden_states, kv_connector_output, + num_tokens_across_dp, ) = self.execute_model_state self.execute_model_state = None @@ -1073,6 +1075,7 @@ def sample_tokens( self.req_states.next_prefill_tokens, self.sampler.sampling_states.temperature.gpu, self.sampler.sampling_states.seeds.gpu, + num_tokens_across_dp, ) self.req_states.draft_tokens[input_batch.idx_mapping] = draft_tokens self.draft_tokens_handler.set_draft_tokens(input_batch, draft_tokens) diff --git a/vllm/v1/worker/gpu/spec_decode/eagle/speculator.py b/vllm/v1/worker/gpu/spec_decode/eagle/speculator.py index 0c85bf65ee9c..0e6854aefefb 100644 --- a/vllm/v1/worker/gpu/spec_decode/eagle/speculator.py +++ b/vllm/v1/worker/gpu/spec_decode/eagle/speculator.py @@ -16,6 +16,7 @@ build_slot_mappings_by_layer, ) from vllm.v1.worker.gpu.block_table import BlockTables +from vllm.v1.worker.gpu.dp_utils import make_num_tokens_across_dp from vllm.v1.worker.gpu.input_batch import InputBatch, InputBuffers from vllm.v1.worker.gpu.sample.gumbel import gumbel_sample from vllm.v1.worker.gpu.spec_decode.eagle.cudagraph import EagleCudaGraphManager @@ -200,6 +201,8 @@ def propose( temperature: torch.Tensor, # [max_num_reqs] seeds: torch.Tensor, + # [data_parallel_size] + num_tokens_across_dp: torch.Tensor | None, ) -> torch.Tensor: # NOTE(woosuk): To avoid CPU-GPU synchronization without CPU knowing the # number of rejected tokens, we maintain the size of eagle's input_ids and @@ -233,7 +236,7 @@ def propose( num_tokens, attn_metadata, slot_mappings, - num_tokens_across_dp=None, # FIXME + num_tokens_across_dp=num_tokens_across_dp, ) sample_hidden_states = last_hidden_states[last_token_indices] logits = self.model.compute_logits(sample_hidden_states) @@ -315,12 +318,16 @@ def propose( slot_mappings_by_layer = build_slot_mappings_by_layer( slot_mappings, self.kv_cache_config ) + num_decode_tokens_across_dp = make_num_tokens_across_dp( + self.vllm_config.parallel_config.data_parallel_size, + num_tokens_padded, + ) self.generate_draft( num_reqs, num_tokens_padded, attn_metadata, slot_mappings_by_layer, - num_tokens_across_dp=None, # FIXME + num_tokens_across_dp=num_decode_tokens_across_dp, cudagraph_runtime_mode=cudagraph_mode, ) return self.draft_tokens[:num_reqs]