diff --git a/.github/workflows/_e2e_test.yaml b/.github/workflows/_e2e_test.yaml index 3c125ad04a2..12662354fa9 100644 --- a/.github/workflows/_e2e_test.yaml +++ b/.github/workflows/_e2e_test.yaml @@ -19,7 +19,7 @@ on: continue_on_error: required: false type: boolean - default: false + default: true # The following inputs are used by comment-triggered E2E tests (/e2e ). # They carry space-separated pytest paths, categorized by runner type. # Leave empty (default) when running label-triggered full/light suites. diff --git a/.github/workflows/pr_test_full.yaml b/.github/workflows/pr_test_full.yaml index aa0b660d381..5b223facc34 100644 --- a/.github/workflows/pr_test_full.yaml +++ b/.github/workflows/pr_test_full.yaml @@ -102,7 +102,7 @@ jobs: strategy: fail-fast: false matrix: - vllm_version: [35141a7eeda941a60ad5a4956670c60fd5a77029] + vllm_version: [35141a7eeda941a60ad5a4956670c60fd5a77029, v0.18.0] needs: [parse-trigger] if: ${{ needs.parse-trigger.outputs.allowed == 'true' }} uses: ./.github/workflows/_e2e_test.yaml diff --git a/tests/ut/worker/test_block_table.py b/tests/ut/worker/test_block_table.py index ba378e9b1da..f031b3811b0 100644 --- a/tests/ut/worker/test_block_table.py +++ b/tests/ut/worker/test_block_table.py @@ -18,6 +18,8 @@ import numpy as np import torch + +# import vllm.utils.cpu_triton_utils as cpu_tl from vllm.distributed.parallel_state import GroupCoordinator from tests.ut.base import TestBase @@ -25,7 +27,7 @@ class TestBlockTableComputeSlotMapping(TestBase): """Test suite for BlockTable.compute_slot_mapping() method - + This test suite covers different configurations of DCP (Decode Context Parallelism), PCP (Prefill Context Parallelism), and cp_kv_cache_interleave_size to ensure correct slot_mapping calculation on different ranks. @@ -41,13 +43,13 @@ def setUp(self): self.device = torch.device("cpu") self.kernel_sizes = [128] - def create_block_table(self, dcp_world_size, dcp_rank, pcp_world_size, - pcp_rank, cp_kv_cache_interleave_size): + def create_block_table(self, dcp_world_size, dcp_rank, pcp_world_size, pcp_rank, cp_kv_cache_interleave_size): """Helper method to create BlockTable with mocked distributed groups""" - with patch('vllm_ascend.worker.block_table.get_dcp_group') as mock_get_dcp_group, \ - patch('vllm_ascend.worker.block_table.get_pcp_group') as mock_get_pcp_group: - + with ( + patch("vllm_ascend.worker.block_table.get_dcp_group") as mock_get_dcp_group, + patch("vllm_ascend.worker.block_table.get_pcp_group") as mock_get_pcp_group, + ): # Mock DCP group mock_dcp_group = MagicMock(spec=GroupCoordinator) mock_dcp_group.world_size = dcp_world_size @@ -71,7 +73,8 @@ def create_block_table(self, dcp_world_size, dcp_rank, pcp_world_size, device=self.device, kernel_sizes=self.kernel_sizes, cp_kv_cache_interleave_size=cp_kv_cache_interleave_size, - num_speculative_tokens=0) + num_speculative_tokens=0, + ) return block_table @@ -79,15 +82,12 @@ def setup_block_table_data(self, block_table, num_reqs=2): """Helper method to populate block table with test data""" # Add block IDs for each request for i in range(num_reqs): - block_ids = list(range(i * 4, - (i + 1) * 4)) # [0,1,2,3], [4,5,6,7], etc. + block_ids = list(range(i * 4, (i + 1) * 4)) # [0,1,2,3], [4,5,6,7], etc. 
block_table.add_row(block_ids, i) - def _test_slot_mapping_for_ranks(self, dcp_world_size, pcp_world_size, - cp_kv_cache_interleave_size, - test_configs): + def _test_slot_mapping_for_ranks(self, dcp_world_size, pcp_world_size, cp_kv_cache_interleave_size, test_configs): """Helper method to test slot_mapping across multiple ranks - + Args: dcp_world_size: Number of DCP ranks pcp_world_size: Number of PCP ranks @@ -97,31 +97,46 @@ def _test_slot_mapping_for_ranks(self, dcp_world_size, pcp_world_size, for dcp_rank, pcp_rank, req_indices, positions, expected_result in test_configs: with self.subTest(dcp_rank=dcp_rank, pcp_rank=pcp_rank): block_table = self.create_block_table( - dcp_world_size, dcp_rank, pcp_world_size, pcp_rank, - cp_kv_cache_interleave_size) + dcp_world_size, dcp_rank, pcp_world_size, pcp_rank, cp_kv_cache_interleave_size + ) num_reqs = max(req_indices) + 1 if len(req_indices) > 0 else 1 self.setup_block_table_data(block_table, num_reqs=num_reqs) - block_table.compute_slot_mapping(req_indices, positions) + # Build query_start_loc [num_reqs + 1] from req_indices. + # query_start_loc holds the cumulative token count per request, + # e.g. req_indices=[0,0,1,1] -> query_start_loc=[0,2,4]. + num_tokens = len(positions) + counts = np.bincount(req_indices, minlength=num_reqs) + query_start_loc_np = np.concatenate([[0], np.cumsum(counts)]).astype(np.int32) + query_start_loc = torch.from_numpy(query_start_loc_np) + + # positions must be a torch int64 tensor to match the + # _compute_slot_mapping_kernel's positions_ptr type. + positions_tensor = torch.from_numpy(positions.astype(np.int64)) + # block_table._compute_slot_mapping_kernel = cpu_tl.compute_slot_mapping_kernel + block_table.compute_slot_mapping(num_reqs, query_start_loc, positions_tensor) + + actual_result = block_table.slot_mapping.np[:num_tokens] - actual_result = block_table.slot_mapping.np[:len(positions)] np.testing.assert_array_equal( - actual_result, expected_result, + actual_result, + expected_result, f"DCP={dcp_world_size}, PCP={pcp_world_size}, " f"interleave={cp_kv_cache_interleave_size}, " - f"dcp_rank={dcp_rank}, pcp_rank={pcp_rank}") + f"dcp_rank={dcp_rank}, pcp_rank={pcp_rank}", + ) def test_compute_slot_mapping_dcp1_pcp1_interleave1(self): """Test compute_slot_mapping with DCP=1, PCP=1, interleave_size=1 - + With no parallelism (DCP=1, PCP=1), all tokens are local to the single rank. 
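        Illustrative only (not executed by this test): the helper above derives
        query_start_loc from req_indices with a bincount + cumsum, matching the
        example in its comment:

            >>> import numpy as np
            >>> req_indices = np.array([0, 0, 1, 1])
            >>> counts = np.bincount(req_indices, minlength=2)
            >>> np.concatenate([[0], np.cumsum(counts)]).astype(np.int32)
            array([0, 2, 4], dtype=int32)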
- + Setup: - Block size: 16 - Request 0 has blocks: [0, 1, 2, 3] - Request 1 has blocks: [4, 5, 6, 7] - + Test positions for each request: - Request 0, position 0: block_id=0, offset=0 → slot = 0*128+0 = 0 - Request 0, position 1: block_id=0, offset=1 → slot = 0*128+1 = 1 @@ -137,14 +152,13 @@ def test_compute_slot_mapping_dcp1_pcp1_interleave1(self): (0, 0, req_indices, positions, expected_result), ] - self._test_slot_mapping_for_ranks(dcp_world_size=1, - pcp_world_size=1, - cp_kv_cache_interleave_size=1, - test_configs=test_configs) + self._test_slot_mapping_for_ranks( + dcp_world_size=1, pcp_world_size=1, cp_kv_cache_interleave_size=1, test_configs=test_configs + ) def test_compute_slot_mapping_dcp4_pcp2_interleave1(self): """Test compute_slot_mapping with DCP=4, PCP=2, interleave_size=1 - + With interleave_size=1, tokens are distributed round-robin across all 8 ranks: - Position 0 → Rank 0 - Position 1 → Rank 1 @@ -183,28 +197,25 @@ def test_compute_slot_mapping_dcp4_pcp2_interleave1(self): for pcp_rank in range(2): for dcp_rank in range(4): current_rank = 4 * pcp_rank + dcp_rank - expected_result = np.array(rank_expectations[current_rank], - dtype=np.int32) - test_configs.append((dcp_rank, pcp_rank, req_indices, - positions, expected_result)) + expected_result = np.array(rank_expectations[current_rank], dtype=np.int32) + test_configs.append((dcp_rank, pcp_rank, req_indices, positions, expected_result)) - self._test_slot_mapping_for_ranks(dcp_world_size=4, - pcp_world_size=2, - cp_kv_cache_interleave_size=1, - test_configs=test_configs) + self._test_slot_mapping_for_ranks( + dcp_world_size=4, pcp_world_size=2, cp_kv_cache_interleave_size=1, test_configs=test_configs + ) def test_compute_slot_mapping_dcp4_pcp2_interleave128(self): """Test compute_slot_mapping with DCP=4, PCP=2, interleave_size=128 - + With interleave_size=128, tokens are distributed in chunks of 128 across ranks. Virtual block size = 16 * 4 * 2 = 128 - + Token distribution with interleave_size=128: - Positions 0-127 belong to rank 0 (first chunk of 128) - Positions 128-255 belong to rank 1 (second chunk of 128) - Positions 256-383 belong to rank 2 (third chunk of 128) - And so on... - + Using 130 positions ensures we test both rank 0 (positions 0-127) and rank 1 (positions 128-129). 
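        A small illustrative check of that distribution (assumed chunked
        round-robin over the 8 ranks; not executed by this test):

            >>> interleave, total_ranks = 128, 4 * 2
            >>> owner_rank = lambda pos: (pos // interleave) % total_ranks
            >>> [owner_rank(p) for p in (0, 127, 128, 129, 256)]
            [0, 0, 1, 1, 2]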
""" num_positions = 130 @@ -245,14 +256,13 @@ def test_compute_slot_mapping_dcp4_pcp2_interleave128(self): expected_result = [-1] * 130 test_configs.append( - (dcp_rank, pcp_rank, req_indices, positions, - np.array(expected_result, dtype=np.int32))) + (dcp_rank, pcp_rank, req_indices, positions, np.array(expected_result, dtype=np.int32)) + ) - self._test_slot_mapping_for_ranks(dcp_world_size=4, - pcp_world_size=2, - cp_kv_cache_interleave_size=128, - test_configs=test_configs) + self._test_slot_mapping_for_ranks( + dcp_world_size=4, pcp_world_size=2, cp_kv_cache_interleave_size=128, test_configs=test_configs + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/vllm_ascend/attention/attention_v1.py b/vllm_ascend/attention/attention_v1.py index 6443c5216e2..756827776b3 100644 --- a/vllm_ascend/attention/attention_v1.py +++ b/vllm_ascend/attention/attention_v1.py @@ -278,7 +278,10 @@ def build( ) block_table = common_attn_metadata.block_table_tensor - seq_lens = common_attn_metadata.seq_lens_cpu[:num_reqs] + if common_attn_metadata.seq_lens is not None: + seq_lens = common_attn_metadata.seq_lens[:num_reqs].to("cpu") + else: + seq_lens = common_attn_metadata.seq_lens_cpu[:num_reqs] slot_mapping = common_attn_metadata.slot_mapping[:num_actual_tokens] # this slot_mapping override doesn't work since vllm will override it again. We should fix it vllm. @@ -688,7 +691,26 @@ def full_graph_pa( graph_params.handles[num_tokens].append(handle) return output - def _get_fia_params(self, key: torch.Tensor, value: torch.Tensor, attn_metadata: AscendMetadata): + def _get_fia_params(self, key: torch.Tensor, value: torch.Tensor, attn_metadata: AscendMetadata, kv_cache=None): + # PrefillNoCache doesn't need key_cache, but other modes do + # Only initialize/require cache for modes that actually use it + if attn_metadata.attn_state != AscendAttentionState.PrefillNoCache: + # Initialize cache from kv_cache if not already set (for DecodeOnly mode) + if self.key_cache is None and kv_cache is not None: + if ( + isinstance(kv_cache, torch.Tensor) + and kv_cache.dim() > 0 + and kv_cache.shape[0] == 2 + or isinstance(kv_cache, (list, tuple)) + and len(kv_cache) >= 2 + ): + self.key_cache, self.value_cache = kv_cache[0], kv_cache[1] + + if self.key_cache is None: + raise RuntimeError( + f"key_cache is None in _get_fia_params for mode {attn_metadata.attn_state}. 
kv_cache={kv_cache}" + ) + if attn_metadata.attn_state == AscendAttentionState.PrefillNoCache: block_size = 128 block_table = None @@ -766,6 +788,7 @@ def forward_fused_infer_attention( value: torch.Tensor, attn_metadata: AscendMetadata, output: torch.Tensor, + kv_cache=None, ): # we inherit ForwardContext in model runner v2, when enable model # runner v2, there is not capturing attribute in forward_context, @@ -781,7 +804,9 @@ def forward_fused_infer_attention( and self.sinks is None ): return self._forward_fia_slidingwindow(query, attn_metadata, output) - key, value, block_size, block_table, actual_seq_lengths_kv = self._get_fia_params(key, value, attn_metadata) + key, value, block_size, block_table, actual_seq_lengths_kv = self._get_fia_params( + key, value, attn_metadata, kv_cache + ) num_tokens = attn_metadata.actual_seq_lengths_q[-1] query = query[:num_tokens] if ( @@ -927,7 +952,7 @@ def forward_impl( ): output = self.forward_paged_attention(query, attn_metadata, output) else: - output = self.forward_fused_infer_attention(query, key, value, attn_metadata, output) + output = self.forward_fused_infer_attention(query, key, value, attn_metadata, output, kv_cache) return output @@ -963,6 +988,20 @@ def forward( num_tokens = query.shape[0] if attn_metadata is None: return output.fill_(0) + + # Initialize key_cache and value_cache from kv_cache if not already set. + # This is needed for DecodeOnly mode where key/value are None but we still + # need access to the cache for attention computation. + if self.key_cache is None and kv_cache is not None: + if ( + isinstance(kv_cache, torch.Tensor) + and kv_cache.dim() > 0 + and kv_cache.shape[0] == 2 + or isinstance(kv_cache, (list, tuple)) + and len(kv_cache) >= 2 + ): + self.key_cache, self.value_cache = kv_cache[0], kv_cache[1] + output_padded = None if key is not None and value is not None: output_padded = output diff --git a/vllm_ascend/attention/mla_v1.py b/vllm_ascend/attention/mla_v1.py index 3b1c575b871..601160344a2 100644 --- a/vllm_ascend/attention/mla_v1.py +++ b/vllm_ascend/attention/mla_v1.py @@ -435,7 +435,11 @@ def build( query_seq_lens_cpu = query_start_loc_cpu[1:] - query_start_loc_cpu[:-1] self.query_lens = query_seq_lens_cpu[:num_reqs] - self.seq_lens = common_attn_metadata.seq_lens_cpu[:num_reqs] + self.seq_lens = None + if common_attn_metadata.seq_lens_cpu is not None: + self.seq_lens = common_attn_metadata.seq_lens_cpu[:num_reqs] + else: + self.seq_lens = common_attn_metadata.seq_lens[:num_reqs].to("cpu") self.graph_pad_size = common_attn_metadata.graph_pad_size block_table_size = self.get_block_table_size(common_attn_metadata, BUILD_METADATA_STEP_PREFILL) diff --git a/vllm_ascend/attention/sfa_v1.py b/vllm_ascend/attention/sfa_v1.py index 5ed5cac8782..8a0e3b90b5e 100644 --- a/vllm_ascend/attention/sfa_v1.py +++ b/vllm_ascend/attention/sfa_v1.py @@ -240,7 +240,12 @@ def build( cum_query_lens = common_attn_metadata.query_start_loc[1 : num_reqs + 1] seq_lens = common_attn_metadata.seq_lens[:num_reqs] - seq_lens_cpu = common_attn_metadata.seq_lens_cpu[:num_reqs] + + seq_lens_cpu = None + if common_attn_metadata.seq_lens_cpu is not None: + seq_lens_cpu = common_attn_metadata.seq_lens_cpu[:num_reqs] + else: + seq_lens_cpu = common_attn_metadata.seq_lens[:num_reqs].to("cpu") cos, sin = get_cos_and_sin_mla(input_positions, True) diff --git a/vllm_ascend/attention/utils.py b/vllm_ascend/attention/utils.py index 9926efbc901..f9ca09b9a12 100644 --- a/vllm_ascend/attention/utils.py +++ b/vllm_ascend/attention/utils.py @@ -175,8 
+175,10 @@ def unpadded(self, num_actual_tokens: int, num_actual_reqs: int) -> "AscendCommo query_start_loc=self.query_start_loc[: num_actual_reqs + 1], query_start_loc_cpu=self.query_start_loc_cpu[: num_actual_reqs + 1], seq_lens=self.seq_lens[:num_actual_reqs], - seq_lens_cpu=self.seq_lens_cpu[:num_actual_reqs], - num_computed_tokens_cpu=self.num_computed_tokens_cpu[:num_actual_reqs], + seq_lens_cpu=self.seq_lens_cpu[:num_actual_reqs] if self.seq_lens_cpu is not None else None, + num_computed_tokens_cpu=self.num_computed_tokens_cpu[:num_actual_reqs] + if self.num_computed_tokens_cpu is not None + else None, num_reqs=num_actual_reqs, num_actual_tokens=num_actual_tokens, max_query_len=self.max_query_len, diff --git a/vllm_ascend/batch_invariant.py b/vllm_ascend/batch_invariant.py index d1a42c383c6..666272610b5 100644 --- a/vllm_ascend/batch_invariant.py +++ b/vllm_ascend/batch_invariant.py @@ -28,6 +28,20 @@ torch_sum = torch.sum +def vllm_is_batch_invariant() -> bool: + """Check if batch-invariant mode is enabled. + + This is a compatibility wrapper for the vllm function that was removed + in recent upstream vLLM refactoring. + """ + # Try to access from envs module, fall back to environment variable + if hasattr(envs, "VLLM_BATCH_INVARIANT"): + return bool(envs.VLLM_BATCH_INVARIANT) + else: + # Fallback to environment variable for older vLLM versions + return bool(int(os.getenv("VLLM_BATCH_INVARIANT", "0"))) + + if HAS_TRITON: from vllm_ascend.ops.triton.batch_invariant.matmul import ( addmm_batch_invariant, diff --git a/vllm_ascend/spec_decode/eagle_proposer.py b/vllm_ascend/spec_decode/eagle_proposer.py index 9db0ae537e4..0d7fefb6a47 100644 --- a/vllm_ascend/spec_decode/eagle_proposer.py +++ b/vllm_ascend/spec_decode/eagle_proposer.py @@ -561,11 +561,14 @@ def _propose( common_attn_metadata.block_table_tensor = self._adjust_tensor( common_attn_metadata.block_table_tensor, num_reqs_padded ) - common_attn_metadata.seq_lens = self._adjust_tensor(common_attn_metadata.seq_lens, num_reqs_padded) - common_attn_metadata.seq_lens_cpu = self._adjust_tensor(common_attn_metadata.seq_lens_cpu, num_reqs_padded) - common_attn_metadata.num_computed_tokens_cpu = self._adjust_tensor( - common_attn_metadata.num_computed_tokens_cpu, num_reqs_padded + common_attn_metadata.seq_lens = self._adjust_tensor(self.runner.seq_lens, num_reqs_padded) + common_attn_metadata.seq_lens_cpu = self._adjust_tensor( + self.runner.optimistic_seq_lens_cpu, num_reqs_padded ) + if common_attn_metadata.num_computed_tokens_cpu is not None: + common_attn_metadata.num_computed_tokens_cpu = self._adjust_tensor( + common_attn_metadata.num_computed_tokens_cpu, num_reqs_padded + ) else: num_reqs_padded = common_attn_metadata.num_reqs @@ -1164,9 +1167,10 @@ def attn_update_stack_num_spec_norm( common_attn_metadata.seq_lens_cpu = self._adjust_tensor( common_attn_metadata.seq_lens_cpu, input_batch_size ) - common_attn_metadata.num_computed_tokens_cpu = self._adjust_tensor( - common_attn_metadata.num_computed_tokens_cpu, input_batch_size - ) + if common_attn_metadata.num_computed_tokens_cpu is not None: + common_attn_metadata.num_computed_tokens_cpu = self._adjust_tensor( + common_attn_metadata.num_computed_tokens_cpu, input_batch_size + ) common_attn_metadata.query_start_loc = self.arange[: input_batch_size + 1] common_attn_metadata.query_start_loc_cpu = torch.from_numpy( self.token_arange_np[: input_batch_size + 1] @@ -1193,8 +1197,10 @@ def attn_update_stack_num_spec_norm( # in the merged graph, it does not affect position 1 # 
FIXME(lilinsiman) common_attn_metadata.seq_lens = common_attn_metadata.seq_lens.clone() - common_attn_metadata.seq_lens_cpu = common_attn_metadata.seq_lens_cpu.clone() - common_attn_metadata.num_computed_tokens_cpu = common_attn_metadata.num_computed_tokens_cpu.clone() + if common_attn_metadata.seq_lens_cpu is not None: + common_attn_metadata.seq_lens_cpu = common_attn_metadata.seq_lens_cpu.clone() + if common_attn_metadata.num_computed_tokens_cpu is not None: + common_attn_metadata.num_computed_tokens_cpu = common_attn_metadata.num_computed_tokens_cpu.clone() common_attn_metadata.positions = common_attn_metadata.positions.clone() # NOTE(woosuk): We should handle the case where the draft model @@ -1222,11 +1228,12 @@ def attn_update_stack_num_spec_norm( # For the requests that exceed the max model length, we set the # sequence length to 1 to minimize their overheads in attention. common_attn_metadata.seq_lens[:batch_size].masked_fill_(exceeds_max_model_len, 1) - - common_attn_metadata.seq_lens_cpu[:batch_size] = common_attn_metadata.seq_lens_cpu[:batch_size] + 1 - exceeds_mask = common_attn_metadata.seq_lens_cpu[:batch_size] >= self.max_model_len - common_attn_metadata.seq_lens_cpu[:batch_size].masked_fill_(exceeds_mask, 1) - common_attn_metadata.num_computed_tokens_cpu[:batch_size] += 1 + if common_attn_metadata.seq_lens_cpu is not None: + common_attn_metadata.seq_lens_cpu[:batch_size] = common_attn_metadata.seq_lens_cpu[:batch_size] + 1 + exceeds_mask = common_attn_metadata.seq_lens_cpu[:batch_size] >= self.max_model_len + common_attn_metadata.seq_lens_cpu[:batch_size].masked_fill_(exceeds_mask, 1) + if common_attn_metadata.num_computed_tokens_cpu is not None: + common_attn_metadata.num_computed_tokens_cpu[:batch_size] += 1 if self.uses_mrope: common_attn_metadata.positions[:batch_size].copy_(clamped_positions[0]) else: @@ -1304,7 +1311,7 @@ def attn_update_stack_num_spec_norm( def prepare_next_token_ids_padded( self, - common_attn_metadata: CommonAttentionMetadata, + seq_lens_cpu: torch.Tensor, sampled_token_ids: torch.Tensor, requests: dict[str, CachedRequestState], gpu_input_batch: InputBatch, @@ -1324,11 +1331,9 @@ def prepare_next_token_ids_padded( # Precompute get_token_id for when there is no valid next token num_reqs = gpu_input_batch.num_reqs + seq_lens_list = seq_lens_cpu[:num_reqs].tolist() self.backup_next_token_ids.np[:num_reqs] = np.array( - [ - requests[gpu_input_batch.req_ids[i]].get_token_id(common_attn_metadata.seq_lens_cpu[i].item()) - for i in range(num_reqs) - ] + [requests[gpu_input_batch.req_ids[i]].get_token_id(seq_lens_list[i]) for i in range(num_reqs)] ) self.backup_next_token_ids.copy_to_gpu(num_reqs) diff --git a/vllm_ascend/spec_decode/utils.py b/vllm_ascend/spec_decode/utils.py new file mode 100644 index 00000000000..7f407cc2782 --- /dev/null +++ b/vllm_ascend/spec_decode/utils.py @@ -0,0 +1,31 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import torch + + +def update_num_computed_tokens_for_batch_change( + num_computed_tokens: torch.Tensor, + num_accepted_tokens: torch.Tensor, + prev_positions: torch.Tensor, + valid_sampled_token_count: torch.Tensor, + prev_num_draft_tokens: torch.Tensor, + cpu_num_computed_tokens: torch.Tensor, +) -> None: + """Correct num_computed_tokens for async spec decode drift. + + Requests that had drafts: corrected = prev_gpu + valid_count. + New requests or non-draft (e.g. prefills): use CPU value directly. 
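+
+    Example (illustrative values only): with prev_positions=[0, -1],
+    prev_num_draft_tokens=[2, 0], valid_sampled_token_count=[1, 1],
+    num_computed_tokens=[10, 0] and cpu_num_computed_tokens=[12, 7],
+    request 0 participated in drafting and is corrected to 10 + 1 = 11,
+    while request 1 is new and takes the CPU value 7 unchanged.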
+ """ + # Clamp because prev_positions can be -1 for new requests + gather_indices = prev_positions.clamp(min=0) + + valid_counts = valid_sampled_token_count[gather_indices] + prev_computed = num_computed_tokens[gather_indices] + prev_drafts = prev_num_draft_tokens[gather_indices] + + participating = (prev_positions >= 0) & (prev_drafts > 0) + corrected = prev_computed + valid_counts.int() + + n = prev_positions.shape[0] + num_computed_tokens[:n].copy_(torch.where(participating, corrected, cpu_num_computed_tokens)) + num_accepted_tokens.copy_(torch.where(participating, valid_counts, num_accepted_tokens)) diff --git a/vllm_ascend/worker/block_table.py b/vllm_ascend/worker/block_table.py index 3c812aa4432..f4bf6d4e344 100644 --- a/vllm_ascend/worker/block_table.py +++ b/vllm_ascend/worker/block_table.py @@ -2,7 +2,9 @@ import torch from vllm.distributed import get_dcp_group, get_pcp_group from vllm.utils.math_utils import cdiv +from vllm.v1.attention.backends.utils import PAD_SLOT_ID from vllm.v1.utils import CpuGpuBuffer +from vllm.v1.worker.block_table import _compute_slot_mapping_kernel from vllm.v1.worker.cp_utils import get_total_cp_world_size @@ -117,80 +119,34 @@ def swap_row(self, src: int, tgt: int) -> None: self.block_table.np[[src, tgt]] = self.block_table.np[[tgt, src]] - def compute_slot_mapping(self, req_indices: np.ndarray, positions: np.ndarray) -> None: - # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] - # -> [0, 0, K, K, K + 1, K + 1, K + 2, 2 * K, 2 * K, 2 * K + 1] - # where K is the max_num_blocks_per_req and the block size is 2. - # NOTE(woosuk): We can't simply use `token_indices // block_size` - # here because M (max_model_len) is not necessarily divisible by - # block_size. - - if self.dcp_world_size * self.pcp_world_size > 1: - # Note(hc): The DCP implement store kvcache with an interleave - # style, the kvcache for the token whose token_idx is i is - # always stored on the GPU whose dcp_rank equals i % pcp_world_size: - - # Use a "virtual block" which equals to world_size * block_size - # for block_table_indices calculation. - virtual_block_size = self.block_size * self.dcp_world_size * self.pcp_world_size - - # IMPORTANT: In hybrid mode, positions are in logical block space, - # but we need to map them to the correct logical block table indices - logical_block_idx = positions // virtual_block_size - - # Account for the expanded logical table - # (always needed with unified tensor) - # Each physical block is split into multiple logical blocks - # The logical table has been expanded to accommodate this - block_table_indices = ( - req_indices * self.max_num_blocks_per_req * self.blocks_per_phys_block + logical_block_idx - ) - - block_numbers = self.block_table.np.ravel()[block_table_indices] - # Use virtual_block_size for mask calculation, which marks local - # tokens. 
- virtual_block_offsets = positions % virtual_block_size - self.current_rank = self.dcp_world_size * self.pcp_rank + self.dcp_rank - mask = ( - virtual_block_offsets // self.cp_kv_cache_interleave_size % (self.dcp_world_size * self.pcp_world_size) - == self.current_rank - ) - # Calculate local block_offsets - block_offsets = ( - virtual_block_offsets - // (self.dcp_world_size * self.pcp_world_size * self.cp_kv_cache_interleave_size) - * self.cp_kv_cache_interleave_size - + virtual_block_offsets % self.cp_kv_cache_interleave_size - ) - # Calculate slot_mapping - slot_mapping = block_numbers * self.block_size + block_offsets - # Write final slots, use -1 for not-local - self.slot_mapping.np[: req_indices.shape[0]] = np.where(mask, slot_mapping, -1) - else: - assert self.kernel_sizes is not None - if self.block_size == self.kernel_sizes[0]: - # IMPORTANT: In hybrid mode, positions are in logical block space, - # but we need to map them to the correct logical block table indices - logical_block_idx = positions // self.block_size - - # Account for the expanded logical table - # (always needed with unified tensor) - # Each physical block is split into multiple logical blocks - # The logical table has been expanded to accommodate this - block_table_indices = ( - req_indices * self.max_num_blocks_per_req * self.blocks_per_phys_block + logical_block_idx - ) - - block_numbers = self.block_table.np.ravel()[block_table_indices] - block_offsets = positions % self.block_size - np.add(block_numbers * self.block_size, block_offsets, out=self.slot_mapping.np[: req_indices.shape[0]]) + def compute_slot_mapping( + self, + num_reqs: int, + query_start_loc: torch.Tensor, + positions: torch.Tensor, + ) -> None: + num_tokens = positions.shape[0] + total_cp_world_size = self.pcp_world_size * self.dcp_world_size + total_cp_rank = self.pcp_rank * self.dcp_world_size + self.dcp_rank + _compute_slot_mapping_kernel[(num_reqs + 1,)]( + num_tokens, + self.max_num_batched_tokens, + query_start_loc, + positions, + self.block_table.gpu, + self.block_table.gpu.stride(0), + self.block_size, + self.slot_mapping.gpu, + TOTAL_CP_WORLD_SIZE=total_cp_world_size, + TOTAL_CP_RANK=total_cp_rank, + CP_KV_CACHE_INTERLEAVE_SIZE=self.cp_kv_cache_interleave_size, + PAD_ID=PAD_SLOT_ID, + BLOCK_SIZE=1024, + ) def commit_block_table(self, num_reqs: int) -> None: self.block_table.copy_to_gpu(num_reqs) - def commit_slot_mapping(self, num_tokens: int) -> None: - self.slot_mapping.copy_to_gpu(num_tokens) - def clear(self) -> None: self.block_table.fill_(0) self.block_table.cpu.fill_(0) @@ -299,18 +255,19 @@ def swap_row(self, src: int, tgt: int) -> None: for block_table in self.block_tables: block_table.swap_row(src, tgt) - def compute_slot_mapping(self, req_indices: np.ndarray, positions: np.ndarray) -> None: + def compute_slot_mapping( + self, + num_reqs: int, + query_start_loc: torch.Tensor, + positions: torch.Tensor, + ) -> None: for block_table in self.block_tables: - block_table.compute_slot_mapping(req_indices, positions) + block_table.compute_slot_mapping(num_reqs, query_start_loc, positions) def commit_block_table(self, num_reqs: int) -> None: for block_table in self.block_tables: block_table.commit_block_table(num_reqs) - def commit_slot_mapping(self, num_tokens: int) -> None: - for block_table in self.block_tables: - block_table.commit_slot_mapping(num_tokens) - def clear(self) -> None: for block_table in self.block_tables: block_table.clear() diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py 
index c3f9615eb34..15018868a6f 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -118,6 +118,7 @@ from vllm_ascend.spec_decode.medusa_proposer import AscendMedusaProposer from vllm_ascend.spec_decode.ngram_proposer import AscendNgramProposer from vllm_ascend.spec_decode.suffix_proposer import AscendSuffixDecodingProposer +from vllm_ascend.spec_decode.utils import update_num_computed_tokens_for_batch_change from vllm_ascend.utils import ( calc_split_factor, check_gdn_layer, @@ -335,11 +336,8 @@ def __init__(self, vllm_config: VllmConfig, device: torch.device): ) # TODO(zhenwenqi) after https://github.com/vllm-project/vllm/pull/28988 is merged, we can delete this self.input_ids = self._make_buffer(max_buffer_num_tokens, dtype=torch.int32) - # Resize positions to accommodate PCP padding. Keep as plain GPU - # tensor (not CpuGpuBuffer) so upstream _preprocess can slice it. self.positions = torch.zeros( - max_buffer_num_tokens, dtype=torch.int64, device=self.device - ) + max_buffer_num_tokens, dtype=torch.int64, device=self.device) # Create a CPU numpy buffer for positions computation when # self.positions is a plain tensor (non-CpuGpuBuffer case). @@ -650,19 +648,31 @@ def _prepare_inputs( self.with_prefill = with_prefill # Get positions. - # Use query_pos.np as output buffer for _get_cumsum_and_arange to avoid - # corrupting self.arange_np (which is used as both read source and would - # be overwritten if used as arange_out, causing aliasing bugs). + cu_num_tokens = self._get_cumsum_and_arange( + num_scheduled_tokens, self.query_pos.np + ) positions_np = self._positions_np_buf[:total_num_scheduled_tokens] - cu_num_tokens = self._get_cumsum_and_arange(num_scheduled_tokens, self.query_pos.np) np.add( self.input_batch.num_computed_tokens_cpu[req_indices], self.query_pos.np[: cu_num_tokens[-1]], - out=positions_np + out=positions_np, ) - self.input_batch.block_table.compute_slot_mapping(req_indices, positions_np) - self.input_batch.block_table.commit_slot_mapping(total_num_scheduled_tokens) + # For PCP, compute slot_mapping on GPU using pre-PCP-split positions. + # Use blocking .to(device) to ensure data lands on GPU before PCP + # modifies CPU position buffers. PCP and async spec decode are + # mutually exclusive, so the sync is acceptable. + if self.pcp_size > 1: + pre_pcp_positions = torch.from_numpy( + positions_np[:total_num_scheduled_tokens] + ).to(self.device) + pre_pcp_qsl = torch.zeros( + num_reqs + 1, dtype=torch.int32, device=self.device) + pre_pcp_qsl[1:num_reqs + 1] = torch.from_numpy( + cu_num_tokens + ).to(dtype=torch.int32, device=self.device) + self.input_batch.block_table.compute_slot_mapping( + num_reqs, pre_pcp_qsl, pre_pcp_positions) if self.use_cp: self.pcp_manager.init_batch_info( @@ -780,22 +790,11 @@ def _prepare_inputs( self.gdn_query_start_loc.np[num_reqs + 1 :].fill(cu_num_tokens[-1]) self.gdn_query_start_loc.copy_to_gpu() - self.num_computed_tokens[:num_reqs].copy_( - self.input_batch.num_computed_tokens_cpu_tensor[:num_reqs], - non_blocking=True, - ) - - self.num_scheduled_tokens.np[:num_reqs] = num_scheduled_tokens - self.num_scheduled_tokens.copy_to_gpu(num_reqs) - num_scheduled_tokens_gpu = self.num_scheduled_tokens.gpu[:num_reqs] - self.seq_lens[:num_reqs] = ( - self.num_computed_tokens[:num_reqs] + num_scheduled_tokens_gpu - ) - self.seq_lens[num_reqs:].fill_(0) - - # Compute optimistic seq_lens on CPU (assumes all draft tokens from - # previous iteration accepted). Used by _pool() and discard logic. 
+ # Compute optimistic seq_lens (assumes all draft tokens from previous + # iteration accepted). Store in optimistic_seq_lens_cpu for use by + # _build_attention_metadata (max_seq_len) and discard_request_mask. + # seq_lens (GPU) will be computed later using the same optimistic values. torch.add( self.input_batch.num_computed_tokens_cpu_tensor[:num_reqs], torch.from_numpy(num_scheduled_tokens), @@ -803,12 +802,14 @@ def _prepare_inputs( ) self.optimistic_seq_lens_cpu[num_reqs:].fill_(0) + # Build prev_positions mapping: current pos -> prev pos (-1 if new). + # Used for gathering from previous iteration's GPU tensors. + prev_req_id_to_index = self.input_batch.prev_req_id_to_index + self._compute_prev_positions(num_reqs) + # Fill unused with -1. Needed for reshape_and_cache in attention_cp self.query_start_loc.gpu[num_reqs + 1 :].fill_(-1) - # Build prev_positions mapping for async scheduling input_ids handling. - self._compute_prev_positions(num_reqs) - # Copy the tensors to the NPU. self._prepare_input_ids(scheduler_output, num_reqs, total_num_scheduled_tokens, cu_num_tokens) # Calculate M-RoPE positions. @@ -827,12 +828,6 @@ def _prepare_inputs( self.xdrope_positions.cpu[:, :total_num_scheduled_tokens], non_blocking=True, ) - else: - # Common case (1D positions) - self.positions[:total_num_scheduled_tokens].copy_( - self._positions_cpu_buf[:total_num_scheduled_tokens], - non_blocking=True, - ) # Record the index of requests that should not be sampled, # so that we could clear the sampled tokens before returning @@ -850,12 +845,102 @@ def _prepare_inputs( ) discard_requests_mask = original_seq_lens_np < num_tokens_np else: - discard_requests_mask = self.optimistic_seq_lens_cpu.numpy()[:num_reqs] < num_tokens_np + discard_requests_mask = self.optimistic_seq_lens_cpu[:num_reqs].numpy() < num_tokens_np discard_request_indices = np.nonzero(discard_requests_mask)[0] self.num_discarded_requests = len(discard_request_indices) self.discard_request_indices.np[: self.num_discarded_requests] = discard_request_indices self.discard_request_indices.copy_to_gpu(self.num_discarded_requests) + + # Sync num_accepted_tokens from CPU (set by + # _update_states_after_model_execute for hybrid models). + if self.num_accepted_tokens_event is not None: + self.num_accepted_tokens_event.synchronize() + self.num_accepted_tokens.np[:num_reqs] = ( + self.input_batch.num_accepted_tokens_cpu[:num_reqs] + ) + self.num_accepted_tokens.np[num_reqs:].fill(1) + self.num_accepted_tokens.copy_to_gpu() + else: + self.num_accepted_tokens.np.fill(1) + self.num_accepted_tokens.gpu.fill_(1) + + # Update num_computed_tokens on GPU. In async spec decode, + # CPU values are optimistic (all drafts accepted). The kernel + # corrects on GPU using the previous step's + # valid_sampled_token_count_gpu. Otherwise, just copy from CPU. 
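+        # Illustrative: if a request had 2 draft tokens but only 1 was
+        # accepted, the optimistic CPU count (which assumed both were
+        # accepted) overshoots by 1; update_num_computed_tokens_for_batch_change
+        # below rebuilds the count from the previous step's GPU-resident
+        # values instead of copying the stale CPU number.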
+ if ( + self.use_async_spec_decode + and self.valid_sampled_token_count_gpu is not None + and prev_req_id_to_index + ): + self.prev_positions.copy_to_gpu(num_reqs) + self.prev_num_draft_tokens.copy_to_gpu() + cpu_values = self.input_batch.num_computed_tokens_cpu_tensor[:num_reqs].to( + device=self.device, non_blocking=True + ) + update_num_computed_tokens_for_batch_change( + self.num_computed_tokens, + self.num_accepted_tokens.gpu[:num_reqs], + self.prev_positions.gpu[:num_reqs], + self.valid_sampled_token_count_gpu, + self.prev_num_draft_tokens.gpu, + cpu_values, + ) + else: + self.num_computed_tokens[:num_reqs].copy_( + self.input_batch.num_computed_tokens_cpu_tensor[:num_reqs], + non_blocking=True, + ) + + self.req_indices.np[:total_num_scheduled_tokens] = req_indices + self.req_indices.copy_to_gpu(total_num_scheduled_tokens) + req_indices_gpu = self.req_indices.gpu[:total_num_scheduled_tokens] + + self.query_pos.copy_to_gpu(total_num_scheduled_tokens) + self.num_scheduled_tokens.np[:num_reqs] = num_scheduled_tokens + self.num_scheduled_tokens.copy_to_gpu(num_reqs) + num_scheduled_tokens_gpu = self.num_scheduled_tokens.gpu[:num_reqs] + # fix prefix cache ci test + if self.pcp_size > 1: + # When PCP (Prefill Context Parallel) is enabled, positions use + # special PCP offsets (position_pcp) that are only computed on CPU. + # Copy the correctly-computed CPU positions to GPU instead of + # recomputing on GPU (which would miss the PCP offsets). + self.positions[:total_num_scheduled_tokens].copy_( + torch.from_numpy( + positions_np[:total_num_scheduled_tokens] + ).to(self.device), + non_blocking=True, + ) + else: + self.positions[:total_num_scheduled_tokens] = ( + self.num_computed_tokens[req_indices_gpu].to(torch.int64) + + self.query_pos.gpu[:total_num_scheduled_tokens] + ) + self.seq_lens[:num_reqs] = ( + self.num_computed_tokens[:num_reqs] + num_scheduled_tokens_gpu + ) + self.seq_lens[num_reqs:].fill_(0) + + # For non-PCP, compute slot_mapping on GPU. PCP slot_mapping was + # already computed on GPU before PCP split the positions. + if self.pcp_size <= 1: + self.input_batch.block_table.compute_slot_mapping( + num_reqs, + self.query_start_loc.gpu[: num_reqs + 1], + self.positions[:total_num_scheduled_tokens], + ) + + if self.use_async_spec_decode and (self.uses_mrope or self.uses_xdrope_dim > 0): + drift = self.num_computed_tokens[req_indices_gpu].to( + torch.int64 + ) - self.input_batch.num_computed_tokens_cpu_tensor[req_indices].to( + device=self.device, dtype=torch.int64, non_blocking=True + ) + target = self.mrope_positions if self.uses_mrope else self.xdrope_positions + target.gpu[:, :total_num_scheduled_tokens] += drift + use_spec_decode = len(scheduler_output.scheduled_spec_decode_tokens) > 0 if not use_spec_decode: # NOTE(woosuk): Due to chunked prefills, the batch may contain @@ -885,11 +970,12 @@ def _prepare_inputs( draft_token_ids, ) in scheduler_output.scheduled_spec_decode_tokens.items(): req_idx = self.input_batch.req_id_to_index[req_id] - num_draft_tokens[req_idx] = len(draft_token_ids) + draft_len = len(draft_token_ids) + num_draft_tokens[req_idx] = draft_len if (self.is_kv_consumer and req_id in new_schedule_reqs) or \ (self.input_batch.num_computed_tokens_cpu[req_idx] >= \ self.input_batch.num_prompt_tokens[req_idx]): - num_decode_draft_tokens[req_idx] = len(draft_token_ids) + num_decode_draft_tokens[req_idx] = draft_len else: num_decode_draft_tokens[req_idx] = -1 @@ -972,24 +1058,23 @@ def _calc_spec_decode_metadata( # Compute the logits indices. 
# [4, 1, 3, 1, 2] num_sampled_tokens = num_draft_tokens + 1 - # Step 1. [4, 5, 8, 9, 11] - cu_num_sampled_tokens = np.cumsum(num_sampled_tokens, dtype=np.int32) - total_num_sampled_tokens = cu_num_sampled_tokens[-1] - # Step 2. [0, 0, 0, 0, 4, 5, 5, 5, 8, 9, 9] - cumsums_offsets = np.repeat(cu_num_sampled_tokens - num_sampled_tokens, num_sampled_tokens) - # Step 3. [0, 1, 2, 3, 0, 0, 1, 2, 0, 0, 1] - arange = self.arange_np[:total_num_sampled_tokens] - cumsums_offsets - # Step 4. [0, 0, 0, 0, 103, 104, 104, 104, 206, 207, 207] + # Step 1. + # cu_num_sampled_tokens: [4, 5, 8, 9, 11] + # _arange_scratch[:11]: [0, 1, 2, 3, 0, 0, 1, 2, 0, 0, 1] + cu_num_sampled_tokens = self._get_cumsum_and_arange( + num_sampled_tokens, self._arange_scratch, cumsum_dtype=np.int32 + ) + # Step 2. [0, 0, 0, 0, 103, 104, 104, 104, 206, 207, 207] logits_indices = np.repeat(cu_num_scheduled_tokens - num_sampled_tokens, num_sampled_tokens) - # Step 5. [0, 1, 2, 3, 103, 104, 105, 106, 206, 207, 208] - logits_indices += arange + # Step 3. [0, 1, 2, 3, 103, 104, 105, 106, 206, 207, 208] + logits_indices += self._arange_scratch[: cu_num_sampled_tokens[-1]] # while pcp > 1, decode results may contain padding (from pcp all-gather), # update logits_indices after getting draft_token_ids from ori logits_indices if self.pcp_size > 1: cu_num_scheduled_tokens = cu_num_scheduled_tokens * self.pcp_size - num_pcp_pads logits_indices_pcp = np.repeat(cu_num_scheduled_tokens - num_sampled_tokens, num_sampled_tokens) - logits_indices_pcp += arange + logits_indices_pcp += self._arange_scratch[: cu_num_sampled_tokens[-1]] logits_indices_pcp = torch.from_numpy(logits_indices_pcp).pin_memory().to(self.device, non_blocking=True) # Compute the bonus logits indices. @@ -1080,7 +1165,7 @@ def propose_draft_token_ids( ) assert self.drafter is not None next_token_ids, valid_sampled_tokens_count = self.drafter.prepare_next_token_ids_padded( - common_attn_metadata, + self.optimistic_seq_lens_cpu, sampled_token_ids, self.requests, self.input_batch, @@ -1209,7 +1294,7 @@ def execute_model( with record_function_or_nullcontext("prepare input"): with self.synchronize_input_prep(): # Update persistent batch states. - self._update_states(scheduler_output) + deferred_state_corrections_fn = self._update_states(scheduler_output) if has_ec_transfer() and get_ec_transfer().is_producer: with self.maybe_get_ec_connector_output( @@ -1312,6 +1397,12 @@ def execute_model( # '_update_states_after_model_execute', which is not overridden in vLLM-Ascend. # We simply utilize the implementation in vLLM. if self.cache_config.mamba_cache_mode == "align": + # preprocess_mamba reads req_state.num_computed_tokens (CPU) + # to decide copy operations, so we must apply deferred + # corrections before it runs. + if deferred_state_corrections_fn: + deferred_state_corrections_fn() + deferred_state_corrections_fn = None mamba_utils.preprocess_mamba( scheduler_output, self.kv_cache_config, @@ -1323,6 +1414,14 @@ def execute_model( self.model.get_mamba_state_copy_func(), self._get_mamba_copy_bufs(), ) + # preprocess_mamba resets num_accepted_tokens_cpu to 1 + # for requests whose state was copied to a new block. + # Re-sync to GPU so the mamba kernel reads from the + # correct initial state slot (init_token_idx = 0). 
+ self.num_accepted_tokens.np[:num_reqs] = ( + self.input_batch.num_accepted_tokens_cpu[:num_reqs] + ) + self.num_accepted_tokens.copy_to_gpu(num_reqs) use_spec_decode = len(scheduler_output.scheduled_spec_decode_tokens) > 0 ubatch_slices_attn = ubatch_slices_padded if pad_attn else ubatch_slices @@ -1510,6 +1609,11 @@ def execute_model( batch_desc, ) self.kv_connector_output = kv_connector_output + + # Now the batch has been launched we can wait for corrections from the + # previous model forward without breaking async scheduling. + if deferred_state_corrections_fn: + deferred_state_corrections_fn() return None @torch.inference_mode() @@ -1574,6 +1678,8 @@ def sample_tokens( assert self.sampling_done_event is not None self.sampling_done_event.record() + self.valid_sampled_token_count_gpu: torch.Tensor | None = None + def propose_draft_token_ids(sampled_token_ids): assert spec_decode_common_attn_metadata is not None self._draft_token_ids = self.propose_draft_token_ids( @@ -2088,11 +2194,8 @@ def _build_attention_metadata( # window size when capturing to make sure the correct kernel is selected. max_seq_len = self.max_model_len else: - max_seq_len = self.optimistic_seq_lens_cpu[:num_reqs].max().item() - if use_spec_decode and self.need_accepted_tokens: - self.num_accepted_tokens.np[:num_reqs] = self.input_batch.num_accepted_tokens_cpu[:num_reqs] - self.num_accepted_tokens.np[num_reqs:].fill(1) - self.num_accepted_tokens.copy_to_gpu() + max_seq_len = self.optimistic_seq_lens_cpu.numpy()[:num_reqs].max().item() + kv_cache_groups = self.kv_cache_config.kv_cache_groups @@ -2157,15 +2260,24 @@ def _get_block_table_and_slot_mapping(kv_cache_gid: int): block_table_gid_0, slot_mapping_gid_0 = _get_block_table_and_slot_mapping(0) self.long_seq_metadata, block_table_gid_0 = _get_pcp_metadata(block_table_gid_0) + num_computed_tokens_cpu = self.input_batch.num_computed_tokens_cpu_tensor[ + :num_reqs_padded + ] + seq_lens_cpu = self.optimistic_seq_lens_cpu[:num_reqs_padded] + if self.use_async_spec_decode: + # GPU tensors are authoritative in async mode. 
+ seq_lens_cpu = None + num_computed_tokens_cpu = None cm_base = AscendCommonAttentionMetadata( query_start_loc=self.query_start_loc.gpu[: num_reqs_padded + 1], query_start_loc_cpu=self.query_start_loc.cpu[: num_reqs_padded + 1], seq_lens=self.seq_lens[:num_reqs_padded], # TODO - seq_lens_cpu=self.optimistic_seq_lens_cpu[:num_reqs_padded], + seq_lens_cpu=seq_lens_cpu, # TODO - num_computed_tokens_cpu=self.input_batch.num_computed_tokens_cpu_tensor[:num_reqs_padded], + # num_computed_tokens_cpu=self.input_batch.num_computed_tokens_cpu_tensor[:num_reqs_padded], + num_computed_tokens_cpu=num_computed_tokens_cpu, num_reqs=num_reqs_padded, num_actual_tokens=num_tokens, max_query_len=max_query_len, @@ -2439,14 +2551,13 @@ def _dummy_run( if is_graph_capturing and using_paged_attention(num_tokens, self.vllm_config) else max_query_len ) # type: ignore[assignment] - self.seq_lens[:num_reqs_padded] = torch.tensor( - seq_lens, - dtype=self.seq_lens.dtype, - device=self.seq_lens.device, - ) - self.seq_lens[num_reqs_padded:] = 0 - cum_num_tokens = self._get_cumsum_and_arange(num_scheduled_tokens, self.query_pos.np) + self.optimistic_seq_lens_cpu[:num_reqs] = seq_lens + self.optimistic_seq_lens_cpu[num_reqs:].fill_(0) + self.seq_lens.copy_(self.optimistic_seq_lens_cpu, non_blocking=True) + + cum_num_tokens = self._get_cumsum_and_arange( + num_scheduled_tokens, self.query_pos.np) self.query_start_loc.np[1 : num_reqs_padded + 1] = cum_num_tokens self.query_start_loc.copy_to_gpu() num_reqs_padded = self._pad_query_start_loc_for_fia( diff --git a/vllm_ascend/worker/npu_input_batch.py b/vllm_ascend/worker/npu_input_batch.py index a48ea5efbb6..98dfafa92b5 100644 --- a/vllm_ascend/worker/npu_input_batch.py +++ b/vllm_ascend/worker/npu_input_batch.py @@ -164,7 +164,7 @@ def __init__( # Speculative decoding self.num_accepted_tokens_cpu_tensor = torch.ones( - (max_num_reqs,), dtype=torch.int64, device="cpu", pin_memory=pin_memory + (max_num_reqs,), dtype=torch.int32, device="cpu", pin_memory=pin_memory ) self.num_accepted_tokens_cpu = self.num_accepted_tokens_cpu_tensor.numpy()
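# A minimal sketch of the CPU-fallback pattern the attention metadata builders
# above now follow, since seq_lens_cpu may be None when the GPU tensors are
# authoritative (async spec decode). The helper name resolve_seq_lens_cpu is
# hypothetical, for illustration only.
import torch


def resolve_seq_lens_cpu(seq_lens: torch.Tensor, seq_lens_cpu: torch.Tensor | None, num_reqs: int) -> torch.Tensor:
    # Prefer the precomputed CPU copy when it exists; otherwise fall back to a
    # blocking device-to-host copy of the first num_reqs entries on the GPU.
    if seq_lens_cpu is not None:
        return seq_lens_cpu[:num_reqs]
    return seq_lens[:num_reqs].to("cpu")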