diff --git a/.github/workflows/create-release-branch.yaml b/.github/workflows/create-release-branch.yaml index 8f0ce9a2da..9ffc000880 100644 --- a/.github/workflows/create-release-branch.yaml +++ b/.github/workflows/create-release-branch.yaml @@ -164,6 +164,7 @@ jobs: needs: [prepare-release-branch, setup_and_build, discover_runner] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow steps: - name: Run pytest in tests/unit_tests run: | @@ -216,6 +217,7 @@ jobs: needs: [prepare-release-branch, setup_and_build, discover_tests, discover_runner] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow strategy: fail-fast: false matrix: @@ -248,6 +250,7 @@ jobs: needs: [prepare-release-branch, setup_and_build, discover_runner] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow steps: - name: Run Data Parallel test run: | @@ -275,6 +278,7 @@ jobs: needs: [prepare-release-branch, setup_and_build, discover_runner] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow steps: - name: Run PD disaggregate test run: | @@ -305,6 +309,7 @@ jobs: needs: [prepare-release-branch, setup_and_build, discover_runner] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow steps: - name: Run Sharegpt performance tests with warmup run: | diff --git a/.github/workflows/hourly-ci.yaml b/.github/workflows/hourly-ci.yaml index 659221c336..dd5c8a65cc 100644 --- a/.github/workflows/hourly-ci.yaml +++ b/.github/workflows/hourly-ci.yaml @@ -101,6 +101,7 @@ jobs: needs: [setup_and_build, discover_runner] # <-- UPDATED: Runs on the specific runner runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow steps: - name: Run pytest in tests/unit_tests run: | @@ -157,6 +158,7 @@ jobs: needs: [setup_and_build, discover_tests, discover_runner] # <-- UPDATED: Runs on the specific runner runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow strategy: fail-fast: false matrix: @@ -192,6 +194,7 @@ jobs: needs: [setup_and_build, discover_runner] # <-- UPDATED: Runs on the specific runner runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow steps: - name: Run Data Parallel test run: | @@ -220,6 +223,7 @@ jobs: needs: [setup_and_build, discover_runner] # <-- UPDATED: Runs on the specific runner runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow steps: - name: Run PD disaggregate test run: | diff --git a/.github/workflows/pre-merge-trigger.yaml b/.github/workflows/pre-merge-trigger.yaml index 8794b31847..0c7f9a1c26 100644 --- a/.github/workflows/pre-merge-trigger.yaml +++ b/.github/workflows/pre-merge-trigger.yaml @@ -17,8 +17,14 @@ concurrency: cancel-in-progress: true jobs: + gate: + runs-on: ubuntu-latest + environment: pre-merge-approval + steps: + - run: echo "Approved" execute_pre_merge: runs-on: ubuntu-latest + needs: gate timeout-minutes: 720 permissions: actions: write # dispatch workflows, read run status, cancel orphaned runs diff --git a/.github/workflows/pre-merge.yaml b/.github/workflows/pre-merge.yaml index 64336abd07..687c8ea7db 100644 --- a/.github/workflows/pre-merge.yaml +++ b/.github/workflows/pre-merge.yaml @@ -29,6 +29,7 @@ concurrency: jobs: retrieve_head_sha: runs-on: ubuntu-latest + timeout-minutes: 720 outputs: head_sha: ${{ steps.set_sha.outputs.head_sha }} steps: @@ -40,6 +41,7 @@ jobs: gatekeeper: needs: retrieve_head_sha runs-on: ubuntu-latest + timeout-minutes: 720 permissions: # Required to read the status of checks and PR details checks: read @@ -136,6 +138,7 @@ jobs: discover_runner: needs: gatekeeper runs-on: ${{ inputs.use_hourly_runner == 'true' && 'hourly-ci' || 'pr-ci' }} + timeout-minutes: 720 outputs: runner_name: ${{ steps.get_name.outputs.name }} steps: @@ -150,6 +153,7 @@ jobs: needs: [discover_runner, retrieve_head_sha] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + timeout-minutes: 720 outputs: matrix: ${{ steps.set-matrix.outputs.matrix }} steps: @@ -180,6 +184,7 @@ jobs: discover_calibration_tests: needs: [discover_runner, retrieve_head_sha] runs-on: ${{ needs.discover_runner.outputs.runner_name }} + timeout-minutes: 720 outputs: matrix: ${{ steps.set-matrix.outputs.matrix }} steps: @@ -207,6 +212,7 @@ jobs: # This job runs in parallel with the build job needs: [gatekeeper, retrieve_head_sha] runs-on: ubuntu-latest + timeout-minutes: 720 steps: - name: Checkout repository uses: actions/checkout@v4 @@ -235,6 +241,7 @@ jobs: if: inputs.skip_tests != 'true' # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + timeout-minutes: 720 permissions: contents: read # Required to checkout code and read history outputs: @@ -354,6 +361,8 @@ jobs: needs: [pre_merge_hpu_test_build, discover_runner, retrieve_head_sha] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow + timeout-minutes: 720 steps: - name: Run pytest in tests/unit_tests run: | @@ -378,6 +387,8 @@ jobs: needs: [pre_merge_hpu_test_build, hpu_unit_tests, discover_runner, retrieve_head_sha] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow + timeout-minutes: 720 steps: - name: Run test scripts run: | @@ -408,6 +419,8 @@ jobs: needs: [pre_merge_hpu_test_build, hpu_unit_tests, discover_runner, retrieve_head_sha] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow + timeout-minutes: 720 steps: - name: Run test scripts run: | @@ -433,6 +446,8 @@ jobs: needs: [pre_merge_hpu_test_build, hpu_unit_tests, discover_runner, retrieve_head_sha] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow + timeout-minutes: 720 steps: - name: Run test scripts run: | @@ -459,6 +474,8 @@ jobs: needs: [pre_merge_hpu_test_build, hpu_unit_tests, discover_tests, discover_runner, retrieve_head_sha] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow + timeout-minutes: 720 strategy: fail-fast: false matrix: @@ -491,6 +508,8 @@ jobs: calibration_tests: needs: [pre_merge_hpu_test_build, hpu_unit_tests, discover_calibration_tests, discover_runner, retrieve_head_sha] runs-on: ${{ needs.discover_runner.outputs.runner_name }} + environment: approved-workflow + timeout-minutes: 720 strategy: fail-fast: false matrix: @@ -522,6 +541,7 @@ jobs: calibration_arg_parsing_tests: needs: [pre_merge_hpu_test_build, discover_runner, retrieve_head_sha] runs-on: ${{ needs.discover_runner.outputs.runner_name }} + timeout-minutes: 720 steps: - name: Run calibration arg parsing tests run: | @@ -544,6 +564,7 @@ jobs: needs: [retrieve_head_sha] if: inputs.is_merge_group != 'true' runs-on: ubuntu-latest + timeout-minutes: 720 outputs: nixl_changed: ${{ steps.check.outputs.nixl_changed }} steps: @@ -571,6 +592,7 @@ jobs: needs: [check_dockerfile_changes, discover_runner, retrieve_head_sha] if: needs.check_dockerfile_changes.outputs.nixl_changed == 'true' runs-on: ${{ needs.discover_runner.outputs.runner_name }} + timeout-minutes: 720 steps: - name: Checkout repository uses: actions/checkout@v4 @@ -595,6 +617,7 @@ jobs: needs: [hpu_unit_tests, e2e, hpu_perf_tests, calibration_tests, calibration_arg_parsing_tests, discover_runner] # --- UPDATED: Run on the specific node --- runs-on: ${{ needs.discover_runner.outputs.runner_name }} + timeout-minutes: 720 # This job is required to pass for pre-merge CI. By itself it does nothing, and will only pass if all jobs specified in "needs" list pass. steps: - name: Succeeded if all previous jobs passed @@ -605,6 +628,7 @@ jobs: # This job runs after hpu-test-suite completes needs: [pre_merge_hpu_test, pre_merge_hpu_test_build] runs-on: ubuntu-latest + timeout-minutes: 720 permissions: # Permissions are required on a per-job basis pull-requests: write @@ -624,6 +648,7 @@ jobs: if: always() needs: [discover_runner, hpu_unit_tests, hpu_pd_tests, hpu_perf_tests, hpu_dp_tests, e2e, calibration_tests, calibration_arg_parsing_tests] runs-on: ${{ needs.discover_runner.outputs.runner_name }} + timeout-minutes: 720 steps: - name: Remove Docker image to free up space env: diff --git a/README.md b/README.md index 74a84f3cc5..14b0682dfc 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,8 @@ The vLLM Hardware Plugin for Intel® Gaudi® integrates [Intel® Gaudi® AI acce 5. Install torchaudio (required by some upstream vLLM models such as QWEN3_5). Use the CPU wheel with `--no-deps` to avoid pulling a conflicting CUDA torch: ```bash - pip install --no-deps torchaudio --extra-index-url https://download.pytorch.org/whl/cpu + TORCH_VERSION=$(python3 -c "import re, torch; print(re.match(r'(\d+\.\d+\.\d+)', torch.__version__).group(1))") + pip install --no-deps torchaudio==$TORCH_VERSION --extra-index-url https://download.pytorch.org/whl/cpu ``` To see all the available installation methods, such as NIXL, see the [Installation](https://vllm-gaudi.readthedocs.io/en/latest/getting_started/installation.html) guide. diff --git a/requirements.txt b/requirements.txt index 77b97ce24d..c14e18a0f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,7 @@ # Dependencies for HPU code -ray>=2.48.0 pandas>=2.2.3 numba>=0.58.0 numpy>=1.26.0 -transformers >= 4.56.0, != 5.0.*, != 5.1.*, != 5.2.*, != 5.3.*, != 5.4.*, != 5.5.0, != 5.6.* kaldi-native-fbank >= 1.18.7 decord >= 0.6.0 tblib==3.1.0 diff --git a/tests/full_tests/ci_e2e_discoverable_tests.sh b/tests/full_tests/ci_e2e_discoverable_tests.sh index c4fcc8650a..9f5e5ed0ab 100755 --- a/tests/full_tests/ci_e2e_discoverable_tests.sh +++ b/tests/full_tests/ci_e2e_discoverable_tests.sh @@ -415,6 +415,7 @@ run_longbench_qwen3_30b_fp8_static_fp8_fsdpa_slicing_compile_test() { run_gsm8k_qwen35_35b_a3b_test() { echo "➡️ Testing GSM8K on Qwen3.5-35B-A3B..." VLLM_SKIP_WARMUP=True ENABLE_APC=False VLLM_FUSED_BLOCK_SOFTMAX_ADJUSTMENT=False VLLM_GRAPH_RESERVED_MEM=0.8 \ + VLLM_PROMPT_BS_BUCKET_MAX=32 \ pytest -v -s "${VLLM_GAUDI_PREFIX}/tests/models/language/generation/test_common.py" --model_card_path "${VLLM_GAUDI_PREFIX}/tests/full_tests/model_cards/qwen3.5-35b-a3b.yaml" echo "✅ Test with Qwen3.5-35B-A3B passed." } diff --git a/tests/full_tests/model_cards/qwen3.5-35b-a3b.yaml b/tests/full_tests/model_cards/qwen3.5-35b-a3b.yaml index 2f92da9722..fdf42e5366 100644 --- a/tests/full_tests/model_cards/qwen3.5-35b-a3b.yaml +++ b/tests/full_tests/model_cards/qwen3.5-35b-a3b.yaml @@ -15,4 +15,4 @@ model_card: metrics: name: exact_match,strict-match - value: 0.75 + value: 0.9 diff --git a/tests/models/language/generation/test_common.py b/tests/models/language/generation/test_common.py index da03369475..f9a2001da7 100644 --- a/tests/models/language/generation/test_common.py +++ b/tests/models/language/generation/test_common.py @@ -9,62 +9,66 @@ def launch_lm_eval(eval_config): - trust_remote_code = eval_config.get('trust_remote_code', False) - dtype = eval_config.get('dtype', 'bfloat16') - max_num_seqs = eval_config.get('max_num_seqs', 128) - tp_size = int(os.environ.get('TP_SIZE', '1')) - enable_apc = os.environ.get('ENABLE_APC', 'True').lower() in ['true', '1'] - enforce_eager = os.environ.get('ENFORCE_EAGER', 'False').lower() in ['true', '1'] - kv_cache_dtype = os.environ.get('KV_CACHE_DTYPE', None) - task = eval_config.get('tasks', 'gsm8k') - async_scheduling = os.environ.get('ASYNC_SCHEDULING', 'False').lower() in ['true', '1'] - max_model_len = eval_config.get('max_model_len', 4096) - batch_size = eval_config.get('batch_size', 'auto') + trust_remote_code = eval_config.get("trust_remote_code", False) + dtype = eval_config.get("dtype", "bfloat16") + max_num_seqs = eval_config.get("max_num_seqs", 128) + tp_size = int(os.environ.get("TP_SIZE", "1")) + enable_apc = os.environ.get("ENABLE_APC", "True").lower() in ["true", "1"] + enforce_eager = eval_config.get("enforce_eager", False) + if "ENFORCE_EAGER" in os.environ: + enforce_eager = os.environ["ENFORCE_EAGER"].lower() in ["true", "1"] + kv_cache_dtype = os.environ.get("KV_CACHE_DTYPE", None) + task = eval_config.get("tasks", "gsm8k") + async_scheduling = os.environ.get("ASYNC_SCHEDULING", "False").lower() in ["true", "1"] + max_model_len = eval_config.get("max_model_len", 4096) + batch_size = eval_config.get("batch_size", "auto") model_args = { - 'pretrained': eval_config['model_name'], - 'tensor_parallel_size': tp_size, - 'async_scheduling': async_scheduling, - 'enforce_eager': enforce_eager, - 'enable_prefix_caching': enable_apc, - 'dtype': dtype, - 'max_model_len': max_model_len, - 'max_num_seqs': max_num_seqs, - 'trust_remote_code': trust_remote_code, - 'batch_size': batch_size, - 'enable_expert_parallel': eval_config.get('enable_expert_parallel', False), - 'chat_template_args': eval_config.get('chat_template_args', {}), - 'seed': eval_config.get('seed', 42), + "pretrained": eval_config["model_name"], + "tensor_parallel_size": tp_size, + "async_scheduling": async_scheduling, + "enforce_eager": enforce_eager, + "enable_prefix_caching": enable_apc, + "dtype": dtype, + "max_model_len": max_model_len, + "max_num_seqs": max_num_seqs, + "trust_remote_code": trust_remote_code, + "batch_size": batch_size, + "enable_expert_parallel": eval_config.get("enable_expert_parallel", False), + "chat_template_args": eval_config.get("chat_template_args", {}), + "seed": eval_config.get("seed", 42), } if kv_cache_dtype is not None: - model_args['kv_cache_dtype'] = kv_cache_dtype + model_args["kv_cache_dtype"] = kv_cache_dtype - if eval_config.get('gpu_memory_utilization') is not None: - model_args['gpu_memory_utilization'] = eval_config['gpu_memory_utilization'] - if eval_config.get('reasoning_parser') is not None: - model_args['reasoning_parser'] = eval_config['reasoning_parser'] - if eval_config.get('max_num_batched_tokens') is not None: - model_args['max_num_batched_tokens'] = eval_config['max_num_batched_tokens'] + if eval_config.get("gpu_memory_utilization") is not None: + model_args["gpu_memory_utilization"] = eval_config["gpu_memory_utilization"] + if eval_config.get("reasoning_parser") is not None: + model_args["reasoning_parser"] = eval_config["reasoning_parser"] + if eval_config.get("max_num_batched_tokens") is not None: + model_args["max_num_batched_tokens"] = eval_config["max_num_batched_tokens"] if eval_config.get("inc"): - assert os.environ.get('QUANT_CONFIG', None), "must set QUANT_CONFIG environment variable for using INC" - model_args['quantization'] = 'inc' # for both calibration and quantization + assert os.environ.get("QUANT_CONFIG", None), "must set QUANT_CONFIG environment variable for using INC" + model_args["quantization"] = "inc" # for both calibration and quantization if eval_config.get("fp8"): # for quantization in fp8 - model_args['kv_cache_dtype'] = 'fp8_inc' + model_args["kv_cache_dtype"] = "fp8_inc" kwargs = {} - if 'fewshot_as_multiturn' in eval_config: - kwargs['fewshot_as_multiturn'] = eval_config['fewshot_as_multiturn'] - if 'apply_chat_template' in eval_config: - kwargs['apply_chat_template'] = eval_config['apply_chat_template'] - if eval_config.get('max_gen_toks') is not None: - kwargs['gen_kwargs'] = f"max_gen_toks={eval_config['max_gen_toks']}" + if "fewshot_as_multiturn" in eval_config: + kwargs["fewshot_as_multiturn"] = eval_config["fewshot_as_multiturn"] + if "apply_chat_template" in eval_config: + kwargs["apply_chat_template"] = eval_config["apply_chat_template"] + if eval_config.get("max_gen_toks") is not None: + kwargs["gen_kwargs"] = f"max_gen_toks={eval_config['max_gen_toks']}" llm = VLLM(**model_args) - results = lm_eval.simple_evaluate(model=llm, - tasks=[task], - num_fewshot=eval_config["num_fewshot"], - limit=eval_config["limit"], - batch_size="auto", - **kwargs) + results = lm_eval.simple_evaluate( + model=llm, + tasks=[task], + num_fewshot=eval_config["num_fewshot"], + limit=eval_config["limit"], + batch_size="auto", + **kwargs, + ) del llm gc.collect() @@ -75,11 +79,11 @@ def test_models(model_card_path, monkeypatch) -> None: with open(model_card_path) as f: model_card = yaml.safe_load(f) print(f"{model_card=}") - model_config = model_card['model_card'] + model_config = model_card["model_card"] results = launch_lm_eval(model_config) RTOL = 0.03 - metric = model_card['metrics'] - task = model_config['tasks'] + metric = model_card["metrics"] + task = model_config["tasks"] try: measured_value = results["results"][task][metric["name"]] except KeyError as e: @@ -100,6 +104,7 @@ def __main__(args): if __name__ == "__main__": import argparse + parser = argparse.ArgumentParser(description="Test vLLM models with lm-eval") parser.add_argument("--model_card_path", type=str, required=True, help="Path to the model card YAML file.") args = parser.parse_args() diff --git a/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py b/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py index e899906e87..3b75ce2394 100644 --- a/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py +++ b/tests/unit_tests/kv_offload/offloading_connector/test_scheduler.py @@ -210,9 +210,12 @@ def test_concurrent_lookups_of_the_same_prefix(request_runner, async_scheduling: # store 1 blocks runner.new_request(token_ids=[0] * offloaded_block_size) runner.manager.prepare_store.side_effect = (lambda block_hashes, req_context: generate_store_output(block_hashes)) + # With sync scheduling, all-finished flush fires within this run. + # With async scheduling, the finish is delayed so flush fires later. runner.run( decoded_tokens=[EOS_TOKEN_ID], expected_stored_gpu_block_indexes=(0, 1, 2), + expected_flushed_gpu_block_indexes=(0, 1, 2) if not async_scheduling else (), ) # start a request to load the first block, but don't complete @@ -249,6 +252,9 @@ def test_concurrent_lookups_of_the_same_prefix(request_runner, async_scheduling: # second request will use the GPU prefix cache assert transfer_jobs == list(runner.offloading_spec.handler.transfer_specs) + # Fence index drained: stores completed before request_finished ran. + assert runner.connector_scheduler._block_id_to_pending_jobs == {} + @pytest.mark.parametrize("async_scheduling", [True, False]) def test_abort_loading_requests(request_runner, async_scheduling: bool): @@ -269,6 +275,7 @@ def test_abort_loading_requests(request_runner, async_scheduling: bool): runner.run( decoded_tokens=[EOS_TOKEN_ID], expected_stored_gpu_block_indexes=(0, 1, 2), + expected_flushed_gpu_block_indexes=(0, 1, 2) if not async_scheduling else (), ) # start a request to load the first block, but don't complete @@ -295,6 +302,7 @@ def test_abort_loading_requests(request_runner, async_scheduling: bool): runner.run( decoded_tokens=[], expected_loaded_gpu_block_indexes=(0, 1, 2), + expected_flushed_gpu_block_indexes=(0, 1, 2), ) # assert request is deleted diff --git a/tests/unit_tests/kv_offload/offloading_connector/utils.py b/tests/unit_tests/kv_offload/offloading_connector/utils.py index aab2a5c64f..91c17b9912 100644 --- a/tests/unit_tests/kv_offload/offloading_connector/utils.py +++ b/tests/unit_tests/kv_offload/offloading_connector/utils.py @@ -286,10 +286,14 @@ def new_request( def _parse_transfers(self): for transfer_spec in self.offloading_spec.get_flushed_transfers(): src_spec, dst_spec = transfer_spec - assert isinstance(src_spec, GPULoadStoreSpec) - - for block_id in src_spec.block_ids: - self.flushed_gpu_block_indexes.add(self.gpu_block_index[block_id.item()]) + if isinstance(src_spec, GPULoadStoreSpec): + # store flush + for block_id in src_spec.block_ids: + self.flushed_gpu_block_indexes.add(self.gpu_block_index[block_id.item()]) + else: + # load flush + for block_id in dst_spec.block_ids: + self.flushed_gpu_block_indexes.add(self.gpu_block_index[block_id.item()]) block_size_factor = self.offloaded_block_size // self.gpu_block_size diff --git a/tests/unit_tests/lora/test_llama_tp.py b/tests/unit_tests/lora/test_llama_tp.py index 6e8bfa8de8..1f714065d7 100755 --- a/tests/unit_tests/lora/test_llama_tp.py +++ b/tests/unit_tests/lora/test_llama_tp.py @@ -1,8 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -import os - import vllm import vllm.config from vllm.lora.request import LoRARequest @@ -103,6 +101,5 @@ def test_llama_lora(llama32_lora_files): max_model_len=1024, max_loras=4, dtype='bfloat16', - hf_token=os.environ.get("HF_TOKEN"), ) generate_and_test(llm, llama32_lora_files) diff --git a/tests/unit_tests/lora/test_llm_with_multi_loras.py b/tests/unit_tests/lora/test_llm_with_multi_loras.py index ea21d18595..d33befbb5e 100644 --- a/tests/unit_tests/lora/test_llm_with_multi_loras.py +++ b/tests/unit_tests/lora/test_llm_with_multi_loras.py @@ -7,7 +7,6 @@ """ import pytest -import os from vllm import LLM from vllm.lora.request import LoRARequest @@ -66,7 +65,6 @@ def test_multiple_lora_requests(): gpu_memory_utilization=0.5, enforce_eager=True, dtype='bfloat16', - hf_token=os.environ.get("HF_TOKEN"), ) PROMPTS = ["Hello, my name is"] * 2 LORA_NAME = "Alice" diff --git a/tests/unit_tests/ops/test_hpu_rotary_embedding.py b/tests/unit_tests/ops/test_hpu_rotary_embedding.py index 2ef23ab4f9..35c2bb5805 100644 --- a/tests/unit_tests/ops/test_hpu_rotary_embedding.py +++ b/tests/unit_tests/ops/test_hpu_rotary_embedding.py @@ -201,6 +201,7 @@ def test_dynamic_ntk_scaling_rotary_embedding( "head_size": head_size, "rotary_dim": rotary_dim, "max_position_embeddings": max_position_embeddings, + "max_trained_positions": max_position_embeddings, "base": base, "is_neox_style": is_neox_style, "scaling_factor": scaling_factor, diff --git a/tests/unit_tests/test_bucketing.py b/tests/unit_tests/test_bucketing.py index 3145c8d3c5..24dfcfbbf1 100644 --- a/tests/unit_tests/test_bucketing.py +++ b/tests/unit_tests/test_bucketing.py @@ -232,11 +232,7 @@ def test_exponential_decode_cfgs_contiguous_pa_uses_max_blocks(mock_get_config): @patch('vllm_gaudi.extension.bucketing.exponential.get_config') def test_exponential_decode_cfgs_non_contiguous_pa_formula(mock_get_config): - """Verify non-contiguous PA decode cfg uses ceil(max_model_len/block_size)*max_num_seqs. - - Actual bounding of excessive buckets happens via the - num_ctx_tokens_less_or_equal_batched_max_model_len filter in generate_buckets(). - """ + """Verify non-contiguous PA decode cfg uses ceil(max_model_len/block_size)*max_num_seqs.""" mock_get_config.return_value = _MockConfig(use_contiguous_pa=False) strategy = ExponentialBucketingStrategy() @@ -527,36 +523,6 @@ def test_real_scenario_fallback_ctx_7408_not_truncated(): assert new_ctx == calc_fallback_value(7408, 32), (f"Fallback ctx {new_ctx} should equal calc_fallback_value result") -def test_exponential_decode_block_limit_uncapped(monkeypatch): - """Verify that decode block limit is computed from log2(max_decode_blocks). - - With the new approach, excessive warmup buckets are controlled by - filters in generate_buckets() (num_ctx_tokens_less_or_equal_batched_max_model_len) - rather than by capping the block limit in get_decode_cfgs(). - """ - monkeypatch.setenv("VLLM_EXPONENTIAL_BUCKETING", "true") - monkeypatch.setenv("VLLM_CONTIGUOUS_PA", "true") - clear_config() - get_config() - - strategy = ExponentialBucketingStrategy() - max_num_seqs = 21 - block_size = 128 - max_num_batched_tokens = 8192 - max_model_len = 131072 - max_blocks = 65536 - - bs_cfg, query_cfg, block_cfg = strategy.get_decode_cfgs(max_num_seqs, block_size, max_num_batched_tokens, - max_model_len, max_blocks) - - # max_decode_blocks = min(65536, ceil(131072/128)*21) = min(65536, 21504) = 21504 - expected_max_decode_blocks = min(max_blocks, math.ceil(max_model_len / block_size) * max_num_seqs) - expected_limit = math.ceil(math.log2(expected_max_decode_blocks)) + 1 - assert block_cfg[2] == expected_max_decode_blocks, ( - f"Expected max_decode_blocks={expected_max_decode_blocks}, got {block_cfg[2]}") - assert block_cfg[3] == expected_limit, (f"Expected decode_blocks_limit={expected_limit}, got {block_cfg[3]}") - - # --- Padding-aware bucketing tests --- @@ -664,66 +630,3 @@ def test_padding_aware_decode_cfgs_contiguous_pa_clamps_block_range(mock_get_con max_blocks=3593) assert block_cfg == [3465, 128, 3593, 899, 25] - - -# --- Tests that num_ctx_tokens_less_or_equal_batched_max_model_len filter is applied --- - - -@pytest.mark.parametrize("use_contiguous_pa", [True, False], ids=["contiguous_pa", "non_contiguous_pa"]) -@pytest.mark.parametrize( - ("max_model_len", "block_size", "max_num_seqs", "max_blocks", "max_num_batched_tokens"), - [ - (91964, 128, 256, 3593, 2048), # Qwen3-32B real scenario - (4096, 128, 64, 500, 2048), # small model - (131072, 128, 21, 65536, 8192), # long context - ], - ids=["qwen3_32b", "small_model", "long_ctx"], -) -def test_decode_buckets_satisfy_ctx_filter(monkeypatch, use_contiguous_pa, max_model_len, block_size, max_num_seqs, - max_blocks, max_num_batched_tokens): - """Every decode bucket returned by generate_buckets must satisfy - num_ctx_tokens_less_or_equal_batched_max_model_len: - ctx <= ceil(max_model_len / block_size) * bs (when ctx > ctx_range[0]) - """ - monkeypatch.setenv("VLLM_CONTIGUOUS_PA", str(use_contiguous_pa).lower()) - clear_config() - get_config() - - strategy = ExponentialBucketingStrategy() - - bs_cfg, query_cfg, block_cfg = strategy.get_decode_cfgs( - max_num_seqs=max_num_seqs, - block_size=block_size, - max_num_batched_tokens=max_num_batched_tokens, - max_model_len=max_model_len, - max_blocks=max_blocks, - ) - bs_range = strategy.get_range(bs_cfg) - query_range = strategy.get_range(query_cfg) - ctx_range = strategy.get_range(block_cfg) - - buckets = generate_buckets( - bs_range=bs_range, - query_range=query_range, - ctx_range=ctx_range, - is_prompt=False, - max_model_len=max_model_len, - max_num_seqs=max_num_seqs, - max_num_prefill_seqs=1, - max_num_batched_tokens=max_num_batched_tokens, - block_size=block_size, - max_blocks=max_blocks, - ) - - ctx_min = ctx_range[0] - max_blocks_per_seq = math.ceil(max_model_len / block_size) - - violations = [] - for bs, query, ctx in buckets: - if ctx > ctx_min and ctx > max_blocks_per_seq * bs: - violations.append((bs, query, ctx)) - - assert not violations, (f"Found {len(violations)} decode bucket(s) violating " - f"ctx <= ceil(max_model_len/block_size) * bs " - f"(max_blocks_per_seq={max_blocks_per_seq}):\n" + - "\n".join(f" bs={bs}, query={query}, ctx={ctx}" for bs, query, ctx in violations[:20])) diff --git a/tests/unit_tests/test_decode_bucket_hybrid.py b/tests/unit_tests/test_decode_bucket_hybrid.py new file mode 100644 index 0000000000..1e1d14d09f --- /dev/null +++ b/tests/unit_tests/test_decode_bucket_hybrid.py @@ -0,0 +1,440 @@ +# SPDX-License-Identifier: Apache-2.0 +############################################################################### +# Copyright (C) 2024-2026 Intel Corporation +# +# This source code is licensed under the Apache 2.0 license found in the +# LICENSE file in the root directory of this source tree. +############################################################################### +""" +Regression tests for decode bucket generation and warmup in hybrid models. + +Hybrid models (e.g., Qwen3.5) have block_size != attn_block_size: +- block_size=640: unified page size for KV cache management +- attn_block_size=128: HPU kernel page size used by paged attention + +The decode path (_create_decode_input_data) computes num_blocks using +attn_block_size. Therefore: +1. Decode buckets MUST be generated in attn_block_size units. +2. Warmup seq_lengths MUST produce the correct sum(num_blocks) to match + the target bucket after find_decode_bucket lookup. +3. For non-contiguous PA, _generate_seq_lengths MUST NOT cap num_blocks + at kv_cache_config.num_blocks (physical pool), because runtime can + exceed this via prefix-sharing. + +Regression: f24f3f9d introduced a formula for max_decode_blocks using +block_size instead of attn_block_size, and added a physical-pool cap +that prevented large decode buckets from being warmed. +""" + +import math + +import pytest +from types import SimpleNamespace +from unittest.mock import patch + +from vllm_gaudi.extension.bucketing.common import ( + HPUBucketingManager, + find_equal_or_closest_greater_config, +) +from vllm_gaudi.extension.bucketing.exponential import ExponentialBucketingStrategy +from vllm_gaudi.extension.runtime import get_config, clear_config +from vllm_gaudi.v1.worker.hpu_model_runner import HPUModelRunner + +# --- Qwen3.5 hybrid model parameters --- +_QWEN35_BLOCK_SIZE = 640 # unified page size (5 * 128) +_QWEN35_ATTN_BLOCK_SIZE = 128 # HPU kernel page size +_QWEN35_MAX_MODEL_LEN = 262144 +_QWEN35_MAX_NUM_SEQS = 45 +_QWEN35_NUM_HPU_BLOCKS = 15405 # physical blocks in attn_block_size units + + +@pytest.fixture(autouse=True) +def default_config(monkeypatch): + """Reset singleton and pin bucketing config for deterministic tests.""" + # Reset singleton to prevent state leakage between tests + HPUBucketingManager._instance = None + # Pin bucketing strategy to avoid env-dependent behavior in CI + monkeypatch.setenv("VLLM_BUCKETING_STRATEGY", "exp") + monkeypatch.delenv("VLLM_EXPONENTIAL_BUCKETING", raising=False) + clear_config() + get_config() + yield + HPUBucketingManager._instance = None + clear_config() + + +class _MockConfig: + """Lightweight mock for get_config().""" + + def __init__(self, **kwargs): + defaults = dict( + prefix_caching=False, + use_contiguous_pa=False, + merged_prefill=False, + VLLM_PROMPT_BS_BUCKET_MIN=None, + VLLM_PROMPT_BS_BUCKET_STEP=None, + VLLM_PROMPT_BS_BUCKET_MAX=None, + VLLM_PROMPT_SEQ_BUCKET_MIN=None, + VLLM_PROMPT_SEQ_BUCKET_STEP=None, + VLLM_PROMPT_SEQ_BUCKET_MAX=None, + VLLM_DECODE_BS_BUCKET_MIN=None, + VLLM_DECODE_BS_BUCKET_STEP=None, + VLLM_DECODE_BS_BUCKET_MAX=None, + VLLM_DECODE_BLOCK_BUCKET_MIN=None, + VLLM_DECODE_BLOCK_BUCKET_STEP=None, + VLLM_DECODE_BLOCK_BUCKET_MAX=None, + VLLM_PROMPT_QUERY_BUCKET_MIN=None, + ) + defaults.update(kwargs) + for k, v in defaults.items(): + object.__setattr__(self, k, v) + + +def _make_bucketing_manager(block_size, max_model_len, max_num_seqs, num_hpu_blocks): + """Create a minimally-configured HPUBucketingManager.""" + mgr = HPUBucketingManager.__new__(HPUBucketingManager) + mgr.block_size = block_size + mgr.max_model_len = max_model_len + mgr.max_num_seqs = max_num_seqs + mgr.max_num_prefill_seqs = 1 + mgr.num_hpu_blocks = num_hpu_blocks + mgr.max_num_batched_tokens = 131072 + mgr.initialized = True + mgr.mamba_chunk_size = None + mgr.mamba_chunk_size_is_explicit = False + mgr.num_speculative_tokens = None + mgr.use_sliding_window = False + mgr.fallback_bs_base_step = 2 + mgr.fallback_seq_base_step = 32 + mgr.fallback_blocks_base_step = 32 + mgr._fallback_max_ctx = 0 + return mgr + + +class _MockModelRunner: + """Minimal mock of HPUModelRunner for _generate_seq_lengths testing.""" + + def __init__(self, use_contiguous_pa, num_blocks, max_model_len, speculative_config=None): + self.use_contiguous_pa = use_contiguous_pa + self.kv_cache_config = SimpleNamespace(num_blocks=num_blocks) + self.max_model_len = max_model_len + self.speculative_config = speculative_config + + +def _generate_seq_lengths(runner, num_samples, num_blocks, block_size): + """Call HPUModelRunner._generate_seq_lengths via unbound method.""" + return HPUModelRunner._generate_seq_lengths(runner, num_samples, num_blocks, block_size) + + +# ============================================================================= +# Test 1: Decode bucket generation uses attn_block_size for hybrid models +# ============================================================================= + + +@patch('vllm_gaudi.extension.bucketing.exponential.get_config') +def test_hybrid_decode_buckets_use_attn_block_size(mock_exp_config): + """Decode buckets for hybrid model must be generated using attn_block_size. + + When block_size=640 is incorrectly used: + max_decode_blocks = ceil(262144/640)*45 = 18450 + When attn_block_size=128 is correctly used: + max_decode_blocks = ceil(262144/128)*45 = 92160 + + The warmup_model() scopes bucketing_manager.block_size to attn_block_size + before calling generate_decode_buckets(). This test verifies that using + the correct block_size produces the right max. + """ + mock_exp_config.return_value = _MockConfig(use_contiguous_pa=False) + + # With WRONG block_size (640) — the bug + mgr_wrong = _make_bucketing_manager( + block_size=_QWEN35_BLOCK_SIZE, + max_model_len=_QWEN35_MAX_MODEL_LEN, + max_num_seqs=_QWEN35_MAX_NUM_SEQS, + num_hpu_blocks=_QWEN35_NUM_HPU_BLOCKS, + ) + mgr_wrong.generate_decode_buckets() + wrong_max_ctx = max(ctx for _, _, ctx in mgr_wrong.decode_buckets) + + # With CORRECT block_size (128) — the fix + mgr_correct = _make_bucketing_manager( + block_size=_QWEN35_ATTN_BLOCK_SIZE, + max_model_len=_QWEN35_MAX_MODEL_LEN, + max_num_seqs=_QWEN35_MAX_NUM_SEQS, + num_hpu_blocks=_QWEN35_NUM_HPU_BLOCKS, + ) + mgr_correct.generate_decode_buckets() + correct_max_ctx = max(ctx for _, _, ctx in mgr_correct.decode_buckets) + + expected_max = math.ceil(_QWEN35_MAX_MODEL_LEN / _QWEN35_ATTN_BLOCK_SIZE) * _QWEN35_MAX_NUM_SEQS + wrong_expected = math.ceil(_QWEN35_MAX_MODEL_LEN / _QWEN35_BLOCK_SIZE) * _QWEN35_MAX_NUM_SEQS + + assert wrong_max_ctx <= wrong_expected, ( + f"Wrong block_size should produce max_ctx <= {wrong_expected}, got {wrong_max_ctx}") + assert correct_max_ctx <= expected_max, ( + f"Correct block_size should produce max_ctx <= {expected_max}, got {correct_max_ctx}") + assert correct_max_ctx > wrong_max_ctx, ( + f"attn_block_size={_QWEN35_ATTN_BLOCK_SIZE} should produce larger buckets than " + f"block_size={_QWEN35_BLOCK_SIZE}: {correct_max_ctx} vs {wrong_max_ctx}") + + +@patch('vllm_gaudi.extension.bucketing.exponential.get_config') +def test_hybrid_decode_buckets_cover_runtime_scenarios(mock_exp_config): + """Decode buckets must cover all runtime-reachable configurations. + + For 28 seqs at max context: 28 * ceil(262144/128) = 28 * 2048 = 57344. + A bucket >= 57344 must exist for batch_size=28. + """ + mock_exp_config.return_value = _MockConfig(use_contiguous_pa=False) + + mgr = _make_bucketing_manager( + block_size=_QWEN35_ATTN_BLOCK_SIZE, + max_model_len=_QWEN35_MAX_MODEL_LEN, + max_num_seqs=_QWEN35_MAX_NUM_SEQS, + num_hpu_blocks=_QWEN35_NUM_HPU_BLOCKS, + ) + mgr.generate_decode_buckets() + + # For each batch size, the max reachable ctx is bs * max_blocks_per_seq + max_blocks_per_seq = math.ceil(_QWEN35_MAX_MODEL_LEN / _QWEN35_ATTN_BLOCK_SIZE) + + # Check that large decode scenarios are covered + test_cases = [ + (28, 37620), # Real case from the bug report + (45, 92160), # Maximum: all seqs at max_model_len + (1, 2048), # Single seq at max context + ] + for bs, target_ctx in test_cases: + # Verify target is reachable (within physical limits for the batch) + max_reachable = bs * max_blocks_per_seq + assert target_ctx <= max_reachable, (f"Test case ({bs}, {target_ctx}) exceeds reachable max {max_reachable}") + + # Verify a covering bucket exists + found = find_equal_or_closest_greater_config(mgr.decode_buckets, (bs, 1, target_ctx)) + assert found is not None, (f"No decode bucket found >= ({bs}, 1, {target_ctx}). " + f"Max bucket for bs={bs}: " + f"{max((ctx for b, _, ctx in mgr.decode_buckets if b >= bs), default='NONE')}") + + +# ============================================================================= +# Test 2: _generate_seq_lengths does NOT cap for non-contiguous PA +# ============================================================================= + + +class TestGenerateSeqLengthsNonContiguousPA: + """Verify _generate_seq_lengths behavior for non-contiguous PA.""" + + def test_no_cap_when_num_blocks_exceeds_physical(self): + """num_blocks > kv_cache_config.num_blocks should NOT be capped. + + This is the key regression: capping prevents large decode buckets + from being warmed, causing 'not warmed-up' warnings at runtime. + """ + runner = _MockModelRunner( + use_contiguous_pa=False, + num_blocks=_QWEN35_NUM_HPU_BLOCKS, # 15405 + max_model_len=_QWEN35_MAX_MODEL_LEN, + ) + target_blocks = 37620 # Much larger than num_blocks=15405 + + seq_lengths = _generate_seq_lengths(runner, 28, target_blocks, _QWEN35_ATTN_BLOCK_SIZE) + + # Verify total blocks from seq_lengths matches target + total_blocks = sum(math.ceil((sl + 1) / _QWEN35_ATTN_BLOCK_SIZE) for sl in seq_lengths) + assert total_blocks == target_blocks, ( + f"Expected total_blocks={target_blocks}, got {total_blocks}. " + f"Non-contiguous PA must not cap at kv_cache_config.num_blocks={_QWEN35_NUM_HPU_BLOCKS}") + + def test_max_model_len_still_bounds_per_seq(self): + """Individual seq_lengths must still be clamped by max_model_len.""" + runner = _MockModelRunner( + use_contiguous_pa=False, + num_blocks=_QWEN35_NUM_HPU_BLOCKS, + max_model_len=_QWEN35_MAX_MODEL_LEN, + ) + # Large bucket: 1 seq with 92160 blocks (way beyond max_model_len/block_size=2048) + seq_lengths = _generate_seq_lengths(runner, 1, 92160, _QWEN35_ATTN_BLOCK_SIZE) + + assert len(seq_lengths) == 1 + assert seq_lengths[0] <= _QWEN35_MAX_MODEL_LEN - 1, ( + f"seq_length {seq_lengths[0]} exceeds max_model_len-1={_QWEN35_MAX_MODEL_LEN - 1}") + + @pytest.mark.parametrize("batch_size,target_blocks", [ + (28, 37620), + (45, 92160), + (14, 20000), + (1, 2048), + ]) + def test_warmup_roundtrip_targets_correct_bucket(self, batch_size, target_blocks): + """Verify warmup roundtrip: seq_lengths -> num_blocks -> find_decode_bucket. + + The warmup path generates seq_lengths from the target bucket, then the + runtime decode path recomputes num_blocks from those seq_lengths. The + resulting sum(num_blocks) must find the same bucket via find_decode_bucket. + """ + runner = _MockModelRunner( + use_contiguous_pa=False, + num_blocks=_QWEN35_NUM_HPU_BLOCKS, + max_model_len=_QWEN35_MAX_MODEL_LEN, + ) + max_blocks_per_seq = math.ceil(_QWEN35_MAX_MODEL_LEN / _QWEN35_ATTN_BLOCK_SIZE) + + # Skip unreachable buckets (can't produce these at runtime either) + max_reachable = batch_size * max_blocks_per_seq + if target_blocks > max_reachable: + pytest.skip(f"Bucket ({batch_size}, 1, {target_blocks}) is unreachable " + f"(max={max_reachable})") + + seq_lengths = _generate_seq_lengths(runner, batch_size, target_blocks, _QWEN35_ATTN_BLOCK_SIZE) + + # Simulate _create_decode_input_data's num_blocks computation + num_blocks_per_req = [math.ceil((sl + 1) / _QWEN35_ATTN_BLOCK_SIZE) for sl in seq_lengths] + total_blocks_after_roundtrip = sum(num_blocks_per_req) + + # The roundtrip total should equal the target (for reachable buckets) + assert total_blocks_after_roundtrip == target_blocks, ( + f"Roundtrip mismatch for bucket ({batch_size}, 1, {target_blocks}): " + f"got sum(num_blocks)={total_blocks_after_roundtrip}. " + f"Warmup will target wrong bucket!") + + +# ============================================================================= +# Test 3: _generate_seq_lengths DOES cap for contiguous PA +# ============================================================================= + + +class TestGenerateSeqLengthsContiguousPA: + """Verify _generate_seq_lengths caps correctly for contiguous PA.""" + + def test_caps_at_physical_blocks(self): + """For contiguous PA, num_blocks MUST be capped at kv_cache_config.num_blocks. + + This is because contiguous PA uses block_id = num_blocks - 1 as the + contiguous allocation base, which must be a valid physical block. + """ + runner = _MockModelRunner( + use_contiguous_pa=True, + num_blocks=_QWEN35_NUM_HPU_BLOCKS, # 15405 + max_model_len=_QWEN35_MAX_MODEL_LEN, + ) + target_blocks = 37620 # Larger than physical + + seq_lengths = _generate_seq_lengths(runner, 28, target_blocks, _QWEN35_ATTN_BLOCK_SIZE) + + # Total blocks should be capped at num_blocks + total_blocks = sum(math.ceil((sl + 1) / _QWEN35_ATTN_BLOCK_SIZE) for sl in seq_lengths) + assert total_blocks <= _QWEN35_NUM_HPU_BLOCKS, ( + f"Contiguous PA: total_blocks={total_blocks} exceeds physical " + f"num_blocks={_QWEN35_NUM_HPU_BLOCKS}. block_id would overflow!") + + +# ============================================================================= +# Test 4: End-to-end decode bucket max formula for hybrid models +# ============================================================================= + + +@patch('vllm_gaudi.extension.bucketing.exponential.get_config') +def test_hybrid_max_decode_blocks_formula(mock_exp_config): + """Verify max_decode_blocks = ceil(max_model_len/attn_block_size) * max_num_seqs. + + For Qwen3.5: ceil(262144/128) * 45 = 2048 * 45 = 92160. + This must NOT use block_size=640 which gives ceil(262144/640)*45 = 18450. + """ + mock_exp_config.return_value = _MockConfig(use_contiguous_pa=False) + strategy = ExponentialBucketingStrategy() + + # Using attn_block_size=128 (correct) + _, _, block_cfg = strategy.get_decode_cfgs( + max_num_seqs=_QWEN35_MAX_NUM_SEQS, + block_size=_QWEN35_ATTN_BLOCK_SIZE, + max_num_batched_tokens=131072, + max_model_len=_QWEN35_MAX_MODEL_LEN, + max_blocks=_QWEN35_NUM_HPU_BLOCKS, + ) + expected_max = math.ceil(_QWEN35_MAX_MODEL_LEN / _QWEN35_ATTN_BLOCK_SIZE) * _QWEN35_MAX_NUM_SEQS + assert block_cfg[2] == expected_max, ( + f"max_decode_blocks should be {expected_max} with attn_block_size={_QWEN35_ATTN_BLOCK_SIZE}, " + f"got {block_cfg[2]}") + + # Using block_size=640 (wrong — would produce 18450) + _, _, block_cfg_wrong = strategy.get_decode_cfgs( + max_num_seqs=_QWEN35_MAX_NUM_SEQS, + block_size=_QWEN35_BLOCK_SIZE, + max_num_batched_tokens=131072, + max_model_len=_QWEN35_MAX_MODEL_LEN, + max_blocks=_QWEN35_NUM_HPU_BLOCKS, + ) + wrong_max = math.ceil(_QWEN35_MAX_MODEL_LEN / _QWEN35_BLOCK_SIZE) * _QWEN35_MAX_NUM_SEQS + assert block_cfg_wrong[2] == wrong_max, ( + f"With block_size=640, max_decode_blocks should be {wrong_max}, got {block_cfg_wrong[2]}") + assert expected_max > wrong_max, (f"attn_block_size formula ({expected_max}) must produce larger max than " + f"block_size formula ({wrong_max})") + + +# ============================================================================= +# Test 5: Verify the bug scenario — bucket (28, 1, 37620) IS reachable +# ============================================================================= + + +def test_bucket_37620_reachable_at_runtime(): + """Bucket (28, 1, 37620) is reachable: 28 seqs averaging ~1344 blocks each. + + Each seq has context_len ≈ 171903 tokens → ceil(171904/128) = 1344 blocks. + Sum across 28 seqs ≈ 37620. This is within max_model_len per seq. + """ + attn_block_size = _QWEN35_ATTN_BLOCK_SIZE + max_blocks_per_seq = math.ceil(_QWEN35_MAX_MODEL_LEN / attn_block_size) # 2048 + batch_size = 28 + target_blocks = 37620 + + # Each seq needs target_blocks/batch_size ≈ 1344 blocks + blocks_per_seq = target_blocks / batch_size # 1343.57 + tokens_per_seq = blocks_per_seq * attn_block_size # ~171977 + + assert tokens_per_seq < _QWEN35_MAX_MODEL_LEN, (f"Scenario requires {tokens_per_seq:.0f} tokens/seq which exceeds " + f"max_model_len={_QWEN35_MAX_MODEL_LEN}") + assert blocks_per_seq <= max_blocks_per_seq, (f"Scenario requires {blocks_per_seq:.1f} blocks/seq which exceeds " + f"max_blocks_per_seq={max_blocks_per_seq}") + + +# ============================================================================= +# Test 6: Regression test — with old cap, bucket warmup targets wrong bucket +# ============================================================================= + + +def test_old_cap_causes_wrong_bucket_warmup(): + """Demonstrate that capping at kv_cache_config.num_blocks causes warmup + to target the wrong bucket, producing 'not warmed-up' warnings. + + With cap: _generate_seq_lengths(28, min(15405, 37620)=15405, 128) + → sum(num_blocks) ≈ 15405 → find_decode_bucket finds a smaller bucket. + """ + runner_capped = _MockModelRunner( + use_contiguous_pa=True, # simulate old buggy behavior (cap always) + num_blocks=_QWEN35_NUM_HPU_BLOCKS, + max_model_len=_QWEN35_MAX_MODEL_LEN, + ) + target_bucket_ctx = 37620 + + # With cap (simulates old behavior) + seq_lengths_capped = _generate_seq_lengths(runner_capped, 28, target_bucket_ctx, _QWEN35_ATTN_BLOCK_SIZE) + total_capped = sum(math.ceil((sl + 1) / _QWEN35_ATTN_BLOCK_SIZE) for sl in seq_lengths_capped) + + # Without cap (correct behavior for non-contiguous PA) + runner_uncapped = _MockModelRunner( + use_contiguous_pa=False, + num_blocks=_QWEN35_NUM_HPU_BLOCKS, + max_model_len=_QWEN35_MAX_MODEL_LEN, + ) + seq_lengths_uncapped = _generate_seq_lengths(runner_uncapped, 28, target_bucket_ctx, _QWEN35_ATTN_BLOCK_SIZE) + total_uncapped = sum(math.ceil((sl + 1) / _QWEN35_ATTN_BLOCK_SIZE) for sl in seq_lengths_uncapped) + + # Capped version misses the target + assert total_capped < target_bucket_ctx, (f"Capped version should produce fewer blocks than target: " + f"{total_capped} vs {target_bucket_ctx}") + assert total_capped <= _QWEN35_NUM_HPU_BLOCKS, ( + f"Capped version should be bounded by num_blocks={_QWEN35_NUM_HPU_BLOCKS}") + + # Uncapped version hits the target exactly + assert total_uncapped == target_bucket_ctx, (f"Uncapped version should produce exactly {target_bucket_ctx} blocks, " + f"got {total_uncapped}") diff --git a/tests/unit_tests/worker/test_ensure_multi_token_decodes_last.py b/tests/unit_tests/worker/test_ensure_multi_token_decodes_last.py new file mode 100644 index 0000000000..c3cb83a956 --- /dev/null +++ b/tests/unit_tests/worker/test_ensure_multi_token_decodes_last.py @@ -0,0 +1,160 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Unit tests for hpu_model_runner.ensure_multi_token_decodes_last. + +Covers the routing invariant introduced for KV-offload + async spec-decode +(PR #1401, originally #1264): when speculative decoding is disabled, a decode +request with more than one scheduled token (a resumed/catch-up request from +e.g. OffloadingConnector requeue) must be sorted to the end of the decode +region so that `_get_prompts_and_decodes` routes it through the prefill path, +avoiding bucket overflow / Habana workspace OOM. +""" + +import pytest +import torch +import habana_frameworks.torch # noqa: F401 + +from vllm.sampling_params import SamplingParams +from vllm.utils.platform_utils import is_pin_memory_available + +from vllm_gaudi.v1.worker.hpu_input_batch import (CachedRequestState, InputBatch) +from vllm_gaudi.v1.worker.hpu_model_runner import ensure_multi_token_decodes_last + + +def _make_request(req_id: str, prompt_len: int, num_computed_tokens: int) -> CachedRequestState: + return CachedRequestState( + req_id=req_id, + prompt_token_ids=[0] * prompt_len, + sampling_params=SamplingParams(), + pooling_params=None, + mm_features=[], + block_ids=([], ), + generator=None, + num_computed_tokens=num_computed_tokens, + output_token_ids=[], + ) + + +def _make_input_batch(reqs: list[CachedRequestState]) -> InputBatch: + batch = InputBatch( + max_num_reqs=max(len(reqs), 1), + max_model_len=1024, + max_num_batched_tokens=1024, + device=torch.device("hpu"), + pin_memory=is_pin_memory_available(), + vocab_size=1024, + block_sizes=[1], + kernel_block_sizes=[1], + ) + for i, req in enumerate(reqs): + assigned = batch.add_request(req) + assert assigned == i + return batch + + +def test_multi_token_decode_sorted_to_end_of_decode_region(): + """[1-tok decode, multi-tok decode, 1-tok decode, prompt] should become + [1-tok decode, 1-tok decode, multi-tok decode, prompt].""" + reqs = [ + # 1-tok decode: num_computed >= num_prompt + _make_request("d0", prompt_len=4, num_computed_tokens=4), + # multi-tok catch-up decode (num_scheduled_tokens > 1) + _make_request("d_multi", prompt_len=4, num_computed_tokens=4), + # another 1-tok decode + _make_request("d1", prompt_len=4, num_computed_tokens=5), + # prompt: num_computed < num_prompt + _make_request("p0", prompt_len=8, num_computed_tokens=2), + ] + batch = _make_input_batch(reqs) + scheduled = {"d0": 1, "d_multi": 5, "d1": 1, "p0": 8} + + ensure_multi_token_decodes_last(batch, scheduled) + + # Expected layout: 1-tok decodes first, then multi-tok decode, then prompt. + assert list(batch.req_ids[:batch.num_reqs]) == ["d0", "d1", "d_multi", "p0"] + # Decode region (first 3) preserves the prompt boundary. + for i in range(3): + assert batch.num_computed_tokens_cpu[i] >= batch.num_prompt_tokens[i] + # Prompt stays last. + assert batch.num_computed_tokens_cpu[3] < batch.num_prompt_tokens[3] + + +def test_no_op_when_only_single_token_decodes(): + reqs = [ + _make_request("d0", prompt_len=4, num_computed_tokens=4), + _make_request("d1", prompt_len=4, num_computed_tokens=5), + _make_request("p0", prompt_len=8, num_computed_tokens=2), + ] + batch = _make_input_batch(reqs) + scheduled = {"d0": 1, "d1": 1, "p0": 8} + original_order = list(batch.req_ids[:batch.num_reqs]) + + ensure_multi_token_decodes_last(batch, scheduled) + + assert list(batch.req_ids[:batch.num_reqs]) == original_order + + +def test_no_op_when_only_multi_token_decodes(): + """All decodes are multi-token: order of decode region is preserved.""" + reqs = [ + _make_request("d0", prompt_len=4, num_computed_tokens=4), + _make_request("d1", prompt_len=4, num_computed_tokens=5), + _make_request("p0", prompt_len=8, num_computed_tokens=2), + ] + batch = _make_input_batch(reqs) + scheduled = {"d0": 3, "d1": 4, "p0": 8} + original_order = list(batch.req_ids[:batch.num_reqs]) + + ensure_multi_token_decodes_last(batch, scheduled) + + # Both d0 and d1 are multi-tok; write_pos never advances, no swaps occur. + assert list(batch.req_ids[:batch.num_reqs]) == original_order + + +def test_decode_only_batch_no_prompt(): + """No prompt in the batch: decode_end == num_reqs.""" + reqs = [ + _make_request("d_multi", prompt_len=4, num_computed_tokens=4), + _make_request("d0", prompt_len=4, num_computed_tokens=4), + _make_request("d1", prompt_len=4, num_computed_tokens=5), + ] + batch = _make_input_batch(reqs) + scheduled = {"d_multi": 7, "d0": 1, "d1": 1} + + ensure_multi_token_decodes_last(batch, scheduled) + + assert list(batch.req_ids[:batch.num_reqs]) == ["d0", "d1", "d_multi"] + + +def test_prompt_only_batch_unchanged(): + """No decodes: function should be a no-op.""" + reqs = [ + _make_request("p0", prompt_len=8, num_computed_tokens=2), + _make_request("p1", prompt_len=8, num_computed_tokens=0), + ] + batch = _make_input_batch(reqs) + scheduled = {"p0": 6, "p1": 8} + original_order = list(batch.req_ids[:batch.num_reqs]) + + ensure_multi_token_decodes_last(batch, scheduled) + + assert list(batch.req_ids[:batch.num_reqs]) == original_order + + +def test_missing_req_id_treated_as_single_token(): + """Defensive: scheduled_tokens.get(req_id, 1) defaults to 1 if missing.""" + reqs = [ + _make_request("d0", prompt_len=4, num_computed_tokens=4), + _make_request("d_multi", prompt_len=4, num_computed_tokens=4), + ] + batch = _make_input_batch(reqs) + # d_multi is the only key; d0 absent -> treated as 1-tok decode. + scheduled = {"d_multi": 3} + + ensure_multi_token_decodes_last(batch, scheduled) + + assert list(batch.req_ids[:batch.num_reqs]) == ["d0", "d_multi"] + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/vllm_gaudi/entrypoints/openai/multi_model_api_server.py b/vllm_gaudi/entrypoints/openai/multi_model_api_server.py index 5b628aed52..c57c02cc2c 100644 --- a/vllm_gaudi/entrypoints/openai/multi_model_api_server.py +++ b/vllm_gaudi/entrypoints/openai/multi_model_api_server.py @@ -10,7 +10,7 @@ from contextlib import asynccontextmanager from collections.abc import AsyncIterator from dataclasses import dataclass -from typing import NamedTuple +from typing import Any, NamedTuple import uvloop import yaml @@ -149,6 +149,19 @@ async def sleep(self, level: int = 1, mode: str = "abort") -> None: async def wake_up(self, tags: list[str] | None = None) -> None: await self._engine.wake_up(tags=tags) + async def notify_kv_transfer_request_rejected( + self, + request_id: str, + kv_transfer_params: dict[str, Any], + *, + data_parallel_rank: int | None = None, + ) -> None: + await self._engine.notify_kv_transfer_request_rejected( + request_id=request_id, + kv_transfer_params=kv_transfer_params, + data_parallel_rank=data_parallel_rank, + ) + async def is_sleeping(self) -> bool: return await self._engine.is_sleeping() @@ -299,7 +312,7 @@ def _resolve_frontend_settings( if model_overrides.enable_auto_tool_choice is not None else args.enable_auto_tool_choice) tool_call_parser = (model_overrides.tool_call_parser if model_overrides.tool_call_parser is not None else args.tool_call_parser) - chat_template = (model_overrides.chat_template if model_overrides.chat_template is not None else args.chat_template) + chat_template = model_overrides.chat_template if model_overrides.chat_template is not None else args.chat_template return FrontendSettings( enable_auto_tool_choice=enable_auto_tool_choice, tool_call_parser=tool_call_parser, @@ -321,7 +334,7 @@ def _validate_model_frontend_overrides( if effective_enable_auto and not effective_tool_parser: raise ValueError(f"Model '{model_name}' enables auto tool choice but no tool_call_parser is set.") - if (effective_enable_auto and effective_tool_parser and effective_tool_parser not in valid_tool_parsers): + if effective_enable_auto and effective_tool_parser and effective_tool_parser not in valid_tool_parsers: raise ValueError(f"Model '{model_name}' has invalid tool_call_parser='{effective_tool_parser}'. " f"Valid options: {valid_tool_parsers}") @@ -392,8 +405,7 @@ def _load_multi_model_config(path: str, ) -> MultiModelConfigLoadResult: if default_model is None: default_model = next(iter(model_configs.keys())) if default_model not in model_configs: - raise ValueError(f"Default model '{default_model}' not found in config models: " - f"{list(model_configs.keys())}") + raise ValueError(f"Default model '{default_model}' not found in config models: {list(model_configs.keys())}") return MultiModelConfigLoadResult( model_configs=model_configs, @@ -418,8 +430,13 @@ async def build_multi_model_engine_client( args: Namespace, *, usage_context: UsageContext = UsageContext.OPENAI_API_SERVER, -) -> AsyncIterator[tuple[MultiModelEngineClient, MultiModelAsyncLLM, dict[str, BaseModelPath], dict[str, int], dict[ - str, ModelFrontendOverrides]]]: +) -> AsyncIterator[tuple[ + MultiModelEngineClient, + MultiModelAsyncLLM, + dict[str, BaseModelPath], + dict[str, int], + dict[str, ModelFrontendOverrides], +]]: config_path = _resolve_multi_model_config_path() if not config_path: raise ValueError("A multi-model config path must be set when multi-model mode is enabled. " @@ -504,7 +521,8 @@ async def _init_multi_model_state( chat_template=resolved_chat_template, chat_template_content_format=args.chat_template_content_format, default_chat_template_kwargs=args.default_chat_template_kwargs, - trust_request_chat_template=args.trust_request_chat_template) + trust_request_chat_template=args.trust_request_chat_template, + ) if "generate" in supported_tasks: from vllm.entrypoints.openai.generate.api_router import init_generate_state @@ -537,8 +555,7 @@ async def _init_multi_model_state( def _attach_multi_model_router(app: FastAPI) -> None: if not envs.VLLM_SERVER_DEV_MODE: - logger.warning("The /v1/models/switch endpoint is disabled. " - "Set VLLM_SERVER_DEV_MODE=1 to enable it.") + logger.warning("The /v1/models/switch endpoint is disabled. Set VLLM_SERVER_DEV_MODE=1 to enable it.") return router = APIRouter() diff --git a/vllm_gaudi/extension/bucketing/common.py b/vllm_gaudi/extension/bucketing/common.py index 2fd85dbdae..ba4b11a7a4 100644 --- a/vllm_gaudi/extension/bucketing/common.py +++ b/vllm_gaudi/extension/bucketing/common.py @@ -447,14 +447,6 @@ def batch_size_smaller_than_blocks(bs, query, ctx): omitted_buckets.add(("condition: bs <= ctx, ", "-> bs, query, ctx: ", bs, query, ctx)) return bs <= ctx - def num_ctx_tokens_less_or_equal_batched_max_model_len(bs, query, ctx): - is_valid = ctx <= math.ceil(max_model_len / block_size) * bs if ctx > ctx_range[0] else True - if not is_valid: - omitted_buckets.add( - ("condition: ctx <= math.ceil(max_model_len / block_size) * bs if ctx > ctx_range[0] else True", - "-> bs, query, ctx: ", bs, query, ctx)) - return is_valid - filters_map = { "prompt": { # depends only on merged_prefill @@ -463,8 +455,8 @@ def num_ctx_tokens_less_or_equal_batched_max_model_len(bs, query, ctx): }, "decode": { # depends only on contiguous PA - True: [num_ctx_tokens_less_or_equal_batched_max_model_len], - False: [batch_size_smaller_than_blocks, num_ctx_tokens_less_or_equal_batched_max_model_len], + True: [], + False: [batch_size_smaller_than_blocks], } } diff --git a/vllm_gaudi/models/minimax_m2.py b/vllm_gaudi/models/minimax_m2.py index a6d720e00e..459f480879 100644 --- a/vllm_gaudi/models/minimax_m2.py +++ b/vllm_gaudi/models/minimax_m2.py @@ -33,7 +33,7 @@ from vllm.model_executor.layers.attention import Attention from vllm.compilation.decorators import support_torch_compile from vllm.config import CacheConfig, ModelConfig, VllmConfig -from vllm.distributed import (get_pp_group, get_tensor_model_parallel_world_size, tensor_model_parallel_all_reduce) +from vllm.distributed import (get_pp_group, get_tensor_model_parallel_world_size) from vllm.model_executor.layers.fused_moe import FusedMoE from vllm.model_executor.layers.mamba.linear_attn import MiniMaxText01RMSNormTP from vllm.model_executor.layers.layernorm import RMSNorm @@ -107,9 +107,6 @@ def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: # router_logits: (bs * seq_len, n_experts) router_logits, _ = self.gate(hidden_states.to(torch.float32)) final_hidden_states = self.experts(hidden_states=hidden_states, router_logits=router_logits) - final_hidden_states = final_hidden_states - if self.tp_size > 1: - final_hidden_states = tensor_model_parallel_all_reduce(final_hidden_states) return final_hidden_states.view(bs, seq_len, hidden_dim) diff --git a/vllm_gaudi/models/qwen3_5.py b/vllm_gaudi/models/qwen3_5.py index d8b3dabf7f..c37b39fa92 100644 --- a/vllm_gaudi/models/qwen3_5.py +++ b/vllm_gaudi/models/qwen3_5.py @@ -1,5 +1,5 @@ import torch -from vllm.model_executor.layers.mamba.gdn_linear_attn import GatedDeltaNetAttention +from vllm.model_executor.layers.mamba.gdn.qwen_gdn_linear_attn import QwenGatedDeltaNetAttention from vllm.forward_context import get_forward_context from vllm_gaudi.ops.causal_conv1d_pytorch import ( @@ -26,7 +26,7 @@ def _save_ssm_state(core_attn_out, final_state, ssm_state, state_indices): return core_attn_out -class HPUGatedDeltaNetAttention(GatedDeltaNetAttention): +class HPUGatedDeltaNetAttention(QwenGatedDeltaNetAttention): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -295,10 +295,10 @@ def forward( # Replace the class in the upstream modules so that both Qwen3-Next and # Qwen3.5 model definitions instantiate HPUGatedDeltaNetAttention. -import vllm.model_executor.layers.mamba.gdn_linear_attn as _gdn_module # noqa: E402 +import vllm.model_executor.layers.mamba.gdn.qwen_gdn_linear_attn as _gdn_module # noqa: E402 import vllm.model_executor.models.qwen3_next as _qwen3_next_module # noqa: E402 import vllm.model_executor.models.qwen3_5 as _qwen3_5_module # noqa: E402 -_gdn_module.GatedDeltaNetAttention = HPUGatedDeltaNetAttention -_qwen3_next_module.GatedDeltaNetAttention = HPUGatedDeltaNetAttention -_qwen3_5_module.GatedDeltaNetAttention = HPUGatedDeltaNetAttention +_gdn_module.QwenGatedDeltaNetAttention = HPUGatedDeltaNetAttention +_qwen3_next_module.QwenGatedDeltaNetAttention = HPUGatedDeltaNetAttention +_qwen3_5_module.QwenGatedDeltaNetAttention = HPUGatedDeltaNetAttention diff --git a/vllm_gaudi/ops/hpu_compressed_tensors.py b/vllm_gaudi/ops/hpu_compressed_tensors.py index af7916020c..74d91e809b 100644 --- a/vllm_gaudi/ops/hpu_compressed_tensors.py +++ b/vllm_gaudi/ops/hpu_compressed_tensors.py @@ -20,7 +20,6 @@ CompressedTensorsConfig, CompressedTensorsMoEMethod, CompressedTensorsKVCacheMethod, - SparsityCompressionConfig, ) from vllm.model_executor.layers.quantization.compressed_tensors import (compressed_tensors_moe) from vllm.model_executor.layers.quantization.compressed_tensors.compressed_tensors_moe import ( @@ -1056,8 +1055,6 @@ def __init__( target_scheme_map: dict[str, Any], ignore: list[str], quant_format: str, - sparsity_scheme_map: dict[str, SparsityCompressionConfig], - sparsity_ignore_list: list[str], kv_cache_scheme: dict[str, Any] | None = None, config: dict[str, Any] | None = None, transform_config: dict[str, Any] | None = None, @@ -1068,8 +1065,6 @@ def __init__( target_scheme_map, ignore, quant_format, - sparsity_scheme_map, - sparsity_ignore_list, kv_cache_scheme, config, transform_config, diff --git a/vllm_gaudi/ops/hpu_fused_moe.py b/vllm_gaudi/ops/hpu_fused_moe.py index c496c79679..86a10bdebc 100644 --- a/vllm_gaudi/ops/hpu_fused_moe.py +++ b/vllm_gaudi/ops/hpu_fused_moe.py @@ -22,8 +22,6 @@ FusedTopKRouter, ) from vllm.model_executor.layers.fused_moe.router.grouped_topk_router import ( GroupedTopKRouter, ) -from vllm.model_executor.layers.fused_moe.router.router_factory import ( - EMPTY_EPLB_STATE, ) from vllm.model_executor.layers.fused_moe.router.routing_simulator_router import ( RoutingSimulatorRouter, ) from vllm.model_executor.layers.fused_moe.router.zero_expert_router import ( @@ -284,7 +282,8 @@ def patched_fused_moe_forward( ensure_moe_quant_config_init, and _sequence_parallel_context — all of which access ForwardContext and cause torch.compile graph breaks), we use a layer reference stashed on the runner at FusedMoE.__init__ time - (self._hpu_layer_ref) and call _forward_impl directly. This also + (self._hpu_layer_ref) and bypass _forward_impl for dp_size==1, + calling _apply_quant_method + _maybe_combine directly. This also bypasses self.layer_name (a per-layer string) so dynamo no longer emits per-layer string guards that trigger recompilation. @@ -297,17 +296,27 @@ def patched_fused_moe_forward( hidden_states, og_hidden_dims = self._maybe_pad_hidden_states(shared_experts_input, hidden_states) if self.moe_config.dp_size == 1: - # Use layer ref saved at FusedMoE.__init__ to avoid both the - # get_layer_from_name(self.layer_name) lookup (graph break) and - # the per-layer string guard from accessing self.layer_name. - # Replicate the remaining forward_dispatch logic that we bypass: - # 1. Sync shared experts stream for multi-stream overlap + # Bypass _forward_impl entirely for dp_size==1 to eliminate + # graph breaks from _sequence_parallel_context() (which calls + # get_forward_context()), skip the no-op _maybe_dispatch(), and + # avoid double gate / stream-sync calls that _forward_impl + # would redundantly repeat. + if self.moe_config.pcp_size > 1: + raise RuntimeError("dp_size==1 fast path does not support pcp_size > 1") + layer = self._hpu_layer_ref + layer.ensure_moe_quant_config_init() self._maybe_sync_shared_experts_stream(shared_experts_input) - # 2. Apply gate if the runner owns it (internal router mode) - if self.gate is not None: - router_logits, _ = self.gate(hidden_states) - - result = self._forward_impl(self._hpu_layer_ref, hidden_states, router_logits, shared_experts_input, input_ids) + gate = self.gate or getattr(self, "_hpu_gate_ref", None) + if gate is not None: + router_logits, _ = gate(hidden_states) + shared_output, fused_hidden = self._apply_quant_method( + layer=layer, + hidden_states=hidden_states, + router_logits=router_logits, + shared_experts_input=shared_experts_input, + input_ids=input_ids, + ) + result = self._maybe_combine(shared_output, fused_hidden) else: result = self._forward_entry(hidden_states, router_logits, shared_experts_input, input_ids, self._encode_layer_name(), self._trtllm_mxfp4_unpadded_dim()) @@ -377,8 +386,7 @@ def create_fused_moe_router( # custom routing parameters custom_routing_function: Callable | None = None, # eplb parameters - enable_eplb: bool = False, - eplb_state: EplbLayerState = EMPTY_EPLB_STATE, + eplb_state: EplbLayerState | None = None, # zero expert parameters zero_expert_type: str | None = None, num_logical_experts: int | None = None, @@ -417,7 +425,6 @@ def create_fused_moe_router( custom_routing_function: Optional custom routing function EPLB arguments: - enable_eplb: Whether EPLB is enabled eplb_state: EPLB (Expert Parallelism Load Balancing) state Zero expert arguments: @@ -440,7 +447,6 @@ def create_fused_moe_router( top_k=top_k, global_num_experts=global_num_experts, eplb_state=eplb_state, - enable_eplb=enable_eplb, indices_type_getter=indices_type_getter, ) @@ -457,7 +463,6 @@ def create_fused_moe_router( scoring_func=scoring_func, renormalize=renormalize, routed_scaling_factor=routed_scaling_factor, - enable_eplb=enable_eplb, indices_type_getter=indices_type_getter, ) @@ -476,7 +481,6 @@ def create_fused_moe_router( routed_scaling_factor=routed_scaling_factor, e_score_correction_bias=e_score_correction_bias, num_fused_shared_experts=num_fused_shared_experts, - enable_eplb=enable_eplb, indices_type_getter=indices_type_getter, ) return grouped_topk_router @@ -488,7 +492,6 @@ def create_fused_moe_router( eplb_state=eplb_state, custom_routing_function=custom_routing_function, renormalize=renormalize, - enable_eplb=enable_eplb, indices_type_getter=indices_type_getter, ) @@ -503,7 +506,6 @@ def create_fused_moe_router( scoring_func=scoring_func, renormalize=renormalize, routed_scaling_factor=routed_scaling_factor, - enable_eplb=enable_eplb, indices_type_getter=indices_type_getter, hash_indices_table=hash_indices_table, ) @@ -514,7 +516,6 @@ def create_fused_moe_router( eplb_state=eplb_state, renormalize=renormalize, scoring_func=scoring_func, - enable_eplb=enable_eplb, indices_type_getter=indices_type_getter, ) @@ -546,15 +547,14 @@ def _patched_default_moe_runner_forward(self, *args, **kwargs): def _hpu_fused_moe_init(self, *args, **kwargs): _orig_fused_moe_init(self, *args, **kwargs) - if hasattr(self, 'runner'): - object.__setattr__(self.runner, '_hpu_layer_ref', self) + if hasattr(self, "runner"): + object.__setattr__(self.runner, "_hpu_layer_ref", self) + if self.runner.gate is not None: + object.__setattr__(self.runner, "_hpu_gate_ref", self.runner.gate) FusedMoE.__init__ = _hpu_fused_moe_init -vllm.model_executor.layers.fused_moe.layer.get_compressed_expert_map = \ - get_compressed_expert_map -vllm.model_executor.layers.fused_moe.router.router_factory.create_fused_moe_router = \ - create_fused_moe_router -vllm.model_executor.layers.fused_moe.layer.create_fused_moe_router = \ - create_fused_moe_router +vllm.model_executor.layers.fused_moe.layer.get_compressed_expert_map = get_compressed_expert_map +vllm.model_executor.layers.fused_moe.router.router_factory.create_fused_moe_router = create_fused_moe_router +vllm.model_executor.layers.fused_moe.layer.create_fused_moe_router = create_fused_moe_router diff --git a/vllm_gaudi/ops/hpu_rotary_embedding.py b/vllm_gaudi/ops/hpu_rotary_embedding.py index 38d02d23f9..cfc452927a 100644 --- a/vllm_gaudi/ops/hpu_rotary_embedding.py +++ b/vllm_gaudi/ops/hpu_rotary_embedding.py @@ -668,9 +668,14 @@ def forward_oot( cos, sin = cos_sin.chunk(2, dim=-1) if positions.ndim == 2: assert self.mrope_section - - cos = torch.cat([m[i] for i, m in enumerate(cos.split(self.mrope_section, dim=-1))], dim=-1) - sin = torch.cat([m[i] for i, m in enumerate(sin.split(self.mrope_section, dim=-1))], dim=-1) + if getattr(self, "mrope_interleaved", False): + from vllm.model_executor.layers.rotary_embedding.mrope import apply_interleaved_rope + + cos = apply_interleaved_rope(cos, self.mrope_section) + sin = apply_interleaved_rope(sin, self.mrope_section) + else: + cos = torch.cat([m[i] for i, m in enumerate(cos.split(self.mrope_section, dim=-1))], dim=-1) + sin = torch.cat([m[i] for i, m in enumerate(sin.split(self.mrope_section, dim=-1))], dim=-1) if self.is_neox_style: cos = torch.cat((cos, cos), dim=-1).unsqueeze(-2) sin = torch.cat((sin, sin), dim=-1).unsqueeze(-2) diff --git a/vllm_gaudi/patches.py b/vllm_gaudi/patches.py index 4a784dd26d..c9199e9332 100644 --- a/vllm_gaudi/patches.py +++ b/vllm_gaudi/patches.py @@ -17,11 +17,19 @@ requires the device's allocator to be a ``c10::DeviceAllocator``. We replace it with an HPU-safe variant that uses ``current_platform.empty_cache()`` instead (see GAUDISW-247825). + +* ``vllm.v1.sample.ops.logprobs.batched_count_greater_than`` — upstream + decorates this function with ``@torch.compile(dynamic=True, ...)``. + Habana's ``recipe_compiler`` backend cannot handle the symbolic shapes + produced by ``dynamic=True`` (and by ``mark_unbacked`` in the caller), + raising ``TypeError: Cannot convert symbols to int``. We replace it + with a plain (uncompiled) version of the same function. The replacement + is deferred to ``load_general_plugins`` time to avoid importing + ``vllm.v1.sample.sampler`` during early plugin registration, which would + trigger a heavy import chain that interferes with platform initialisation. """ -import functools import gc -from unittest.mock import MagicMock, patch import torch @@ -43,6 +51,69 @@ def _hpu_accelerator_empty_cache() -> None: empty_cache() +def _patch_hf3fs_mock_client_for_cpu_only() -> None: + """Patch HF3FS mock client to avoid CUDA stream waits on CPU-only builds. + + Upstream mock client unconditionally calls + ``torch.cuda.current_stream().wait_event(event)`` in ``batch_write``. + In environments where PyTorch is not compiled with CUDA, that path throws + and the method returns ``-1`` for writes, causing connector unit tests to + fail. This patch keeps the same behavior but skips CUDA synchronization when + CUDA is unavailable. + """ + try: + from vllm.distributed.kv_transfer.kv_connector.v1.hf3fs.utils import hf3fs_mock_client as _mock_mod + except Exception: + # Keep plugin load resilient if the module path changes or is missing. + return + + client_cls = getattr(_mock_mod, "Hf3fsClient", None) + if client_cls is None: + return + + original_batch_write = getattr(client_cls, "batch_write", None) + if original_batch_write is None: + return + + if getattr(original_batch_write, "_vllm_gaudi_cpu_safe_patch", False): + return + + def _batch_write_cpu_safe(self, offsets, tensors, event): + if torch.cuda.is_available(): + return original_batch_write(self, offsets, tensors, event) + + results = [] + try: + data_bytes_list = [self._tensor_to_bytes(tensor) for tensor in tensors] + + with open(self._file_path, "r+b") as f: + for offset, data_bytes in zip(offsets, data_bytes_list): + if offset < 0 or offset + len(data_bytes) > self._size: + results.append(-1) + continue + + f.seek(offset) + bytes_written = f.write(data_bytes) + + if bytes_written == len(data_bytes) == self._bytes_per_page: + results.append(self._bytes_per_page) + else: + _mock_mod.logger.error( + "Write size mismatch: wrote %d, expected %d", + bytes_written, + self._bytes_per_page, + ) + results.append(-1) + except Exception as e: + _mock_mod.logger.error("Batch write error: %s", e) + results.extend([-1] * (len(offsets) - len(results))) + + return results + + _batch_write_cpu_safe._vllm_gaudi_cpu_safe_patch = True # type: ignore[attr-defined] + client_cls.batch_write = _batch_write_cpu_safe + + def _hpu_cleanup_dist_env_and_memory(shutdown_ray: bool = False) -> None: """HPU-safe replacement for ``cleanup_dist_env_and_memory``. @@ -51,6 +122,9 @@ def _hpu_cleanup_dist_env_and_memory(shutdown_ray: bool = False) -> None: ``torch.accelerator.empty_cache()`` (which is incompatible with the HPU allocator). """ + # Re-apply lazy runtime patches that may depend on import timing. + _patch_hf3fs_mock_client_for_cpu_only() + # Reset environment variable cache envs.disable_envs_cache() # Ensure all objects are not frozen before cleanup @@ -74,6 +148,31 @@ def _hpu_cleanup_dist_env_and_memory(shutdown_ray: bool = False) -> None: parallel_state.logger.warning("torch._C._host_emptyCache() only available in Pytorch >=2.5") +def _hpu_batched_count_greater_than(x: torch.Tensor, values: torch.Tensor) -> torch.Tensor: + """HPU-safe replacement for ``batched_count_greater_than``. + + Identical logic to the upstream implementation but *not* wrapped in + ``torch.compile``. The upstream decorator uses ``dynamic=True`` whose + symbolic shapes are incompatible with Habana's ``recipe_compiler`` + backend, and ``mark_unbacked`` in the caller prevents ``dynamic=False`` + from helping. + """ + return (x >= values).sum(-1) + + +def _patch_batched_count_greater_than() -> None: + """Replace ``batched_count_greater_than`` in the sampler & logprobs modules. + + Called from the ``load_general_plugins`` hook so that the heavy + ``vllm.v1.sample.*`` import chain runs *after* platform initialisation. + """ + import vllm.v1.sample.ops.logprobs as _logprobs_mod + import vllm.v1.sample.sampler as _sampler_mod + + _logprobs_mod.batched_count_greater_than = _hpu_batched_count_greater_than + _sampler_mod.batched_count_greater_than = _hpu_batched_count_greater_than + + def apply() -> None: """Install all HPU runtime monkey-patches.""" # --- torch.accelerator.empty_cache --- @@ -83,14 +182,28 @@ def apply() -> None: if not hasattr(torch._C, "_host_emptyCache"): torch._C._host_emptyCache = lambda: None - # Patch the canonical definition. + # --- cleanup_dist_env_and_memory --- parallel_state.cleanup_dist_env_and_memory = _hpu_cleanup_dist_env_and_memory - # Patch the re-export from ``vllm.distributed`` so ``from vllm.distributed - # import cleanup_dist_env_and_memory`` (used by the upstream pytest - # conftest) also picks up the HPU-safe version. import vllm.distributed as _vllm_distributed _vllm_distributed.cleanup_dist_env_and_memory = _hpu_cleanup_dist_env_and_memory + _patch_hf3fs_mock_client_for_cpu_only() + + # --- batched_count_greater_than (deferred) --- + # We cannot import the sampler modules here because the import chain + # triggers platform re-initialisation ("Device string must not be + # empty"). Instead we hook into ``load_general_plugins`` which runs + # in every process (parent + EngineCore subprocess) after the platform + # is ready. + import vllm.plugins as _plugins_mod + + _original_load_general = _plugins_mod.load_general_plugins + + def _load_general_with_hpu_patches(): + _original_load_general() + _patch_batched_count_greater_than() + + _plugins_mod.load_general_plugins = _load_general_with_hpu_patches def patch_hf3fs_mock_client(): @@ -98,29 +211,12 @@ def patch_hf3fs_mock_client(): The upstream mock client's ``batch_write`` unconditionally calls ``torch.cuda.current_stream().wait_event(event)``, which raises - ``RuntimeError`` on platforms without CUDA (e.g. HPU). We wrap - ``batch_write`` to stub ``torch.cuda.current_stream`` with a no-op - mock for the duration of the call. + ``RuntimeError`` on platforms without CUDA (e.g. HPU). This helper + installs the CPU-safe replacement for ``batch_write``. Called from ``register_utils()`` (general plugin) rather than ``apply()`` (platform plugin) to avoid circular imports — the mock client transitively imports ``vllm.config`` which is not yet fully initialized during platform registration. """ - if torch.cuda.is_available(): - return - - try: - from vllm.distributed.kv_transfer.kv_connector.v1.hf3fs.utils import ( - hf3fs_mock_client, ) - except ImportError: - return - - _orig_batch_write = hf3fs_mock_client.Hf3fsClient.batch_write - - @functools.wraps(_orig_batch_write) - def _safe_batch_write(self, offsets, tensors, event): - with patch("torch.cuda.current_stream", return_value=MagicMock()): - return _orig_batch_write(self, offsets, tensors, event) - - hf3fs_mock_client.Hf3fsClient.batch_write = _safe_batch_write + _patch_hf3fs_mock_client_for_cpu_only() diff --git a/vllm_gaudi/utils.py b/vllm_gaudi/utils.py index ecceb0e4bf..5f7dbb8298 100644 --- a/vllm_gaudi/utils.py +++ b/vllm_gaudi/utils.py @@ -287,6 +287,24 @@ def get_compile_args(self) -> dict[str, Any]: _async_sched_module.AsyncScheduler = HPUAsyncScheduler # type: ignore[misc] +# Guard Prometheus counters against negative prompt-token counts that can arise +# when KV-cache blocks are invalidated during OOM and token-count bookkeeping +# becomes temporarily inconsistent. Prometheus counters require non-negative +# increments; clamping here prevents a crash in PrometheusStatLogger.record(). +# noqa: E402 cannot be hoisted to the module top: this monkey-patch must run +# AFTER the `_async_sched_module.AsyncScheduler = HPUAsyncScheduler` rebind +# above so vllm.v1.metrics.stats picks up the HPU scheduler symbol. +import vllm.v1.metrics.stats as _stats_module # noqa: E402 + +_stats_get_by_source_orig = _stats_module.PromptTokenStats.get_by_source + + +def _hpu_get_by_source(self, source: str) -> int: + return max(0, _stats_get_by_source_orig(self, source)) + + +_stats_module.PromptTokenStats.get_by_source = _hpu_get_by_source + def patch_nixl_utils_for_hpu(): """Patch vllm.distributed.nixl_utils to use nixl._api instead of rixl._api. diff --git a/vllm_gaudi/v1/attention/backends/hpu_attn.py b/vllm_gaudi/v1/attention/backends/hpu_attn.py index 1a36a7d56c..5f6f10acf7 100644 --- a/vllm_gaudi/v1/attention/backends/hpu_attn.py +++ b/vllm_gaudi/v1/attention/backends/hpu_attn.py @@ -35,10 +35,11 @@ def get_metadata_cls() -> type["AttentionMetadata"]: @staticmethod def get_supported_kernel_block_sizes() -> list[Union[int, MultipleOf]]: - # 128 is the standard HPU kernel block size; 528 is required for - # Granite 4.0-H (granitemoehybrid) without prefix caching (16-token - # FA alignment), 768 with prefix caching (chunk-aligned). - return [128, 528, 768] + # 16 is supported for testing/smaller models; 128 is the standard HPU + # kernel block size; 528 is required for Granite 4.0-H + # (granitemoehybrid) without prefix caching (16-token FA alignment), + # 768 with prefix caching (chunk-aligned). + return [16, 128, 528, 768] @classmethod def get_preferred_block_size(cls, default_block_size: int) -> int: diff --git a/vllm_gaudi/v1/core/sched/hpu_async_scheduler.py b/vllm_gaudi/v1/core/sched/hpu_async_scheduler.py index 9fd41b1130..19a8ddebb0 100644 --- a/vllm_gaudi/v1/core/sched/hpu_async_scheduler.py +++ b/vllm_gaudi/v1/core/sched/hpu_async_scheduler.py @@ -1,10 +1,118 @@ # SPDX-License-Identifier: Apache-2.0 +from collections.abc import Iterable + from vllm.v1.core.sched.async_scheduler import AsyncScheduler -from vllm.v1.request import Request +from vllm.v1.request import Request, RequestStatus class HPUAsyncScheduler(AsyncScheduler): + def schedule(self): + """HPU override: fix stale cached-token accounting after preemption. + + After preemption a request is requeued with num_computed_tokens reset. + On the next schedule() the OffloadingConnector may assign new external + cache hits, raising num_external_computed_tokens above the stale + num_cached_tokens (which upstream only refreshes when negative). After + super().schedule() has advanced num_computed_tokens for this step, we + post-process running requests to detect this staleness + (num_cached_tokens < num_external_computed_tokens) and resync + num_cached_tokens. + + NOTE: only requests that were actually scheduled this step land in + self.running here; a request requeued by the connector but not yet + re-scheduled stays in self.waiting and the inconsistency persists + until it is picked up. The Prometheus clamp in vllm_gaudi/utils.py + guards the metrics path during that window. + """ + output = super().schedule() + for request in self.running: + # vLLM Request no longer exposes num_cached_tokens on newer + # branches. Keep the old fix only when the field exists. + if (hasattr(request, "num_cached_tokens") + and request.num_cached_tokens < request.num_external_computed_tokens): + request.num_cached_tokens = request.num_computed_tokens + return output + + def _update_requests_with_invalid_blocks( + self, + requests: Iterable[Request], + invalid_block_ids: set[int], + num_scheduled_tokens: dict[str, int], + evict_blocks: bool = True, + ) -> tuple[set[str], int, set[int]]: + """HPU override: clamp num_external_computed_tokens to 0 instead of + allowing it to go negative when OOM-invalidated blocks span both + externally-computed and locally-computed token ranges. + + NOTE: This is a near-verbatim copy of the upstream + ``vllm.v1.core.sched.async_scheduler.AsyncScheduler + ._update_requests_with_invalid_blocks``. The only functional delta is + the ``max(0, ...)`` clamp on ``request.num_external_computed_tokens`` + below (search for "HPU delta"). Keep this method in sync with + upstream when that routine evolves (hybrid memory allocator support, + new connector types, etc.). An upstream issue tracking the negative + clamp should be filed against vllm-project/vllm. + """ + affected_req_ids: set[str] = set() + total_affected_tokens = 0 + blocks_to_evict: set[int] = set() + marked_invalid_block_ids: set[int] = set() + for request in requests: + is_affected = False + marked_invalid_block = False + req_id = request.request_id + # TODO (davidb): add support for hybrid memory allocator + (req_block_ids, ) = self.kv_cache_manager.get_block_ids(req_id) + if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: + req_num_computed_tokens = (request.num_computed_tokens - num_scheduled_tokens.get(req_id, 0) if req_id + in self.failed_recving_kv_req_ids else len(req_block_ids) * self.block_size) + else: + # vLLM removed Request.num_cached_tokens in newer branches. + # Fall back to upstream-equivalent computed-token accounting. + req_num_computed_tokens = (request.num_cached_tokens if hasattr(request, "num_cached_tokens") else + request.num_computed_tokens - num_scheduled_tokens.get(req_id, 0)) + + req_num_computed_blocks = (req_num_computed_tokens + self.block_size - 1) // self.block_size + for idx, block_id in zip(range(req_num_computed_blocks), req_block_ids): + if block_id not in invalid_block_ids: + continue + + is_affected = True + + if block_id in marked_invalid_block_ids: + continue + + marked_invalid_block_ids.add(block_id) + + if marked_invalid_block: + continue + + marked_invalid_block = True + request.num_computed_tokens = idx * self.block_size + num_affected_tokens = (req_num_computed_tokens - request.num_computed_tokens) + total_affected_tokens += num_affected_tokens + # HPU delta vs upstream: clamp to 0. num_affected_tokens may + # exceed the number of externally-computed tokens when + # OOM-invalidation spans locally-computed blocks too, which + # would otherwise drive num_external_computed_tokens negative. + if hasattr(request, "num_external_computed_tokens"): + request.num_external_computed_tokens = max( + 0, + request.num_external_computed_tokens - num_affected_tokens, + ) + if evict_blocks: + blocks_to_evict.update(req_block_ids[idx:]) + + if is_affected: + if not marked_invalid_block: + total_affected_tokens += (request.num_computed_tokens - req_num_computed_tokens) + request.num_computed_tokens = req_num_computed_tokens + + affected_req_ids.add(request.request_id) + + return affected_req_ids, total_affected_tokens, blocks_to_evict + def _mamba_block_aligned_split( self, request: Request, diff --git a/vllm_gaudi/v1/worker/hpu_input_batch.py b/vllm_gaudi/v1/worker/hpu_input_batch.py index bf3207cf4e..c2ac2f8f68 100644 --- a/vllm_gaudi/v1/worker/hpu_input_batch.py +++ b/vllm_gaudi/v1/worker/hpu_input_batch.py @@ -640,12 +640,14 @@ def make_selective_sampling_metadata( # The prompt tokens are used only for applying penalties during # the sampling process. Hence copy these tensors only when # there are requests which need penalties to be applied. - prompt_token_ids = self._make_prompt_token_ids_cpu_tensor()[req_indices] + prompt_token_ids = self._make_prompt_token_ids_cpu_tensor()[req_indices].to(device=self.device, + non_blocking=True) else: # Even with skip_copy=True, we need prompt_token_ids for penalties if not self.no_penalties: cached_tensor = self._get_cached_prompt_token_ids() - prompt_token_ids = cached_tensor[req_indices] if cached_tensor is not None else None + prompt_token_ids = cached_tensor[req_indices].to( + device=self.device, non_blocking=True) if cached_tensor is not None else None else: prompt_token_ids = None diff --git a/vllm_gaudi/v1/worker/hpu_model_runner.py b/vllm_gaudi/v1/worker/hpu_model_runner.py index e0478d3e2b..8da476c5af 100644 --- a/vllm_gaudi/v1/worker/hpu_model_runner.py +++ b/vllm_gaudi/v1/worker/hpu_model_runner.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 import collections +from collections.abc import Mapping import copy import contextlib from copy import deepcopy @@ -136,6 +137,14 @@ except ImportError: LMCacheConnectorMetadata = None +_GDN_MAMBA_TYPES: tuple[object, ...] = ("gdn_attention", "linear_attention") +try: + from vllm.v1.attention.backends.registry import MambaAttentionBackendEnum + _GDN_MAMBA_TYPES = (MambaAttentionBackendEnum.GDN_ATTN, MambaAttentionBackendEnum.LINEAR, "gdn_attention", + "linear_attention") +except (ImportError, AttributeError): + pass + _TYPE_CACHE: dict[str, dict[str, Any]] = {} HPU_TORCH_DTYPE_TO_STR_DTYPE = { @@ -427,6 +436,34 @@ def gather_list(input, indices, v): return [input[i] if i is not None else v for i in indices] +def ensure_multi_token_decodes_last(b: InputBatch, scheduled_tokens: Mapping[str, int]) -> None: + """Within the decode region, sort single-token decodes before multi-token ones. + + When spec-decode is not configured, resumed/catch-up decode requests with + many scheduled tokens (e.g. from KV offloading requeue) must be processed + via the prefill path to avoid bucket overflow in get_habana_paged_attn_buffers. + Moving them to the end of the decode region lets _get_prompts_and_decodes + route them to the prefill batch where the large scheduled-token count is + handled correctly. + + After _ensure_decodes_first the layout is: [decodes ... | prompts ...] + This function produces: [1-tok decodes | multi-tok decodes | prompts] + """ + num_reqs = b.num_reqs + decode_end = num_reqs + for i in range(num_reqs): + if b.num_computed_tokens_cpu[i] < b.num_prompt_tokens[i]: + decode_end = i + break + write_pos = 0 + for read_pos in range(decode_end): + req_id = b.req_ids[read_pos] + if scheduled_tokens.get(req_id, 1) == 1: + if read_pos != write_pos: + b.swap_states(write_pos, read_pos) + write_pos += 1 + + def get_target_layer_suffix_list(model_type) -> list[str]: # This sets the suffix for the hidden layer name, which is controlled by # VLLM_CONFIG_HIDDEN_LAYERS. The default suffix is "DecoderLayer," which is @@ -506,7 +543,8 @@ def maybe_set_mamba_kv_cache_groups_ids(model, kv_cache_config: KVCacheConfig): model = model.model mamba_like_arch = [ - "GraniteMoeHybridForCausalLM", "Qwen3_5MoeForConditionalGeneration", "Qwen3_5ForConditionalGeneration" + "GraniteMoeHybridForCausalLM", "Qwen3_5MoeForConditionalGeneration", "Qwen3_5ForConditionalGeneration", + "Qwen3NextForCausalLM" ] if not any(arch in getattr(model.config, 'architectures', []) for arch in mamba_like_arch): return @@ -1099,12 +1137,6 @@ def __init__( and (getattr(hf_text_config, "mamba_chunk_size", None) is not None or getattr(hf_text_config, "chunk_size", None) is not None)) - # Non-GDN hybrid: at least one mamba/linear-style layer and zero GDN - # (gdn_attention / linear_attention) layers. Used to gate optimizations - # that have only been validated on non-GDN hybrid topologies - # (e.g. Granite-4 Mamba2+Transformer). - self.is_non_gdn_hybrid = (self.num_mamba_like_layers > 0 and self.num_gdn == 0) - # For HPU GDN, use configured chunk size when explicitly provided; # otherwise default to 128 to match bucket alignment. if self.num_mamba_like_layers > 0: @@ -2037,6 +2069,17 @@ def _get_prompts_and_decodes( if self._is_prompt(i, scheduler_output): break + # When spec-decode is not configured, a decode request with more + # than 1 scheduled token is a resumed/catch-up request that must + # be processed via the prefill (prompt) path instead. After + # ensure_multi_token_decodes_last these requests are sorted to the + # end of the decode region so the break here is correct. + num_scheduled_tokens = scheduler_output.num_scheduled_tokens[req_id] + if num_scheduled_tokens > 1 and \ + not self.vllm_config.speculative_config: + break + + # This is decode # NOTE(chendi): To support spec decode, # we don't assume num_scheduled_tokens == 1. decode_req_ids.append(req_id) @@ -2053,11 +2096,20 @@ def _get_prompts_and_decodes( req_id = self.input_batch.req_ids[i] assert req_id is not None - # Must be prompt - assert self._is_prompt(i, scheduler_output) + num_computed_tokens = self.input_batch.num_computed_tokens_cpu[i] + num_prompt_tokens = self.input_batch.num_prompt_tokens[i] + num_scheduled_tokens = scheduler_output.num_scheduled_tokens[req_id] + + # Prompt traversal must follow the exact same predicate used above + # to partition decode vs prompt requests, including preempted + # prompt / catch-up cases handled by `_is_prompt()`. + assert self._is_prompt(i, scheduler_output), (f"Unexpected at prompt-traversal req_id={req_id} idx={i}: " + f"computed={num_computed_tokens}, " + f"prompt={num_prompt_tokens}, " + f"scheduled={num_scheduled_tokens}") + # NOTE(kzawora): In preempted sequences, num_output_tokens can be > 0, and still be a valid prefill prompt_req_ids.append(req_id) - num_scheduled_tokens = scheduler_output.num_scheduled_tokens[req_id] prompt_scheduled_tokens.append(int(num_scheduled_tokens)) return PromptDecodeInfo(prompt_req_ids, decode_req_ids, prompt_scheduled_tokens) @@ -2956,11 +3008,16 @@ def _create_decode_input_data(self, token_ids_device[:num_decodes] = self.input_ids_hpu[:num_decodes].view(-1, 1) else: token_ids_split_tensors = torch.split(self.input_ids_hpu[:total_num_scheduled_tokens], - num_tokens_per_req) - token_ids_device[:num_decodes] = \ + num_tokens_per_req[:num_decodes]) + # token_ids was already reshaped to [padded_batch*num_tokens, 1] + # (via view(-1,1) in the CPU prepare path above) before the + # async_h2d_copy, so token_ids_device has the same flat shape. + # Index [:num_decodes*num_tokens] to write all rows for the + # decode region (not just the first num_decodes rows). + token_ids_device[:num_decodes * num_tokens] = \ pad_sequence(list(token_ids_split_tensors), batch_first=True, - padding_value=0)[:num_decodes] + padding_value=0)[:num_decodes].view(-1, 1) ##################################### # NOTE(Chendi): Since we can't actually do num_tokens = 2, @@ -3834,21 +3891,6 @@ def set_attn_bias(self, attn_metadata, batch_size, seq_len, device, dtype): or not attn_metadata.is_prompt): return attn_metadata - # Extended FSDPA-native causal short-circuit for non-GDN hybrid models - # (e.g. Granite-4 Mamba2+Transformer). FusedSDPA can encode a purely - # causal mask natively via is_causal=True + valid_seq_lengths, including - # chunked prefill where block_list is non-None. Skipping the - # materialised [bs, 1, q_len, total_kv_len] attn_bias avoids a large - # add_bf16 on the attention critical path (significant at long - # context). Conservative scope: only non-GDN hybrid models; GDN / - # pure-transformer / other topologies keep the materialised bias path - # until validated. - if (self.prefill_use_fusedsdpa and self.is_causal and not self.is_pooling_model - and not getattr(self, 'sliding_window', None) - and not getattr(self, 'model_has_chunked_attention', False) - and getattr(self, 'alibi_slopes', None) is None and self.is_non_gdn_hybrid): - return attn_metadata - if attn_metadata.attn_bias is not None: return attn_metadata @@ -3985,8 +4027,17 @@ def sample_tokens(self, grammar_output: "GrammarOutput | None") -> ModelRunnerOu # Return [tokD0, tokD1, tokD2, tokP0, tokP1, tokP2] batch_changed = self.batch_changed - # If necessary, swap decodes/prompts to have all decodes on the start + # If necessary, swap decodes/prompts to have all decodes on the start. + # Use the method form (not a module-level helper) so that the + # partitioning predicate matches `_is_prompt()` exactly, including + # preempted-prompt, decoder-only, and spec-decode cases. self._ensure_decodes_first(scheduler_output) + # When spec-decode is not configured, sort multi-token catch-up + # decode requests to the end of the decode region so that + # _get_prompts_and_decodes routes them through the prefill path, + # preventing bucket overflow and Habana workspace OOM. + if not self.vllm_config.speculative_config: + ensure_multi_token_decodes_last(self.input_batch, scheduler_output.num_scheduled_tokens) # Prepare prompts/decodes info pd_info = self._get_prompts_and_decodes(scheduler_output) num_decodes = len(pd_info.decode_req_ids) @@ -4667,10 +4718,29 @@ def _remove_duplicate_submodules(self): if mlp is not None: block_gate = getattr(mlp, 'gate', None) or getattr(mlp, 'router', None) experts = getattr(mlp, 'experts', None) - if (block_gate is not None and experts is not None - and getattr(experts, '_gate', None) is block_gate): - experts._gate = None - self._detached_moe_gates.add(id(experts)) + if block_gate is not None and experts is not None: + if getattr(experts, '_gate', None) is block_gate: + experts._gate = None + self._detached_moe_gates.add(id(experts)) + # With upstream vLLM PR #35178 MoERunner is an + # nn.Module, so `self.runner.gate = gate` in + # FusedMoE.__init__ registers the shared gate + # as a child of runner. INC's + # generate_model_info() walks named_children() + # and the last-seen parent wins, so INC patches + # runner._modules['gate'] and leaves + # mlp._modules['gate'] pointing at a stale + # module whose weight Parameter has been + # mutated in-place to fp8. Unregister the gate + # from runner._modules (but keep runner.gate + # as a plain attribute so is_internal_router() + # and the runner's internal forward path keep + # working) so INC sees mlp as the sole parent. + runner = getattr(experts, 'runner', None) + if (runner is not None and isinstance(runner, torch.nn.Module) + and runner._modules.get('gate', None) is block_gate): + del runner._modules['gate'] + object.__setattr__(runner, 'gate', block_gate) def _sync_shared_moe_gates(self): """Apply SharedFusedMoE post-INC synchronization and compatibility. @@ -4724,6 +4794,15 @@ def _sync_moe_kernel_flags(module: torch.nn.Module): runner = getattr(experts, "runner", None) if runner is not None and hasattr(runner, "gate"): runner.gate = None + # Refresh the cached gate ref captured at + # FusedMoE.__init__ to the post-INC block-level gate. + # The dp_size==1 fast path (patched_fused_moe_forward) + # falls back to runner._hpu_gate_ref when runner.gate + # is None; the pre-INC reference points at the now- + # replaced module and produced shape/dtype mismatches + # under fp8. + if block_gate is not None: + object.__setattr__(runner, "_hpu_gate_ref", block_gate) if id(experts) in self._detached_moe_gates: self._detached_moe_gates.remove(id(experts)) @@ -5107,8 +5186,12 @@ def _add_dummy_request(self, num_scheduled_tokens[req_id] = scheduled_tokens def _generate_seq_lengths(self, num_samples, num_blocks, block_size): - # ensure the actual number of blocks is less than the KV cache blocks - num_blocks = min(self.kv_cache_config.num_blocks, num_blocks) + # For contiguous PA, cap num_blocks to physical KV cache size because + # block_id = num_blocks - 1 must be a valid physical block. + # For non-contiguous PA, block_id=0 is always valid and runtime can + # exceed physical blocks via prefix-sharing, so don't cap. + if self.use_contiguous_pa: + num_blocks = min(self.kv_cache_config.num_blocks, num_blocks) assert num_samples <= num_blocks blocks = [num_blocks // num_samples] * num_samples @@ -5198,11 +5281,19 @@ def _prepare_dummy_scenario(self, prompt_cfg, decode_cfg): is_prompt=True) if decode_cfg: decode_bs, decode_query_len, decode_num_blocks = decode_cfg + # Use attn_block_size (the actual kernel block granularity used in + # _create_decode_input_data) rather than block_size (the KV-manager + # page size). For hybrid models these differ after + # initialize_kv_cache aligns attn page size to mamba page size, + # causing warmup to record wrong num_blocks otherwise. + decode_block_size = self.attn_block_size if self.use_contiguous_pa: - decode_seq_lengths = [self.block_size] * decode_bs - block_id = decode_num_blocks - 1 + decode_seq_lengths = [decode_block_size] * decode_bs + # Cap block_id at physical pool — contiguous PA uses + # block_id as the allocation base which must be valid. + block_id = min(decode_num_blocks - 1, self.kv_cache_config.num_blocks - 1) else: - decode_seq_lengths = self._generate_seq_lengths(decode_bs, decode_num_blocks, self.block_size) + decode_seq_lengths = self._generate_seq_lengths(decode_bs, decode_num_blocks, decode_block_size) block_id = 0 for dsl in decode_seq_lengths: self._add_dummy_request(requests, @@ -5469,7 +5560,19 @@ def warmup_model(self) -> None: self.bucketing_manager.generate_prompt_buckets() if not self.is_pooling_model: - self.bucketing_manager.generate_decode_buckets() + # For hybrid models where HPU kernel block size (attn_block_size) + # differs from KV-cache block_size, decode buckets must be + # generated in attn_block_size units because the runtime decode + # path (_create_decode_input_data) computes num_blocks using + # attn_block_size. Scope the mutation to avoid affecting prompt + # fallback paths that still need the original block_size. + saved_block_size = self.bucketing_manager.block_size + if self.attn_block_size != self.block_size: + self.bucketing_manager.block_size = self.attn_block_size + try: + self.bucketing_manager.generate_decode_buckets() + finally: + self.bucketing_manager.block_size = saved_block_size else: self.bucketing_manager.decode_buckets = [] @@ -5811,10 +5914,19 @@ def initialize_kv_cache(self, kv_cache_config: KVCacheConfig) -> None: self.is_encoder_only_attn = False self.may_add_encoder_only_layers_to_kv_cache_config() if self.num_mamba_like_layers > 0: - # Reassign block size for hybrid models after platform.py alignments - self.block_size = self.vllm_config.cache_config.block_size - if self.enable_bucketing: - self.bucketing_manager.block_size = self.block_size + # NOTE: Do NOT reassign self.block_size or + # bucketing_manager.block_size from cache_config here. + # For hybrid models the upstream HybridAttentionMambaModelConfig + # inflates cache_config.block_size to align mamba pages (e.g. + # 1152 for Qwen3.5), but the HPU attention kernel operates at + # 128-token granularity. _create_decode_input_data computes + # num_blocks using self.attn_block_size (set below from + # prepare_kernel_block_sizes), so the bucketing manager must + # also use that same granularity. Overwriting block_size with + # the inflated KV-manager page size caused decode buckets to be + # generated at 1152-token granularity while runtime used + # 128-token granularity, leading to permanent "not warmed-up" + # warnings and recompilations. maybe_set_mamba_kv_cache_groups_ids(self.model, self.kv_cache_config) self.initialize_attn_backend(kv_cache_config) @@ -5861,8 +5973,7 @@ def initialize_kv_cache(self, kv_cache_config: KVCacheConfig) -> None: if self.num_mamba_like_layers > 0 and self._compact_gdn_enabled: self._num_gdn_groups = sum( 1 for g in kv_cache_config.kv_cache_groups - if isinstance(g.kv_cache_spec, MambaSpec) and g.kv_cache_spec.mamba_type in ("gdn_attention", - "linear_attention")) + if isinstance(g.kv_cache_spec, MambaSpec) and g.kv_cache_spec.mamba_type in _GDN_MAMBA_TYPES) # Profiling may request more sequences than max_num_seqs # (e.g. VLLM_PROFILE_DECODE=16,64 with max_num_seqs=1). # Ensure GDN compact tensors and free-list are large enough. @@ -5897,7 +6008,7 @@ def _needs_raw_buffer(kv_cache_tensor) -> bool: if isinstance(spec, FullAttentionSpec): continue if isinstance(spec, MambaSpec) and \ - spec.mamba_type in ("gdn_attention", "linear_attention"): + spec.mamba_type in _GDN_MAMBA_TYPES: continue # Standard Mamba2 or unknown spec — needs raw buffer return True @@ -5945,7 +6056,7 @@ def _needs_raw_buffer(kv_cache_tensor) -> bool: vc = torch.zeros(kv_cache_shape, dtype=kv_cache_spec.dtype, device=self.device) kv_caches[layer_name] = (kc, vc, None, None) elif isinstance(kv_cache_spec, MambaSpec) and \ - kv_cache_spec.mamba_type in ("gdn_attention", "linear_attention") and \ + kv_cache_spec.mamba_type in _GDN_MAMBA_TYPES and \ self._compact_gdn_enabled: # GDN/linear_attention: compact allocation. # All GDN groups share the same state tensor, so each @@ -5972,7 +6083,7 @@ def _needs_raw_buffer(kv_cache_tensor) -> bool: kv_caches[shared_layer] = tuple(state_tensors) break elif isinstance(kv_cache_spec, MambaSpec) and \ - kv_cache_spec.mamba_type in ("gdn_attention", "linear_attention"): + kv_cache_spec.mamba_type in _GDN_MAMBA_TYPES: # GDN/linear_attention: non-compact (baseline) allocation # using contiguous tensors with num_blocks+1 slots. if isinstance(kv_caches.get(layer_name), tuple): @@ -6037,7 +6148,7 @@ def _needs_raw_buffer(kv_cache_tensor) -> bool: vc = torch.zeros(kv_cache_shape, dtype=kv_cache_spec.dtype, device=self.device) kv_caches[layer_name] = (kc, vc, None, None) elif isinstance(kv_cache_spec, MambaSpec) and \ - kv_cache_spec.mamba_type in ("gdn_attention", "linear_attention") and \ + kv_cache_spec.mamba_type in _GDN_MAMBA_TYPES and \ self._compact_gdn_enabled: # GDN/linear_attention: compact allocation. self._compact_gdn_group_ids.add(group_idx) @@ -6057,7 +6168,7 @@ def _needs_raw_buffer(kv_cache_tensor) -> bool: kv_caches[shared_layer] = tuple(state_tensors) break elif isinstance(kv_cache_spec, MambaSpec) and \ - kv_cache_spec.mamba_type in ("gdn_attention", "linear_attention"): + kv_cache_spec.mamba_type in _GDN_MAMBA_TYPES: # GDN/linear_attention: non-compact (baseline) allocation. if isinstance(kv_caches.get(layer_name), tuple): continue @@ -6664,17 +6775,6 @@ def __init__( self.interleaved_sliding_window = (is_interleaved(vllm_config.model_config.hf_text_config) and self.sliding_window) - # Detect non-GDN hybrid topologies (e.g. Granite-4 Mamba2+Transformer). - # Used to gate the FSDPA-native causal short-circuit in _set_attn_bias. - # Mirrors the runner's num_mamba_like_layers / num_gdn computation - # (HPUModelRunner.__init__) so the same set of models is targeted. - get_num_layers = vllm_config.model_config.get_num_layers_by_block_type - parallel_config = vllm_config.parallel_config - num_mamba_like = sum( - get_num_layers(parallel_config, bt) for bt in ("mamba", "gdn_attention", "linear_attention")) - num_gdn = sum(get_num_layers(parallel_config, bt) for bt in ("gdn_attention", "linear_attention")) - self.is_non_gdn_hybrid = (num_mamba_like > 0 and num_gdn == 0) - if self.interleaved_sliding_window: self.use_window_sdpa = with_default(get_config().PT_HPU_SDPA_QKV_SLICE_MODE_FWD, False) #os.getenv("PT_HPU_SDPA_QKV_SLICE_MODE_FWD", "false").strip().lower() in ("1", "true") @@ -6707,20 +6807,6 @@ def _set_attn_bias(self, attn_metadata: HPUAttentionMetadataV1, batch_size: int, or not attn_metadata.is_prompt): return attn_metadata - # Extended FSDPA-native causal short-circuit for non-GDN hybrid models - # (e.g. Granite-4 Mamba2+Transformer). FusedSDPA handles a purely - # causal mask natively (is_causal=True + valid_seq_lengths). Skip - # materialising a [bs, 1, q_len, total_kv_len] attn_bias even during - # chunked prefill (block_list is non-None) for these topologies; this - # removes a sizable add_bf16 from the attention critical path during - # long-context chunked prefill. interleaved_sliding_window and - # chunked-attention bias paths (window_attn_bias / chunked_attn_bias) - # are populated later in process_metadata and used by hpu_attn - # instead. Conservative scope: only non-GDN hybrid models; all other - # topologies retain the original behaviour. - if (self.prefill_use_fusedsdpa and not self.interleaved_sliding_window and self.is_non_gdn_hybrid): - return attn_metadata - if attn_metadata.attn_bias is not None: return attn_metadata diff --git a/vllm_gaudi/v1/worker/hpu_worker.py b/vllm_gaudi/v1/worker/hpu_worker.py index 89ec62e9d7..b9967a8280 100644 --- a/vllm_gaudi/v1/worker/hpu_worker.py +++ b/vllm_gaudi/v1/worker/hpu_worker.py @@ -31,7 +31,7 @@ from vllm.v1.outputs import (DraftTokenIds, AsyncModelRunnerOutput, ModelRunnerOutput) from vllm.v1.worker.utils import bind_kv_cache from vllm_gaudi.utils import is_fake_hpu -from vllm_gaudi.v1.worker.hpu_model_runner import HPUModelRunner +from vllm_gaudi.v1.worker.hpu_model_runner import HPUModelRunner, _GDN_MAMBA_TYPES from vllm.v1.worker.worker_base import CompilationTimes, WorkerBase from vllm_gaudi.extension.logger import logger as init_logger @@ -441,12 +441,9 @@ def determine_available_memory(self) -> int: # Reduce reported memory so the scheduler computes fewer # num_blocks that fit the HPU separate-allocation model. has_attn = any(isinstance(s, FullAttentionSpec) for s in kv_cache_spec.values()) - has_gdn = any( - isinstance(s, MambaSpec) and s.mamba_type in ("gdn_attention", "linear_attention") - for s in kv_cache_spec.values()) + has_gdn = any(isinstance(s, MambaSpec) and s.mamba_type in _GDN_MAMBA_TYPES for s in kv_cache_spec.values()) has_standard_mamba = any( - isinstance(s, MambaSpec) and s.mamba_type not in ("gdn_attention", "linear_attention") - for s in kv_cache_spec.values()) + isinstance(s, MambaSpec) and s.mamba_type not in _GDN_MAMBA_TYPES for s in kv_cache_spec.values()) compact_gdn = os.environ.get("VLLM_COMPACT_GDN", "0").strip().lower() in ("1", "true") if has_attn and has_gdn and not compact_gdn: # When compact GDN is OFF, GDN state scales with num_blocks @@ -462,8 +459,7 @@ def determine_available_memory(self) -> int: real_attn = next(s.real_page_size_bytes for s in kv_cache_spec.values() if isinstance(s, FullAttentionSpec)) real_mamba = next( sum(math.prod(sh) * get_dtype_size(dt) for sh, dt in zip(s.shapes, s.dtypes)) - for s in kv_cache_spec.values() - if isinstance(s, MambaSpec) and s.mamba_type in ("gdn_attention", "linear_attention")) + for s in kv_cache_spec.values() if isinstance(s, MambaSpec) and s.mamba_type in _GDN_MAMBA_TYPES) total_real = real_attn + real_mamba if total_real > padded_page: factor = padded_page / total_real @@ -484,8 +480,7 @@ def determine_available_memory(self) -> int: attn_page_size = next(s.page_size_bytes for s in kv_cache_spec.values() if isinstance(s, FullAttentionSpec)) mamba_state_per_block = next( sum(math.prod(sh) * get_dtype_size(dt) for sh, dt in zip(s.shapes, s.dtypes)) - for s in kv_cache_spec.values() - if isinstance(s, MambaSpec) and s.mamba_type not in ("gdn_attention", "linear_attention")) + for s in kv_cache_spec.values() if isinstance(s, MambaSpec) and s.mamba_type not in _GDN_MAMBA_TYPES) if attn_page_size > 0: ratio = attn_page_size / (attn_page_size + mamba_state_per_block) adjusted = int(available * ratio)