diff --git a/.github/workflows/_e2e_test.yaml b/.github/workflows/_e2e_test.yaml index 94db828f482..3e84fcc6c1c 100644 --- a/.github/workflows/_e2e_test.yaml +++ b/.github/workflows/_e2e_test.yaml @@ -91,8 +91,10 @@ jobs: pytest -sv tests/e2e/singlecard/test_completion_with_prompt_embeds.py pytest -sv tests/e2e/singlecard/test_aclgraph.py pytest -sv tests/e2e/singlecard/test_aclgraph_mem.py + pytest -sv tests/e2e/singlecard/test_ascend_scheduler.py pytest -sv tests/e2e/singlecard/test_bge_model.py pytest -sv tests/e2e/singlecard/test_camem.py + pytest -sv tests/e2e/singlecard/test_chunked.py pytest -sv tests/e2e/singlecard/test_embedding.py # pytest -sv tests/e2e/singlecard/test_embedding_aclgraph.py pytest -sv tests/e2e/singlecard/test_guided_decoding.py diff --git a/docs/source/tutorials/DeepSeek-V3.2-Exp.md b/docs/source/tutorials/DeepSeek-V3.2-Exp.md index 73bc3dc9510..f00f8b40a65 100644 --- a/docs/source/tutorials/DeepSeek-V3.2-Exp.md +++ b/docs/source/tutorials/DeepSeek-V3.2-Exp.md @@ -108,7 +108,7 @@ vllm serve vllm-ascend/DeepSeek-V3.2-Exp-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` ### Multi-node Deployment @@ -160,7 +160,7 @@ vllm serve /root/.cache/Modelers_Park/DeepSeek-V3.2-Exp \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` **Node 1** @@ -204,7 +204,7 @@ vllm serve /root/.cache/Modelers_Park/DeepSeek-V3.2-Exp \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` :::: @@ -252,7 +252,7 @@ vllm serve vllm-ascend/DeepSeek-V3.2-Exp-W8A8 \ --quantization ascend \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` **Node 1** @@ -299,7 +299,7 @@ vllm serve vllm-ascend/DeepSeek-V3.2-Exp-W8A8 \ --quantization ascend \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` :::: diff --git a/docs/source/tutorials/multi_node.md b/docs/source/tutorials/multi_node.md index d04fa0900f9..68c7056bf67 100644 --- a/docs/source/tutorials/multi_node.md +++ b/docs/source/tutorials/multi_node.md @@ -137,7 +137,7 @@ vllm serve vllm-ascend/DeepSeek-V3.1-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"torchair_graph_config":{"enabled":true}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' ``` **Node 1** @@ -182,7 +182,7 @@ vllm 
serve vllm-ascend/DeepSeek-V3.1-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"torchair_graph_config":{"enabled":true}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' ``` The deployment view looks like: diff --git a/docs/source/tutorials/multi_node_kimi.md b/docs/source/tutorials/multi_node_kimi.md index 84840cdff53..cb28bca95ee 100644 --- a/docs/source/tutorials/multi_node_kimi.md +++ b/docs/source/tutorials/multi_node_kimi.md @@ -93,7 +93,7 @@ vllm serve /home/cache/weights/Kimi-K2-Instruct-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"torchair_graph_config":{"enabled":true}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' ``` **Node 1** @@ -137,7 +137,7 @@ vllm serve /home/cache/weights/Kimi-K2-Instruct-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"torchair_graph_config":{"enabled":true}}' +--additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' ``` The deployment view looks like: diff --git a/docs/source/tutorials/multi_npu_moge.md b/docs/source/tutorials/multi_npu_moge.md index 8a2cf0078e3..e426c0f3378 100644 --- a/docs/source/tutorials/multi_npu_moge.md +++ b/docs/source/tutorials/multi_npu_moge.md @@ -158,6 +158,11 @@ if __name__ == "__main__": 'torchair_graph_config': { 'enabled': True, }, + 'ascend_scheduler_config':{ + 'enabled': True, + 'enable_chunked_prefill' : False, + 'chunked_prefill_enabled': False + }, }) outputs = llm.generate(prompts, sampling_params) diff --git a/docs/source/user_guide/configuration/additional_config.md b/docs/source/user_guide/configuration/additional_config.md index a77d0d53d9b..448f2ec4872 100644 --- a/docs/source/user_guide/configuration/additional_config.md +++ b/docs/source/user_guide/configuration/additional_config.md @@ -27,6 +27,7 @@ The following table lists additional configuration options available in vLLM Asc | Name | Type | Default | Description | |-------------------------------------|------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------| | `torchair_graph_config` | dict | `{}` | Configuration options for torchair graph mode | +| `ascend_scheduler_config` | dict | `{}` | Configuration options for ascend scheduler | | `weight_prefetch_config` | dict | `{}` | Configuration options for weight prefetch | | `refresh` | bool | `false` | Whether to refresh global Ascend configuration content. This is usually used by rlhf or ut/e2e test case. | | `expert_map_path` | str | `None` | When using expert load balancing for an MoE model, an expert map path needs to be passed in. | @@ -60,6 +61,18 @@ The details of each configuration option are as follows: | `enable_kv_nz`| bool | `False` | Whether to enable KV Cache NZ layout. This option only takes effect on models using MLA (for example, DeepSeek). | | `enable_super_kernel` | bool | `False` | Whether to enable super kernel to fuse operators in deepseek moe layers. 
This option only takes effect on moe models using dynamic w8a8 quantization.| +**ascend_scheduler_config** + +| Name | Type | Default | Description | +| ---- | ---- | ------- | ----------- | +| `enabled` | bool | `False` | Whether to enable the Ascend scheduler for the V1 engine. | +| `enable_pd_transfer` | bool | `False` | Whether to enable P-D transfer. When enabled, decoding starts only after prefill has finished for all requests. This option only takes effect on offline inference. | +| `decode_max_num_seqs` | int | `0` | The max_num_seqs used in the decode phase when P-D transfer is enabled. This option only takes effect when enable_pd_transfer is True. | +| `max_long_partial_prefills` | Union[int, float] | `float('inf')` | The maximum number of prompts longer than long_prefill_token_threshold that will be prefilled concurrently. | +| `long_prefill_token_threshold` | Union[int, float] | `float('inf')` | A request is considered long if its prompt is longer than this number of tokens. | + +`ascend_scheduler_config` also supports the options from the [vLLM scheduler config](https://docs.vllm.ai/en/stable/api/vllm/config.html#vllm.config.SchedulerConfig). For example, you can add `enable_chunked_prefill: True` to `ascend_scheduler_config` as well. + **weight_prefetch_config** | Name | Type | Default | Description | @@ -80,6 +93,12 @@ An example of additional configuration is as follows: "graph_batch_sizes_init": False, "enable_kv_nz": False }, + "ascend_scheduler_config": { + "enabled": True, + "enable_chunked_prefill": True, + "max_long_partial_prefills": 1, + "long_prefill_token_threshold": 4096, + }, "weight_prefetch_config": { "enabled": True, "prefetch_ratio": { diff --git a/docs/source/user_guide/feature_guide/graph_mode.md b/docs/source/user_guide/feature_guide/graph_mode.md index 9afa1d52632..432362899c1 100644 --- a/docs/source/user_guide/feature_guide/graph_mode.md +++ b/docs/source/user_guide/feature_guide/graph_mode.md @@ -45,14 +45,14 @@ import os from vllm import LLM # TorchAirGraph only works without chunked-prefill now -model = LLM(model="path/to/DeepSeek-R1-0528", additional_config={"torchair_graph_config": {"enabled": True}}) +model = LLM(model="path/to/DeepSeek-R1-0528", additional_config={"torchair_graph_config": {"enabled": True},"ascend_scheduler_config": {"enabled": True}}) outputs = model.generate("Hello, how are you?") ``` Online example: ```shell -vllm serve path/to/DeepSeek-R1-0528 --additional-config='{"torchair_graph_config": {"enabled": true}}' +vllm serve path/to/DeepSeek-R1-0528 --additional-config='{"torchair_graph_config": {"enabled": true},"ascend_scheduler_config": {"enabled": true}}' ``` You can find more details about additional configuration [here](../configuration/additional_config.md).
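For quick reference, here is a minimal offline sketch of how the scheduler options documented above can be passed together through `additional_config`, mirroring the example JSON; the model path and prompt are placeholders and the values are illustrative, not recommendations:

```python
from vllm import LLM, SamplingParams

# Sketch only: enable the Ascend scheduler and cap concurrent long prefills,
# reusing the same illustrative values as the example configuration above.
llm = LLM(
    model="path/to/your-model",  # placeholder model path
    additional_config={
        "ascend_scheduler_config": {
            "enabled": True,
            "enable_chunked_prefill": True,
            "max_long_partial_prefills": 1,
            "long_prefill_token_threshold": 4096,
        },
    },
)
outputs = llm.generate(["Hello, how are you?"],
                       SamplingParams(temperature=0.0, max_tokens=16))
```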
diff --git a/examples/offline_inference_npu_long_seq.py b/examples/offline_inference_npu_long_seq.py index 7e3afa01bfe..2ed96f63e9d 100644 --- a/examples/offline_inference_npu_long_seq.py +++ b/examples/offline_inference_npu_long_seq.py @@ -42,6 +42,7 @@ enable_chunked_prefill=False, max_num_batched_tokens=2048, max_model_len=1024, + additional_config={"ascend_scheduler_config": {"enabled": False}}, max_num_seqs=1, block_size=128, gpu_memory_utilization=0.9 diff --git a/examples/run_dp_server.sh b/examples/run_dp_server.sh index ec0cb686b77..9b9868c4f0d 100644 --- a/examples/run_dp_server.sh +++ b/examples/run_dp_server.sh @@ -28,4 +28,4 @@ vllm serve Qwen/Qwen1.5-MoE-A2.7B \ --gpu-memory-utilization 0.9 \ --trust-remote-code \ --enforce-eager \ - --additional-config '{"torchair_graph_config":{"enabled":false, "use_cached_graph":false}}' + --additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":false, "use_cached_graph":false}}' diff --git a/tests/e2e/310p/test_offline_inference_parallel_310p.py b/tests/e2e/310p/test_offline_inference_parallel_310p.py index bb973973b96..6bf335686d1 100644 --- a/tests/e2e/310p/test_offline_inference_parallel_310p.py +++ b/tests/e2e/310p/test_offline_inference_parallel_310p.py @@ -24,12 +24,15 @@ MODELS = [ "IntervitensInc/pangu-pro-moe-model", ] -# set additional config for torchair graph +# set additional config for ascend scheduler and torchair graph ADDITIONAL_CONFIG = [{ "additional_config": { "torchair_graph_config": { "enabled": True }, + "ascend_scheduler_config": { + "enabled": True, + } } }] diff --git a/tests/e2e/multicard/test_expert_parallel.py b/tests/e2e/multicard/test_expert_parallel.py index b8f03d5f905..f1076013967 100644 --- a/tests/e2e/multicard/test_expert_parallel.py +++ b/tests/e2e/multicard/test_expert_parallel.py @@ -15,14 +15,23 @@ def test_e2e_ep_correctness(model_name): max_tokens = 5 # FIXME: Really strange that chunked prefill might lead to different results, investigate further - with VllmRunner(model_name, tensor_parallel_size=2, - enforce_eager=False) as vllm_model: + with VllmRunner( + model_name, + tensor_parallel_size=2, + additional_config={"ascend_scheduler_config": { + "enabled": True + }}, + enforce_eager=False) as vllm_model: tp_output = vllm_model.generate_greedy(example_prompts, max_tokens) - with VllmRunner(model_name, - tensor_parallel_size=2, - enable_expert_parallel=True, - enforce_eager=False) as vllm_model: + with VllmRunner( + model_name, + tensor_parallel_size=2, + enable_expert_parallel=True, + additional_config={"ascend_scheduler_config": { + "enabled": True + }}, + enforce_eager=False) as vllm_model: ep_output = vllm_model.generate_greedy(example_prompts, max_tokens) check_outputs_equal( diff --git a/tests/e2e/multicard/test_fused_moe_allgather_ep.py b/tests/e2e/multicard/test_fused_moe_allgather_ep.py index 85d246e56ba..9335e19af69 100644 --- a/tests/e2e/multicard/test_fused_moe_allgather_ep.py +++ b/tests/e2e/multicard/test_fused_moe_allgather_ep.py @@ -49,7 +49,13 @@ def test_generate_with_allgather(): tensor_parallel_size=2, max_model_len=1024, dtype="auto", - enable_expert_parallel=True) as vllm_model: + enable_expert_parallel=True, + additional_config={ + "ascend_scheduler_config": { + "enabled": True, + "chunked_prefill_enabled": False, + }, + }) as vllm_model: vllm_model.generate(example_prompts, sampling_params) @@ -70,5 +76,11 @@ def test_generate_with_alltoall(): tensor_parallel_size=2, max_model_len=1024, dtype="auto", - enable_expert_parallel=True) 
as vllm_model: + enable_expert_parallel=True, + additional_config={ + "ascend_scheduler_config": { + "enabled": True, + "chunked_prefill_enabled": False, + }, + }) as vllm_model: vllm_model.generate(example_prompts, sampling_params) diff --git a/tests/e2e/multicard/test_offline_inference_distributed.py b/tests/e2e/multicard/test_offline_inference_distributed.py index 1380c49e3d2..320c3bdf0b9 100644 --- a/tests/e2e/multicard/test_offline_inference_distributed.py +++ b/tests/e2e/multicard/test_offline_inference_distributed.py @@ -82,6 +82,9 @@ def test_models_distributed_DeepSeek_multistream_moe(): "enabled": True, }, "enable_multistream_moe": True, + "ascend_scheduler_config": { + "enabled": True, + }, "refresh": True, }, ) as vllm_model: @@ -151,9 +154,14 @@ def test_models_distributed_DeepSeek_W4A8DYNAMIC(model): quantization="ascend", enforce_eager=True, enable_expert_parallel=True, - additional_config={"torchair_graph_config": { - "enabled": False, - }}, + additional_config={ + "torchair_graph_config": { + "enabled": False, + }, + "ascend_scheduler_config": { + "enabled": True, + } + }, ) as vllm_model: vllm_model.generate_greedy(prompts, max_tokens) diff --git a/tests/e2e/multicard/test_prefix_caching.py b/tests/e2e/multicard/test_prefix_caching.py index e076fd01d4c..e29916623ba 100644 --- a/tests/e2e/multicard/test_prefix_caching.py +++ b/tests/e2e/multicard/test_prefix_caching.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -"""Compare the with and without prefix caching on V1 scheduler.""" +"""Compare the with and without prefix caching on V1 scheduler or AscendScheduler.""" import pytest @@ -84,3 +84,67 @@ def test_prefix_cache_with_v1_scheduler(model: str, max_tokens: int) -> None: name_0="vllm_output", name_1="prefix_cache_output", ) + + +@pytest.mark.skip(reason="Fix me, the accuracy is not correct") +@pytest.mark.parametrize("model", MODELS) +@pytest.mark.parametrize("max_tokens", [50]) +def test_prefix_cache_with_ascend_scheduler(model: str, + max_tokens: int) -> None: + + with VllmRunner(model, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True, + }, + }, + enforce_eager=False, + max_model_len=2048, + tensor_parallel_size=2, + gpu_memory_utilization=0.7) as vllm_model: + vllm_output = vllm_model.generate_greedy(INPUT_PROMPTS, max_tokens) + + with VllmRunner(model, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True, + 'enable_prefix_caching': True, + }, + }, + enforce_eager=False, + max_model_len=2048, + tensor_parallel_size=2, + gpu_memory_utilization=0.7) as vllm_model: + prefix_cache_output = vllm_model.generate_greedy( + INPUT_PROMPTS, max_tokens) + + # TODO: enable apc and chunked prefill with ascend scheduler will lead accuracy problem. + # Disable it now. Fix it or drop the ascend scheduler in the future. 
+ # with VllmRunner(model, + # additional_config={ + # 'ascend_scheduler_config': { + # 'enabled': True, + # 'enable_prefix_caching': True, + # "enable_chunked_prefill": True, + # }, + # }, + # enforce_eager=True, + # max_model_len=2048, + # tensor_parallel_size=2, + # gpu_memory_utilization=0.7) as vllm_model: + # chunk_prefill_prefix_cache_output = vllm_model.generate_greedy( + # INPUT_PROMPTS, max_tokens) + + check_outputs_equal( + outputs_0_lst=vllm_output, + outputs_1_lst=prefix_cache_output, + name_0="vllm_output", + name_1="prefix_cache_output", + ) + + # check_outputs_equal( + # outputs_0_lst=chunk_prefill_prefix_cache_output, + # outputs_1_lst=prefix_cache_output, + # name_0="chunk_prefill_prefix_cache_output", + # name_1="prefix_cache_output", + # ) diff --git a/tests/e2e/multicard/test_qwen3_next.py b/tests/e2e/multicard/test_qwen3_next.py index 6df2da48eb1..e51748ea1e2 100644 --- a/tests/e2e/multicard/test_qwen3_next.py +++ b/tests/e2e/multicard/test_qwen3_next.py @@ -24,7 +24,6 @@ import os from unittest.mock import patch -import pytest from modelscope import snapshot_download # type: ignore from tests.e2e.conftest import VllmRunner @@ -64,8 +63,6 @@ def test_models_distributed_Qwen3_NEXT_TP4_FULL_DECODE_ONLY(): del vllm_model -@pytest.mark.skip( - reason="Qwen3-Next + MTP doesn't work with chunked prefill. Fix Me") def test_models_distributed_Qwen3_NEXT_MTP_TP4_SIMILARITY(): example_prompts = [ "Hello, my name is", @@ -92,6 +89,12 @@ def test_models_distributed_Qwen3_NEXT_MTP_TP4_SIMILARITY(): gpu_memory_utilization=0.8, distributed_executor_backend="mp", enforce_eager=True, + additional_config={ + "ascend_scheduler_config": { + "enabled": True, + "enable_chunked_prefill": False + } + }, speculative_config={ "method": "qwen3_next_mtp", "num_speculative_tokens": 1 diff --git a/tests/e2e/multicard/test_torchair_graph_mode.py b/tests/e2e/multicard/test_torchair_graph_mode.py index ea53f8485e1..a6f3f16d860 100644 --- a/tests/e2e/multicard/test_torchair_graph_mode.py +++ b/tests/e2e/multicard/test_torchair_graph_mode.py @@ -44,6 +44,9 @@ def _deepseek_torchair_test_fixture( kwargs = {} if not use_v1_schduler: kwargs = { + "ascend_scheduler_config": { + "enabled": True, + }, "refresh": True, } additional_config.update(**kwargs) @@ -117,6 +120,9 @@ def _pangu_torchair_test_fixture( # torchair is only work without chunked-prefill now kwargs = { + "ascend_scheduler_config": { + "enabled": True, + }, "refresh": True, } additional_config.update(**kwargs) @@ -179,6 +185,9 @@ def _qwen_torchair_test_fixture( "torchair_graph_config": { "enabled": False, }, + "ascend_scheduler_config": { + "enabled": True, + }, "refresh": True, } @@ -235,6 +244,9 @@ def _deepseek_v2_lite_torchair_test_fixure( kwargs = {} if not use_v1_schduler: kwargs = { + "ascend_scheduler_config": { + "enable": True, + }, "refresh": True, } additional_config.update(**kwargs) diff --git a/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py b/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py index 880b44ae171..65d01b21240 100644 --- a/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py +++ b/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py @@ -73,7 +73,11 @@ async def test_models(model: str, mode: str) -> None: "VLLM_RPC_TIMEOUT": "3600000", "VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS": "3600000" } - additional_config: dict[str, Any] = {} + additional_config: dict[str, Any] = { + "ascend_scheduler_config": { + "enabled": False + }, + } speculative_config = { "num_speculative_tokens": 2, 
"method": "deepseek_mtp" diff --git a/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py b/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py index 80157588e71..8ac1883d1c1 100644 --- a/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py +++ b/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py @@ -74,6 +74,9 @@ async def test_models(model: str) -> None: "PYTORCH_NPU_ALLOC_CONF": "expandable_segments:True", } additional_config = { + "ascend_scheduler_config": { + "enabled": False + }, "torchair_graph_config": { "enabled": True, "enable_multistream_moe": False, diff --git a/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py b/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py index fdf7167b8ff..3ee23287c6a 100644 --- a/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py +++ b/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py @@ -68,7 +68,12 @@ async def test_models(model: str) -> None: port = get_open_port() env_dict = {"TASK_QUEUE_ENABLE": "1", "HCCL_OP_EXPANSION_MODE": "AIV"} - additional_config = {"enable_weight_nz_layout": True} + additional_config = { + "ascend_scheduler_config": { + "enabled": False + }, + "enable_weight_nz_layout": True + } server_args = [ "--quantization", "ascend", "--reasoning-parser", "qwen3", "--tensor-parallel-size", "4", "--port", diff --git a/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py b/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py index 9fa2d1e54d2..17a7f4b6e0b 100644 --- a/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py +++ b/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py @@ -83,7 +83,8 @@ async def test_models(model: str, tp_size: int) -> None: "0.9", "--block-size", "128", "--max-num-seqs", "256", "--enforce-eager", "--max-model-len", "35840", "--max-num-batched-tokens", "35840", "--additional-config", - '{"enable_weight_nz_layout":true}', "--compilation-config", + '{"ascend_scheduler_config":{"enabled":true},"enable_weight_nz_layout":true}', + "--compilation-config", '{"cudagraph_mode":"FULL_DECODE_ONLY", "cudagraph_capture_sizes":[1,8,24,48,60]}' ] with RemoteOpenAIServer(model, diff --git a/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py b/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py index 3dd80d4a027..c912657784a 100644 --- a/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py +++ b/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py @@ -33,6 +33,7 @@ "single", "aclgraph", "aclgraph_mlapo", + "no_chunkprefill", ] prompts = [ @@ -81,6 +82,9 @@ async def test_models(model: str, mode: str) -> None: "method": "deepseek_mtp" } additional_config = { + "ascend_scheduler_config": { + "enabled": False + }, "torchair_graph_config": { "enabled": True, "enable_multistream_moe": False, @@ -108,6 +112,10 @@ async def test_models(model: str, mode: str) -> None: if mode == "aclgraph_mlapo": env_dict["VLLM_ASCEND_ENABLE_MLAPO"] = "1" additional_config["torchair_graph_config"] = {"enabled": False} + if mode == "no_chunkprefill": + additional_config["ascend_scheduler_config"] = {"enabled": True} + i = server_args.index("--max-num-batched-tokens") + 1 + server_args[i] = "36864" server_args.extend(["--additional-config", json.dumps(additional_config)]) request_keyword_args: dict[str, Any] = { **api_keyword_args, diff --git a/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py b/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py index 
6413aba0fcb..bca2baf0dfd 100644 --- a/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py +++ b/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py @@ -71,6 +71,9 @@ async def test_models(model: str) -> None: "cudagraph_mode": "FULL_DECODE_ONLY" } additional_config: dict[str, Any] = { + "ascend_scheduler_config": { + "enabled": False + }, "torchair_graph_config": { "enabled": True }, diff --git a/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py b/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py index 73cd84052c6..217b27866d9 100644 --- a/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py +++ b/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py @@ -92,6 +92,7 @@ async def test_models(model: str, tp_size: int, dp_size: int, "--gpu-memory-utilization", "0.9", "--additional-config", + '{"ascend_scheduler_config":{"enabled":true},' '"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}', ] if full_graph: diff --git a/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py b/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py index 77c1a7e1d73..fe6bbedf2eb 100644 --- a/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py +++ b/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py @@ -85,8 +85,9 @@ async def test_models(model: str, tp_size: int) -> None: str(tp_size), "--port", str(port), "--max-model-len", "30000", "--max-num-batched-tokens", "40000", "--max-num-seqs", "400", "--trust-remote-code", - "--gpu-memory-utilization", "0.8", "--compilation_config", - '{"cudagraph_mode": "FULL_DECODE_ONLY"}' + "--gpu-memory-utilization", "0.8", "--additional-config", + '{"ascend_scheduler_config":{"enabled":false}}', + "--compilation_config", '{"cudagraph_mode": "FULL_DECODE_ONLY"}' ] request_keyword_args: dict[str, Any] = { **api_keyword_args, diff --git a/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py b/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py index efbf77d20f8..945d7cae3b1 100644 --- a/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py +++ b/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py @@ -60,7 +60,11 @@ async def test_models(model: str) -> None: "PYTORCH_NPU_ALLOC_CONF": "expandable_segments:True", "VLLM_ASCEND_ENABLE_FLASHCOMM1": "1" } - additional_config: dict[str, Any] = {} + additional_config: dict[str, Any] = { + "ascend_scheduler_config": { + "enabled": False + }, + } compilation_config = {"cudagraph_mode": "FULL_DECODE_ONLY"} server_args = [ "--quantization", "ascend", "--async-scheduling", diff --git a/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py b/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py index 055a452e5b2..8220e4d59af 100644 --- a/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py +++ b/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py @@ -63,6 +63,11 @@ async def test_models(model: str, mode: str) -> None: "PYTORCH_NPU_ALLOC_CONF": "expandable_segments:True", "VLLM_ASCEND_ENABLE_FLASHCOMM1": "1" } + additional_config: dict[str, Any] = { + "ascend_scheduler_config": { + "enabled": False + }, + } compilation_config = {"cudagraph_mode": "FULL_DECODE_ONLY"} server_args = [ "--quantization", "ascend", "--async-scheduling", @@ -77,6 +82,7 @@ async def test_models(model: str, mode: str) -> None: server_args.extend( ["--compilation-config", json.dumps(compilation_config)]) + server_args.extend(["--additional-config", json.dumps(additional_config)]) request_keyword_args: dict[str, Any] = { **api_keyword_args, } diff --git a/tests/e2e/nightly/models/test_qwq_32b.py b/tests/e2e/nightly/models/test_qwq_32b.py index 
824651ba6c6..a60eff224b1 100644 --- a/tests/e2e/nightly/models/test_qwq_32b.py +++ b/tests/e2e/nightly/models/test_qwq_32b.py @@ -93,6 +93,8 @@ async def test_models(model: str, mode: str, tp_size: int) -> None: server_args.remove( '{"cudagraph_mode":"FULL_DECODE_ONLY", "cudagraph_capture_sizes": [1, 8, 24, 48, 60]}' ) + server_args.append("--additional-config") + server_args.append('{"ascend_scheduler_config":{"enabled":true}}') server_args.append("--enforce-eager") request_keyword_args: dict[str, Any] = { **api_keyword_args, diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml index 7bfe3f5e99c..42b70f76456 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml @@ -30,7 +30,7 @@ deployment: --quantization ascend --gpu-memory-utilization 0.9 --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' - server_cmd: > @@ -51,7 +51,7 @@ deployment: --quantization ascend --gpu-memory-utilization 0.9 --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' benchmarks: acc: case_type: accuracy diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml index 01100f29481..cf44bc8f5e6 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml @@ -31,7 +31,7 @@ deployment: --gpu-memory-utilization 0.9 --enforce-eager --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"ascend_scheduler_config":{"enabled":false},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' - server_cmd: > @@ -53,5 +53,5 @@ deployment: --gpu-memory-utilization 0.9 --enforce-eager --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"ascend_scheduler_config":{"enabled":false},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' benchmarks: diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml index 6ca189c4298..9a4c3d94407 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml @@ -50,7 +50,7 @@ deployment: "kv_connector_module_path": 
"vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' - server_cmd: > @@ -80,7 +80,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -111,7 +111,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -141,7 +141,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' benchmarks: perf: case_type: performance diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml index 37a024b989a..a8e49290bd8 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml @@ -49,7 +49,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - 
'{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' - server_cmd: > @@ -79,7 +79,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -110,7 +110,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -140,7 +140,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' + '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' benchmarks: perf: case_type: performance diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml index 40ac6476404..6dafd3ccd31 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml @@ -29,7 +29,7 @@ deployment: --trust-remote-code --no-enable-prefix-caching --gpu-memory-utilization 0.9 - --additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' + --additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' - server_cmd: > @@ -49,5 +49,5 @@ deployment: --trust-remote-code --no-enable-prefix-caching --gpu-memory-utilization 0.92 - --additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' + --additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' benchmarks: diff --git a/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py b/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py index 6b90ec365ce..2f56d9d2ab4 100644 --- a/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py +++ b/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py @@ -48,26 +48,27 @@ def 
mtp_correctness(sampling_config: SamplingParams, if graph_mode == CUDAGraphMode.FULL: graph_mode_str = "FULL_DECODE_ONLY" - with VllmRunner(model_name, - tensor_parallel_size=1, - max_num_seqs=256, - gpu_memory_utilization=0.7, - distributed_executor_backend="mp", - enable_expert_parallel=True, - speculative_config={ - "method": - "deepseek_mtp", - "num_speculative_tokens": - num_speculative_tokens, - "disable_padded_drafter_batch": - disable_padded_drafter_batch, - }, - enforce_eager=enforce_eager, - max_model_len=2000, - compilation_config=CompilationConfig( - cudagraph_mode=graph_mode_str, - cudagraph_capture_sizes=[12], - )) as spec_llm: + with VllmRunner( + model_name, + tensor_parallel_size=1, + max_num_seqs=256, + gpu_memory_utilization=0.7, + distributed_executor_backend="mp", + enable_expert_parallel=True, + speculative_config={ + "method": "deepseek_mtp", + "num_speculative_tokens": num_speculative_tokens, + "disable_padded_drafter_batch": disable_padded_drafter_batch, + }, + enforce_eager=enforce_eager, + max_model_len=2000, + compilation_config=CompilationConfig( + cudagraph_mode=graph_mode_str, + cudagraph_capture_sizes=[12], + ), + additional_config={"ascend_scheduler_config": { + "enabled": False + }}) as spec_llm: spec_outputs = spec_llm.generate(example_prompts, sampling_config) matches = 0 diff --git a/tests/e2e/singlecard/test_ascend_scheduler.py b/tests/e2e/singlecard/test_ascend_scheduler.py new file mode 100644 index 00000000000..502a810376e --- /dev/null +++ b/tests/e2e/singlecard/test_ascend_scheduler.py @@ -0,0 +1,170 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import pytest +from vllm import SamplingParams + +from tests.e2e.conftest import VllmRunner +from tests.e2e.model_utils import check_outputs_equal + +MODEL = "Qwen/Qwen3-0.6B" + + +@pytest.mark.parametrize("enforce_eager", [True, False]) +def test_concurrent_partial_prefill(enforce_eager): + with VllmRunner(MODEL, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True, + }, + }, + max_num_seqs=3, + max_num_batched_tokens=8192, + enforce_eager=enforce_eager, + gpu_memory_utilization=0.7) as vllm_model: + outputs = vllm_model.model.generate(["Hello my name is Robert and I"] * + 3) + assert len(outputs) == 3 + for output in outputs: + assert len(output.outputs) == 1 + + +@pytest.mark.parametrize("enforce_eager", [True, False]) +def test_prefix_cache_stats_is_recorded(enforce_eager): + with VllmRunner(MODEL, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True, + }, + }, + max_num_seqs=3, + max_num_batched_tokens=8192, + enforce_eager=enforce_eager, + gpu_memory_utilization=0.7) as vllm_model: + # 17 tokens will make sure first 16 tokens are cached in a block + input_tokens = {"prompt_token_ids": [101] * 129} + _ = vllm_model.model.generate([input_tokens]) + outputs = vllm_model.model.generate([input_tokens]) + assert outputs[0].num_cached_tokens == 128 + + +@pytest.mark.parametrize("max_tokens", + [4]) # cannot align results when max_tokens > 4 +@pytest.mark.parametrize("chunked_prefill_token_size", [2048]) +def test_chunked_prefill_with_ascend_scheduler( + max_tokens: int, chunked_prefill_token_size: int) -> None: + example_prompts = [ + "vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs." 
+ ] + max_num_seqs = chunked_prefill_token_size + max_num_batched_tokens = chunked_prefill_token_size + with VllmRunner(MODEL, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True, + 'enable_chunked_prefill': True, + }, + }, + max_num_seqs=max_num_seqs, + max_num_batched_tokens=max_num_batched_tokens, + max_model_len=2048, + gpu_memory_utilization=0.7) as vllm_model: + chunked_prefill_output = vllm_model.generate_greedy( + example_prompts, max_tokens) + + with VllmRunner(MODEL, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True, + }, + }, + max_model_len=2048, + gpu_memory_utilization=0.7) as vllm_model: + vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens) + + check_outputs_equal( + outputs_0_lst=vllm_output, + outputs_1_lst=chunked_prefill_output, + name_0="vllm_output", + name_1="chunked_prefill_output", + ) + + +@pytest.mark.parametrize("max_tokens", + [4]) # cannot align results when max_tokens > 4 +@pytest.mark.parametrize("chunked_prefill_token_size", [2048]) +def test_chunked_prefill_with_scheduler_dynamic_batch( + max_tokens: int, chunked_prefill_token_size: int) -> None: + example_prompts = [ + "vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs." + ] + max_num_seqs = chunked_prefill_token_size + max_num_batched_tokens = chunked_prefill_token_size + with VllmRunner(MODEL, + additional_config={ + 'SLO_limits_for_dynamic_batch': 0, + }, + max_num_seqs=max_num_seqs, + max_num_batched_tokens=max_num_batched_tokens, + max_model_len=2048, + gpu_memory_utilization=0.7) as vllm_model: + dynamic_batch_output = vllm_model.generate_greedy( + example_prompts, max_tokens) + + with VllmRunner(MODEL, + additional_config={ + 'SLO_limits_for_dynamic_batch': -1, + }, + max_model_len=2048, + gpu_memory_utilization=0.7) as vllm_model: + vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens) + + check_outputs_equal( + outputs_0_lst=vllm_output, + outputs_1_lst=dynamic_batch_output, + name_0="vllm_output", + name_1="chunked_prefill_output", + ) + + +def test_async_scheduling_eager() -> None: + prompts = [ + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", + ] * 10 + sampling_params = SamplingParams(temperature=0.2, + max_tokens=10, + stop_token_ids=None) + + with VllmRunner( + "Qwen/Qwen2.5-0.5B-Instruct", + max_model_len=4096, + max_num_seqs=50, + dtype="bfloat16", + gpu_memory_utilization=0.9, + async_scheduling=True, + ) as vllm_model: + vllm_model.generate(prompts, sampling_params=sampling_params) + + +def test_async_scheduling_with_full_graph() -> None: + prompts = [ + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", + ] * 10 + sampling_params = SamplingParams(temperature=0.2, + max_tokens=10, + stop_token_ids=None) + + with VllmRunner("Qwen/Qwen3-8B", + max_model_len=4096, + max_num_seqs=50, + dtype="bfloat16", + gpu_memory_utilization=0.9, + async_scheduling=True, + compilation_config={"cudagraph_mode": + "FULL"}) as vllm_model: + vllm_model.generate(prompts, sampling_params=sampling_params) diff --git a/tests/e2e/singlecard/test_chunked.py b/tests/e2e/singlecard/test_chunked.py new file mode 100644 index 00000000000..f6eacb71dac --- /dev/null +++ b/tests/e2e/singlecard/test_chunked.py @@ -0,0 +1,82 @@ +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# Copyright 2023 The vLLM team. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Compare the outputs of vLLM with the default scheduler (forcing partial prefill via a small long_prefill_token_threshold) and with the Ascend scheduler. + +Run `pytest tests/e2e/singlecard/test_chunked.py`. +""" +import gc + +import pytest +import torch +from vllm import SamplingParams + +from tests.e2e.conftest import VllmRunner + +MODELS = ["Qwen/Qwen2.5-0.5B-Instruct"] + + +@pytest.mark.parametrize("model", MODELS) +@pytest.mark.parametrize("max_tokens", [1]) +def test_models( + model: str, + max_tokens: int, +) -> None: + prompts = ["The president of the United States is"] + + sampling_params = SamplingParams( + max_tokens=max_tokens, + temperature=0.0, + ) + + with VllmRunner(model, + long_prefill_token_threshold=20, + enforce_eager=False) as vllm_model: + output1 = vllm_model.generate(prompts, sampling_params) + + with VllmRunner(model, + enforce_eager=False, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True + }, + }) as vllm_model: + output2 = vllm_model.generate(prompts, sampling_params) + + # Extract the generated token IDs for comparison + token_ids1 = output1[0][0][0] + token_ids2 = output2[0][0][0] + + print(f"Token IDs 1: {token_ids1}") + print(f"Token IDs 2: {token_ids2}") + + # Convert token IDs to tensors and calculate cosine similarity + # Take the length of the shorter sequence to ensure consistent dimensions + min_len = min(len(token_ids1), len(token_ids2)) + + tensor1 = torch.tensor(token_ids1[:min_len], dtype=torch.float32) + tensor2 = torch.tensor(token_ids2[:min_len], dtype=torch.float32) + + # Calculate similarity using torch.cosine_similarity + similarity = torch.cosine_similarity(tensor1, tensor2, dim=0) + print(f"Token IDs cosine similarity: {similarity.item()}") + + assert similarity > 0.95 + + gc.collect() + torch.npu.empty_cache() + torch.npu.reset_peak_memory_stats() diff --git a/tests/e2e/singlecard/test_vlm.py b/tests/e2e/singlecard/test_vlm.py index 954566799c0..cc3d50f8b3d 100644 --- a/tests/e2e/singlecard/test_vlm.py +++ b/tests/e2e/singlecard/test_vlm.py @@ -20,6 +20,7 @@ Run `pytest tests/test_offline_inference.py`. """ +import pytest from vllm import SamplingParams from vllm.assets.audio import AudioAsset from vllm.assets.image import ImageAsset @@ -54,6 +55,40 @@ def test_multimodal_vl(prompt_template): assert output_str, "Generated output should not be empty." + +@pytest.mark.skip(reason="This e2e test gets stuck in the multi-batch scenario. 
" + "Add this back after fixing the issue.") +def test_multimodal_ascend_scheduler(prompt_template): + image = ImageAsset("cherry_blossom") \ + .pil_image.convert("RGB") + img_questions = [ + "What is the content of this image?", + "Describe the content of this image in detail.", + "What's in the image?", + "Where is this image taken?", + ] + images = [image] * len(img_questions) + prompts = prompt_template(img_questions) + with VllmRunner("Qwen/Qwen2.5-VL-3B-Instruct", + max_model_len=4096, + additional_config={ + 'ascend_scheduler_config': { + 'enabled': True, + }, + }, + mm_processor_kwargs={ + "min_pixels": 28 * 28, + "max_pixels": 1280 * 28 * 28, + "fps": 1, + }, + enforce_eager=True) as vllm_model: + outputs = vllm_model.generate_greedy(prompts=prompts, + images=images, + max_tokens=64) + assert len(outputs) == len(prompts) + for _, output_str in outputs: + assert output_str, "Generated output should not be empty." + + def test_multimodal_audio(): audio_prompt = "".join([ f"Audio {idx+1}: <|audio_bos|><|AUDIO|><|audio_eos|>\n" diff --git a/tests/ut/core/test_schedule_config.py b/tests/ut/core/test_schedule_config.py new file mode 100644 index 00000000000..032a1a87712 --- /dev/null +++ b/tests/ut/core/test_schedule_config.py @@ -0,0 +1,134 @@ +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from vllm.config import SchedulerConfig + +from tests.ut.base import TestBase +from vllm_ascend.core.schedule_config import AscendSchedulerConfig + + +class TestAscendSchedulerConfig(TestBase): + + def setUp(self): + self.basic_scheduler_config = SchedulerConfig( + max_num_batched_tokens=8192, + max_model_len=8192, + is_multimodal_model=False, + send_delta_data=False, + ) + + def test_initialize_from_config_with_default(self): + # No additional config given, check the default value here. 
+ ascend_config = AscendSchedulerConfig.initialize_from_config( + self.basic_scheduler_config, {}) + self.assertEqual(ascend_config.enable_chunked_prefill, False) + self.assertEqual(ascend_config.policy, "fcfs") + self.assertEqual(ascend_config.scheduler_cls, + "vllm_ascend.core.scheduler.AscendScheduler") + self.assertEqual(ascend_config.max_num_encoder_input_tokens, 8192) + self.assertEqual(ascend_config.encoder_cache_size, 8192) + + def test_initialize_from_config_with_override(self): + # test override + ascend_config = AscendSchedulerConfig.initialize_from_config( + self.basic_scheduler_config, + AscendSchedulerConfig( + enable_chunked_prefill=False, + policy="fcfs", + scheduler_cls="vllm_ascend.core.scheduler.AscendScheduler", + max_num_batched_tokens=8192, + max_model_len=2048, + max_long_partial_prefills=1, + long_prefill_token_threshold=512, + ), + ) + self.assertEqual(ascend_config.enable_chunked_prefill, False) + self.assertEqual(ascend_config.policy, "fcfs") + self.assertEqual(ascend_config.scheduler_cls, + "vllm_ascend.core.scheduler.AscendScheduler") + self.assertEqual(ascend_config.max_num_batched_tokens, 8192) + self.assertEqual(ascend_config.encoder_cache_size, 8192) + self.assertEqual(ascend_config.max_long_partial_prefills, 1) + self.assertEqual(ascend_config.long_prefill_token_threshold, 512) + + def test_not_implemented_policy(self): + with self.assertRaises(NotImplementedError) as context: + AscendSchedulerConfig.initialize_from_config( + self.basic_scheduler_config, + AscendSchedulerConfig( + policy="custom_policy", + max_num_batched_tokens=8192, + max_model_len=2048, + ), + ) + self.assertIn( + "currently AscendScheduler only supports fcfs policy", + str(context.exception), + ) + + def test_no_override(self): + ascend_config = AscendSchedulerConfig.initialize_from_config( + self.basic_scheduler_config, {}) + self.assertEqual(ascend_config.max_num_encoder_input_tokens, 8192) + self.assertEqual(ascend_config.encoder_cache_size, 8192) + + def test_valid_config_with_multimodal(self): + config = AscendSchedulerConfig.initialize_from_config( + SchedulerConfig(is_multimodal_model=True, + max_num_batched_tokens=8192), {}) + self.assertTrue(config.is_multimodal_model) + + def test_valid_config_with_chunked_prefill(self): + ascend_config = AscendSchedulerConfig.initialize_from_config( + self.basic_scheduler_config, + AscendSchedulerConfig( + enable_chunked_prefill=True, + max_num_batched_tokens=8192, + max_model_len=8192, + ), + ) + self.assertEqual(ascend_config.max_num_batched_tokens, 8192) + self.assertEqual(ascend_config.max_model_len, 8192) + self.assertTrue(ascend_config.enable_chunked_prefill) + + def test_invalid_config_without_chunked_prefill(self): + with self.assertRaises(ValueError) as context: + AscendSchedulerConfig.initialize_from_config( + self.basic_scheduler_config, + AscendSchedulerConfig( + enable_chunked_prefill=False, + max_num_batched_tokens=2048, + max_model_len=8192, + ), + ) + self.assertIn( + "Ascend scheduler is enabled without chunked prefill feature", + str(context.exception), + ) + self.assertIn("max_num_batched_tokens (2048)", str(context.exception)) + self.assertIn("max_model_len (8192)", str(context.exception)) + + def test_initialize_from_config_with_pd_transfer(self): + ascend_config = AscendSchedulerConfig.initialize_from_config( + self.basic_scheduler_config, + AscendSchedulerConfig( + enable_pd_transfer=True, + decode_max_num_seqs=48, + max_num_batched_tokens=8192, + max_model_len=4096, + ), + ) + 
self.assertEqual(ascend_config.enable_pd_transfer, True) + self.assertEqual(ascend_config.decode_max_num_seqs, 48) diff --git a/tests/ut/core/test_scheduler.py b/tests/ut/core/test_scheduler.py new file mode 100644 index 00000000000..53af2f4756e --- /dev/null +++ b/tests/ut/core/test_scheduler.py @@ -0,0 +1,1473 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +from typing import Any, Dict, List, Optional, Tuple +from unittest.mock import MagicMock, patch + +import numpy as np +import torch +from vllm.config import (CacheConfig, KVTransferConfig, ModelConfig, + SchedulerConfig, SpeculativeConfig, VllmConfig) +from vllm.multimodal.inputs import (MultiModalFeatureSpec, + MultiModalKwargsItem, PlaceholderRange) +from vllm.sampling_params import SamplingParams +from vllm.utils.hashing import sha256 +from vllm.v1.core.kv_cache_utils import (get_request_block_hasher, + init_none_hash) +from vllm.v1.core.sched.output import SchedulerOutput +from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig, + KVCacheGroupSpec) +from vllm.v1.outputs import DraftTokenIds, ModelRunnerOutput +from vllm.v1.request import Request, RequestStatus +from vllm.v1.structured_output import StructuredOutputManager + +from tests.ut.base import TestBase +from vllm_ascend.core.scheduler import AscendScheduler +from vllm_ascend.core.scheduler_dynamic_batch import SchedulerDynamicBatch + +EOS_TOKEN_ID = 50256 +MODEL = "Qwen3-0.6B" +ENABLE_PREFIX_CACHING = None +PROMPT_LOGPROBS = None +ENABLE_CHUNKED_PREFILL = False +MAX_NUM_BATCHED_TOKENS = 10000 +LONG_PREFILL_TOKEN_THRESHOLD = 0 +NUM_SPECULATIVE_TOKENS = None +MAX_NUM_SEQS = 16 + + +def create_requests( + num_requests: int, + num_tokens: int = 10, + mm_positions: Optional[list[PlaceholderRange]] = None, + max_tokens: int = 16, + stop_token_ids: Optional[list[int]] = None, + block_size: int = 3, + hash_fn=sha256, +): + init_none_hash(hash_fn) + prompt_logprobs = PROMPT_LOGPROBS + sampling_params = SamplingParams(ignore_eos=False, + max_tokens=max_tokens, + stop_token_ids=stop_token_ids, + prompt_logprobs=prompt_logprobs) + requests = [] + for i in range(num_requests): + mm_features = [] + if mm_positions is not None: + mm_position = mm_positions[i] + for j, position in enumerate(mm_position): + identifier = f"hash{i}_{j}" + mm_feature = MultiModalFeatureSpec( + data=MultiModalKwargsItem.dummy("dummy_m"), + mm_position=position, + identifier=identifier, + modality="image") + mm_features.append(mm_feature) + request = Request(request_id=f"{i}", + prompt_token_ids=[i] * num_tokens, + sampling_params=sampling_params, + eos_token_id=EOS_TOKEN_ID, + pooling_params=None, + mm_features=mm_features if mm_features else None, + block_hasher=get_request_block_hasher( + block_size, hash_fn)) + requests.append(request) + return requests + + +def make_output(scheduler): + req_ids = [req.request_id for req in scheduler.running] + req_id_to_index = { + req.request_id: i + for i, req in enumerate(scheduler.running) + } + sampled_token_ids = [ + np.array([1000], dtype=np.int64) for _ in scheduler.running + ] + + logprobs = None + + modelrunner_output = ModelRunnerOutput( + req_ids=req_ids, + req_id_to_index=req_id_to_index, + sampled_token_ids=sampled_token_ids, + logprobs=logprobs, + prompt_logprobs_dict={}, + pooler_output=[], + ) + return modelrunner_output + + +class TestAscendScheduler(TestBase): + + @patch("vllm.config.ModelConfig.__post_init__", MagicMock()) + @patch("vllm.config.VllmConfig.__post_init__", 
MagicMock()) + @patch('vllm.v1.core.sched.scheduler.compute_encoder_budget') + def create_scheduler(self, mock_compute_encoder_budget): + mock_compute_encoder_budget.return_value = [100, 100] + use_kv_connector = False + block_size = 16 + + scheduler_config = SchedulerConfig( + max_num_seqs=16, + max_model_len=MAX_NUM_BATCHED_TOKENS, + long_prefill_token_threshold=LONG_PREFILL_TOKEN_THRESHOLD, + disable_chunked_mm_input=False, + enable_chunked_prefill=ENABLE_CHUNKED_PREFILL, + max_num_batched_tokens=MAX_NUM_BATCHED_TOKENS, + ) + + scheduler_config.max_num_encoder_input_tokens = 10000 + scheduler_config.encoder_cache_size = 10000 + scheduler_config.chunked_prefill_enabled = False + + model_config = ModelConfig( + model=MODEL, + task="auto", + tokenizer=MODEL, + tokenizer_mode="auto", + trust_remote_code=True, + dtype="float16", + seed=42, + max_model_len=MAX_NUM_BATCHED_TOKENS, + ) + model_config.pooler_config = MagicMock() + model_config.multimodal_config = MagicMock() + model_config.hf_config = MagicMock() + model_config.hf_config.is_encoder_decoder = False + # Cache config, optionally force APC + kwargs_cache: Dict[str, + Any] = ({} if ENABLE_PREFIX_CACHING is None else { + 'enable_prefix_caching': + ENABLE_PREFIX_CACHING + }) + cache_config = CacheConfig( + block_size=block_size, + gpu_memory_utilization=0.9, + swap_space=0, + cache_dtype="auto", + **kwargs_cache, + ) + + kv_transfer_config = KVTransferConfig( + kv_connector="SharedStorageConnector", + kv_role="kv_both", + kv_connector_extra_config={"shared_storage_path": "local_storage"}, + ) if use_kv_connector else None + + speculative_config: Optional[SpeculativeConfig] = None + if NUM_SPECULATIVE_TOKENS is not None: + speculative_config = SpeculativeConfig( + model="ngram", num_speculative_tokens=NUM_SPECULATIVE_TOKENS) + + vllm_config = VllmConfig( + scheduler_config=scheduler_config, + model_config=model_config, + cache_config=cache_config, + kv_transfer_config=kv_transfer_config, + speculative_config=speculative_config, + ) + + kv_cache_config = KVCacheConfig( + num_blocks=10000, # A large number of blocks to hold all requests + kv_cache_tensors=[], + kv_cache_groups=[ + KVCacheGroupSpec(['layer'], + FullAttentionSpec(block_size, 1, 1, + torch.float32, False, + False)) + ], + ) + cache_config.num_gpu_blocks = 10000 + + scheduler = AscendScheduler( + vllm_config=vllm_config, + kv_cache_config=kv_cache_config, + log_stats=True, + block_size=block_size, + structured_output_manager=MagicMock(spec=StructuredOutputManager), + ) + + should_advance = MagicMock() + should_advance.return_value = False + scheduler.structured_output_manager.should_advance = should_advance + + return scheduler + + def test_add_requests(self): + scheduler = self.create_scheduler() + requests = create_requests(num_requests=10) + + for i, request in enumerate(requests): + scheduler.add_request(request) + self.assertIn(request.request_id, scheduler.requests) + self.assertEqual(len(scheduler.waiting), i + 1) + + def test_finish_request(self): + scheduler = self.create_scheduler() + requests = create_requests(num_requests=10) + for request in requests: + scheduler.add_request(request) + + for i, request in enumerate(requests): + scheduler.finish_requests(request.request_id, + RequestStatus.FINISHED_ABORTED) + self.assertNotIn(request.request_id, scheduler.requests) + self.assertEqual(len(scheduler.waiting), 9 - i) + + def test_get_num_unfinished_requests(self): + scheduler = self.create_scheduler() + requests = create_requests(num_requests=10) + for request in 
requests: + scheduler.add_request(request) + + for i, request in enumerate(requests): + scheduler.finish_requests(request.request_id, + RequestStatus.FINISHED_STOPPED) + self.assertEqual(scheduler.get_num_unfinished_requests(), + len(requests) - i - 1) + + def test_schedule(self): + '''Test scheduling. + Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs + ''' + scheduler = self.create_scheduler() + scheduler.scheduler_config.chunked_prefill_enabled = False + requests = create_requests(num_requests=10) + for request in requests: + scheduler.add_request(request) + + # Test initial scheduling + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(output.finished_req_ids), 0) + # Verify all requests are scheduled. + for req_id, num_tokens in output.num_scheduled_tokens.items(): + self.assertEqual(num_tokens, + len(requests[int(req_id)].prompt_token_ids)) + + # Verify requests moved from waiting to running + self.assertEqual(len(scheduler.waiting), 0) + self.assertEqual(len(scheduler.running), len(requests)) + for i, request in enumerate(requests): + self.assertEqual(scheduler.running[i], request) + + def test_schedule_multimodal_requests(self): + scheduler = self.create_scheduler() + scheduler.scheduler_config.chunked_prefill_enabled = False + mm_positions = [[PlaceholderRange(offset=i, length=10)] + for i in range(10)] + requests = create_requests( + num_requests=10, + mm_positions=mm_positions, + ) + for request in requests: + scheduler.add_request(request) + + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(output.finished_req_ids), 0) + for req_id, num_tokens in output.num_scheduled_tokens.items(): + assert num_tokens == len(requests[int(req_id)].prompt_token_ids) + + # Verify all requests are scheduled. + for req_id, num_tokens in output.num_scheduled_tokens.items(): + self.assertEqual(num_tokens, + len(requests[int(req_id)].prompt_token_ids)) + self.assertEqual(len(output.scheduled_encoder_inputs), len(requests)) + for req_id, encoder_input in output.scheduled_encoder_inputs.items(): + assert len(encoder_input) == 1 + + # Verify requests moved from waiting to running + self.assertEqual(len(scheduler.waiting), 0) + self.assertEqual(len(scheduler.running), len(requests)) + for i, request in enumerate(requests): + self.assertEqual(scheduler.running[i], request) + + def test_concurrent_partial_prefills_schedule(self): + '''Test concurrent partial prefills scheduling. + total requests = 10, every request has 10 token. + while set long_prefill_token_threshold = 1, scheduler can + only schedule max_long_partial_prefills long request. 
+ ''' + scheduler = self.create_scheduler() + scheduler.scheduler_config.chunked_prefill_enabled = False + scheduler.scheduler_config.max_long_partial_prefills = 2 + scheduler.scheduler_config.long_prefill_token_threshold = 1 + requests = create_requests(num_requests=10, num_tokens=20) + for request in requests: + scheduler.add_request(request) + + # Test initial scheduling + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), + scheduler.scheduler_config.max_long_partial_prefills) + self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(output.finished_req_ids), 0) + + def test_schedule_enable_prefix_caching(self): + '''Test scheduling. + Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs + ''' + global ENABLE_PREFIX_CACHING + ENABLE_PREFIX_CACHING = True + global PROMPT_LOGPROBS + PROMPT_LOGPROBS = 5 + scheduler = self.create_scheduler() + scheduler.scheduler_config.chunked_prefill_enabled = False + requests = create_requests(num_requests=10) + for request in requests: + scheduler.add_request(request) + + # Test initial scheduling + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(output.finished_req_ids), 0) + # Verify all requests are scheduled. + for req_id, num_tokens in output.num_scheduled_tokens.items(): + self.assertEqual(num_tokens, + len(requests[int(req_id)].prompt_token_ids)) + + # Verify requests moved from waiting to running + self.assertEqual(len(scheduler.waiting), 0) + self.assertEqual(len(scheduler.running), len(requests)) + for i, request in enumerate(requests): + self.assertEqual(scheduler.running[i], request) + + def test_stop_via_update_from_output(self): + """Test stopping behavior through update_from_output""" + global NUM_SPECULATIVE_TOKENS + NUM_SPECULATIVE_TOKENS = 1 + scheduler = self.create_scheduler() + + # Test case 1: Stop on EOS token + requests = create_requests(num_requests=2, max_tokens=10) + for req in requests: + req.num_computed_tokens = req.num_tokens + scheduler.requests[req.request_id] = req + scheduler.running.append(req) + req.status = RequestStatus.RUNNING + + scheduler_output = SchedulerOutput(scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={ + requests[0].request_id: 1, + requests[1].request_id: 2 + }, + total_num_scheduled_tokens=3, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: [], + requests[1].request_id: [10] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[req.request_id for req in requests], + req_id_to_index={ + req.request_id: i + for i, req in enumerate(requests) + }, + sampled_token_ids=[np.array([EOS_TOKEN_ID]), + np.array([10, 11]) + ], # First request hits EOS, second continues + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output, model_output) + + # Verify first request stopped, second continues + self.assertEqual(len(scheduler.running), 1) + self.assertEqual(scheduler.running[0].request_id, + requests[1].request_id) + self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) + self.assertIn(requests[0].request_id, scheduler.finished_req_ids) + self.assertEqual(list(requests[0].output_token_ids), [EOS_TOKEN_ID]) + self.assertEqual(list(requests[1].output_token_ids), [10, 11]) + + # Test case 2: Stop 
on custom stop token + NUM_SPECULATIVE_TOKENS = 2 + scheduler = self.create_scheduler() + requests = create_requests(num_requests=2, + max_tokens=10, + stop_token_ids=[42, 43]) + for req in requests: + req.num_computed_tokens = req.num_tokens + scheduler.requests[req.request_id] = req + scheduler.running.append(req) + req.status = RequestStatus.RUNNING + + scheduler_output = SchedulerOutput(scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={ + requests[0].request_id: 3, + requests[1].request_id: 2 + }, + total_num_scheduled_tokens=5, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: + [10, 42], + requests[1].request_id: [13] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[req.request_id for req in requests], + req_id_to_index={ + req.request_id: i + for i, req in enumerate(requests) + }, + sampled_token_ids=[np.array([10, 42, 12]), + np.array([13, 14]) + ], # First request hits stop token + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output, model_output) + + # Verify first request stopped on custom token + self.assertEqual(len(scheduler.running), 1) + self.assertEqual(scheduler.running[0].request_id, + requests[1].request_id) + self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) + self.assertEqual(requests[0].stop_reason, 42) + self.assertIn(requests[0].request_id, scheduler.finished_req_ids) + self.assertEqual(list(requests[0].output_token_ids), [10, 42]) + self.assertEqual(list(requests[1].output_token_ids), [13, 14]) + + # Test case 3: Stop on max tokens + NUM_SPECULATIVE_TOKENS = 2 + scheduler = self.create_scheduler() + requests = create_requests(num_requests=2, max_tokens=2) + for req in requests: + req.num_computed_tokens = req.num_tokens + scheduler.requests[req.request_id] = req + scheduler.running.append(req) + req.status = RequestStatus.RUNNING + + scheduler_output = SchedulerOutput(scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={ + requests[0].request_id: 3, + requests[1].request_id: 1 + }, + total_num_scheduled_tokens=4, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: + [10, 11], + requests[1].request_id: [] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[req.request_id for req in requests], + req_id_to_index={ + req.request_id: i + for i, req in enumerate(requests) + }, + sampled_token_ids=[np.array([10, 11, 12]), + np.array([13]) + ], # First request exceeds max_tokens + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + scheduler.update_from_output(scheduler_output, model_output) + + # Verify first request stopped due to length + self.assertEqual(len(scheduler.running), 1) + self.assertEqual(scheduler.running[0].request_id, + requests[1].request_id) + self.assertEqual(requests[0].status, + RequestStatus.FINISHED_LENGTH_CAPPED) + self.assertIn(requests[0].request_id, scheduler.finished_req_ids) + self.assertEqual(list(requests[0].output_token_ids), [10, 11]) + self.assertEqual(list(requests[1].output_token_ids), [13]) + + # Test case 4: Ignore EOS flag + scheduler = self.create_scheduler() + requests = create_requests(num_requests=1, max_tokens=10) + requests[0].sampling_params.ignore_eos = True + requests[0].num_computed_tokens = requests[0].num_tokens + 
scheduler.requests[requests[0].request_id] = requests[0] + scheduler.running.append(requests[0]) + + scheduler_output = SchedulerOutput( + scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={requests[0].request_id: 3}, + total_num_scheduled_tokens=3, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: [EOS_TOKEN_ID, 10] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[requests[0].request_id], + req_id_to_index={requests[0].request_id: 0}, + sampled_token_ids=[np.array([EOS_TOKEN_ID, 10, 11])], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output, model_output) + + # Verify request continues past EOS + self.assertEqual(len(scheduler.running), 1) + self.assertFalse(requests[0].is_finished()) + self.assertEqual(list(requests[0].output_token_ids), + [EOS_TOKEN_ID, 10, 11]) + + def test_schedule_concurrent_batches(self): + global MAX_NUM_BATCHED_TOKENS + global ENABLE_PREFIX_CACHING + global ENABLE_CHUNKED_PREFILL + global MAX_NUM_SEQS + global PROMPT_LOGPROBS + ENABLE_PREFIX_CACHING = None + MAX_NUM_BATCHED_TOKENS = 1024 + MAX_NUM_SEQS = 2 + ENABLE_CHUNKED_PREFILL = True + PROMPT_LOGPROBS = None + + enable_prefix_caching_list = [None, True] + prompt_logprobs_list = [None, 5] + + for i in range(len(enable_prefix_caching_list)): + ENABLE_PREFIX_CACHING = enable_prefix_caching_list[i] + PROMPT_LOGPROBS = prompt_logprobs_list[i] + scheduler = self.create_scheduler() + requests = create_requests( + num_requests=2, + num_tokens=512, + ) + + # Schedule the first request. + scheduler.add_request(requests[0]) + scheduler_output0 = scheduler.schedule() + self.assertEqual(len(scheduler_output0.scheduled_new_reqs), 1) + self.assertEqual( + scheduler_output0.num_scheduled_tokens[requests[0].request_id], + 512) + + # The first request is still running, so only schedule the second request. + scheduler.add_request(requests[1]) + scheduler_output1 = scheduler.schedule() + self.assertEqual(len(scheduler_output1.scheduled_new_reqs), 1) + self.assertEqual( + scheduler_output1.num_scheduled_tokens[requests[1].request_id], + 512) + + # Model output of the first request. + model_runner_output = ModelRunnerOutput( + req_ids=[requests[0].request_id], + req_id_to_index={requests[0].request_id: 0}, + sampled_token_ids=[np.array([0], dtype=np.int64)], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output0, + model_runner_output) + + # Schedule the next step. + # The first request can be scheduled again while the second + # request is still running. + scheduler.schedule() + # Model output of the second request. + model_runner_output = ModelRunnerOutput( + req_ids=[requests[1].request_id], + req_id_to_index={requests[1].request_id: 0}, + sampled_token_ids=[np.array([0], dtype=np.int64)], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output1, + model_runner_output) + + def test_schedule_spec_decoding_stats(self): + """Test scheduling behavior with speculative decoding. + + This test verifies that: + 1. Speculated tokens get scheduled correctly + 2. 
Spec decoding stats properly count number of draft and accepted tokens + """ + spec_tokens_list: List[List[List[int]]] = [[[1, 2, 3]], [[1, 2, 3]], + [[1, 2], [3]], [[1]], [[]], + [[1, 2, 3], [4, 5, 6]]] + output_tokens_list: List[List[List[int]]] = [ + [np.array([1, 2, 3, 4])], [np.array([1, 5])], + [np.array([1, 2, 5]), np.array([3, 4])], [np.array([1, 2])], + [np.array([5])], [np.array([1, 2, 7]), + np.array([4, 8])] + ] + expected_list: List[Tuple[int, int, + int, List[int]]] = [(1, 3, 3, [1, 1, 1]), + (1, 3, 1, [1, 0, 0]), + (2, 3, 3, [2, 1]), + (1, 1, 1, [1]), + (0, 0, 0, [0]), + (2, 6, 3, [2, 1, 0])] + + global NUM_SPECULATIVE_TOKENS + for idx in range(len(spec_tokens_list)): + spec_tokens = spec_tokens_list[idx] + output_tokens = output_tokens_list[idx] + expected = expected_list[idx] + num_spec_tokens = max(1, max(len(t) for t in spec_tokens)) + NUM_SPECULATIVE_TOKENS = num_spec_tokens + scheduler = self.create_scheduler() + requests = create_requests(num_requests=len(spec_tokens), + num_tokens=1) + req_ids = [] + req_to_index = {} + for i, request in enumerate(requests): + scheduler.add_request(request) + req_ids.append(request.request_id) + req_to_index[request.request_id] = i + + # Schedule a decode, which will also draft speculative tokens + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.total_num_scheduled_tokens, len(requests)) + for i in range(len(requests)): + req_id = requests[i].request_id + self.assertEqual(output.num_scheduled_tokens[req_id], 1) + self.assertNotIn(req_id, output.scheduled_spec_decode_tokens) + + model_runner_output = ModelRunnerOutput( + req_ids=req_ids, + req_id_to_index=req_to_index, + sampled_token_ids=[ + np.array([0]) for _ in range(len(requests)) + ], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + draft_token_ids = DraftTokenIds(req_ids, spec_tokens) + + engine_core_outputs = scheduler.update_from_output( + output, model_runner_output) + scheduler.update_draft_token_ids(draft_token_ids) + + for i in range(len(requests)): + running_req = scheduler.running[i] + # The prompt token + self.assertEqual(running_req.num_computed_tokens, 1) + # The prompt token and the sampled token + self.assertEqual(running_req.num_tokens, 2) + # The prompt token, the sampled token, and the speculated tokens + self.assertEqual(running_req.num_tokens_with_spec, + 2 + len(spec_tokens[i])) + + # No draft or accepted tokens counted yet + self.assertTrue( + not engine_core_outputs + or (engine_core_outputs[0].scheduler_stats.spec_decoding_stats + is None)) + + # Schedule the speculated tokens for validation + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), 0) + # The sampled token and speculated tokens + self.assertEqual( + output.total_num_scheduled_tokens, + len(requests) + sum(len(ids) for ids in spec_tokens)) + for i in range(len(requests)): + req_id = requests[i].request_id + self.assertEqual(output.num_scheduled_tokens[req_id], + 1 + len(spec_tokens[i])) + if spec_tokens[i]: + self.assertEqual( + len(output.scheduled_spec_decode_tokens[req_id]), + len(spec_tokens[i])) + else: + self.assertNotIn(req_id, + output.scheduled_spec_decode_tokens) + + model_runner_output = ModelRunnerOutput( + req_ids=req_ids, + req_id_to_index=req_to_index, + sampled_token_ids=output_tokens, + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + engine_core_outputs = scheduler.update_from_output( + output, model_runner_output) + + scheduler_stats = 
engine_core_outputs[0].scheduler_stats \
+                if engine_core_outputs else None
+            if expected[0] == 0:
+                self.assertIsNone(scheduler_stats.spec_decoding_stats)
+            else:
+                self.assertIsNotNone(scheduler_stats.spec_decoding_stats)
+                stats = scheduler_stats.spec_decoding_stats
+                self.assertEqual(stats.num_drafts, expected[0])
+                self.assertEqual(stats.num_draft_tokens, expected[1])
+                self.assertEqual(stats.num_accepted_tokens, expected[2])
+                self.assertEqual(stats.num_accepted_tokens_per_pos,
+                                 expected[3])
+
+    def assert_scheduler_empty(self, scheduler):
+        """Confirm the scheduler is "empty" - i.e. no leaks."""
+        # Scheduler Metadata.
+        self.assertEqual(len(scheduler.requests), 0)
+        self.assertEqual(len(scheduler.waiting), 0)
+        self.assertEqual(len(scheduler.running), 0)
+        self.assertEqual(len(scheduler.finished_req_ids), 0)
+
+        # EncoderCacheManager.
+        self.assertEqual(len(scheduler.encoder_cache_manager.freed), 0)
+        self.assertEqual(len(scheduler.encoder_cache_manager.cached), 0)
+
+        # KVCache Manager.
+        self.assertEqual(
+            len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
+                req_to_blocks), 0)
+        self.assertEqual(
+            len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
+                num_cached_block), 0)
+        num_free_blocks = (scheduler.kv_cache_manager.block_pool.
+                           free_block_queue.num_free_blocks)
+        self.assertEqual(
+            num_free_blocks,
+            scheduler.kv_cache_manager.block_pool.num_gpu_blocks - 1)
+
+        # NOTE(rob): just the ref count on blocks will be 0. The hash
+        # value, etc will remain since we lazily evict for prefix cache.
+        for block in scheduler.kv_cache_manager.block_pool.blocks:
+            self.assertEqual(block.ref_cnt, 0)
+
+    def test_memory_leak(self):
+        """Test that we do not have a memory leak."""
+        scheduler = self.create_scheduler()
+        NUM_REQUESTS = 5
+        NUM_TOKENS = 10
+        MAX_TOKENS = 10
+        requests = create_requests(num_requests=NUM_REQUESTS,
+                                   num_tokens=NUM_TOKENS,
+                                   max_tokens=MAX_TOKENS)
+
+        # Add each request.
+        for request in requests:
+            scheduler.add_request(request)
+            scheduler_output = scheduler.schedule()
+            model_runner_output = make_output(scheduler)
+            scheduler.update_from_output(scheduler_output, model_runner_output)
+
+        # Iterate until done.
+        while True:
+            scheduler_output = scheduler.schedule()
+            if len(scheduler.running) == 0:
+                break
+            model_runner_output = make_output(scheduler)
+            scheduler.update_from_output(scheduler_output, model_runner_output)
+
+        # Confirm no memory leak.
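+        # The helper below verifies that the request queues, the encoder cache
+        # and the KV cache block pool have all returned to their initial state.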
+ self.assert_scheduler_empty(scheduler) + + def test_scheduler_with_pd_transfer(self): + scheduler = self.create_scheduler() + scheduler.phase = "prefill" + requests = create_requests(num_requests=32) + for request in requests: + scheduler.add_request(request) + + # 1st iteration, move 16 requests from waiting to running for prefill + scheduler_output = scheduler.schedule() + model_runner_output = make_output(scheduler) + scheduler.update_from_output(scheduler_output, model_runner_output) + first_iter_prefilled_req_num = len(scheduler.running) + self.assertEqual(len(scheduler_output.scheduled_new_reqs), + scheduler.max_num_running_reqs) + self.assertEqual(scheduler_output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(scheduler_output.finished_req_ids), 0) + + # 2nd iteration, move 16 prefilled requests to finished_prefill_reqs + # and move 16 requests from waiting to running for prefill + scheduler_output = scheduler.schedule() + model_runner_output = make_output(scheduler) + scheduler.update_from_output(scheduler_output, model_runner_output) + self.assertEqual(len(scheduler.finished_prefill_reqs), + first_iter_prefilled_req_num) + + # 3rd iteration, all requests prefilled, change scheduler phase to decode + scheduler_output = scheduler.schedule() + model_runner_output = make_output(scheduler) + scheduler.update_from_output(scheduler_output, model_runner_output) + self.assertEqual(scheduler.phase, "decode") + + +class TestSchedulerDynamicBatch(TestBase): + + @patch("vllm.config.ModelConfig.__post_init__", MagicMock()) + @patch("vllm.config.VllmConfig.__post_init__", MagicMock()) + @patch('vllm.v1.core.sched.scheduler.compute_encoder_budget') + def create_scheduler(self, mock_compute_encoder_budget): + mock_compute_encoder_budget.return_value = [100, 100] + use_kv_connector = False + block_size = 16 + + scheduler_config = SchedulerConfig( + max_num_seqs=16, + max_model_len=MAX_NUM_BATCHED_TOKENS, + long_prefill_token_threshold=LONG_PREFILL_TOKEN_THRESHOLD, + disable_chunked_mm_input=False, + enable_chunked_prefill=True, + max_num_batched_tokens=MAX_NUM_BATCHED_TOKENS, + ) + + scheduler_config.max_num_encoder_input_tokens = 10000 + scheduler_config.encoder_cache_size = 10000 + scheduler_config.chunked_prefill_enabled = True + scheduler_config.SLO_limits_for_dynamic_batch = 0 + + model_config = ModelConfig( + model=MODEL, + task="auto", + tokenizer=MODEL, + tokenizer_mode="auto", + trust_remote_code=True, + dtype="float16", + seed=42, + max_model_len=MAX_NUM_BATCHED_TOKENS, + ) + model_config.pooler_config = MagicMock() + model_config.multimodal_config = MagicMock() + model_config.hf_config = MagicMock() + model_config.hf_config.is_encoder_decoder = False + # Cache config, optionally force APC + kwargs_cache: Dict[str, + Any] = ({} if ENABLE_PREFIX_CACHING is None else { + 'enable_prefix_caching': + ENABLE_PREFIX_CACHING + }) + cache_config = CacheConfig( + block_size=block_size, + gpu_memory_utilization=0.9, + swap_space=0, + cache_dtype="auto", + **kwargs_cache, + ) + + kv_transfer_config = KVTransferConfig( + kv_connector="SharedStorageConnector", + kv_role="kv_both", + kv_connector_extra_config={"shared_storage_path": "local_storage"}, + ) if use_kv_connector else None + + speculative_config: Optional[SpeculativeConfig] = None + if NUM_SPECULATIVE_TOKENS is not None: + speculative_config = SpeculativeConfig( + model="ngram", num_speculative_tokens=NUM_SPECULATIVE_TOKENS) + + vllm_config = VllmConfig( + scheduler_config=scheduler_config, + model_config=model_config, + 
cache_config=cache_config, + kv_transfer_config=kv_transfer_config, + speculative_config=speculative_config, + ) + + kv_cache_config = KVCacheConfig( + num_blocks=10000, # A large number of blocks to hold all requests + kv_cache_tensors=[], + kv_cache_groups=[ + KVCacheGroupSpec(['layer'], + FullAttentionSpec(block_size, 1, 1, + torch.float32, False)) + ], + ) + cache_config.num_gpu_blocks = 10000 + + scheduler = SchedulerDynamicBatch( + vllm_config=vllm_config, + kv_cache_config=kv_cache_config, + log_stats=True, + structured_output_manager=MagicMock(spec=StructuredOutputManager), + ) + + should_advance = MagicMock() + should_advance.return_value = False + scheduler.structured_output_manager.should_advance = should_advance + + return scheduler + + def test_add_requests(self): + scheduler = self.create_scheduler() + requests = create_requests(num_requests=10) + + for i, request in enumerate(requests): + scheduler.add_request(request) + self.assertIn(request.request_id, scheduler.requests) + self.assertEqual(len(scheduler.waiting), i + 1) + + def test_finish_request(self): + scheduler = self.create_scheduler() + requests = create_requests(num_requests=10) + for request in requests: + scheduler.add_request(request) + + for i, request in enumerate(requests): + scheduler.finish_requests(request.request_id, + RequestStatus.FINISHED_ABORTED) + self.assertNotIn(request.request_id, scheduler.requests) + self.assertEqual(len(scheduler.waiting), 9 - i) + + def test_get_num_unfinished_requests(self): + scheduler = self.create_scheduler() + requests = create_requests(num_requests=10) + for request in requests: + scheduler.add_request(request) + + for i, request in enumerate(requests): + scheduler.finish_requests(request.request_id, + RequestStatus.FINISHED_STOPPED) + self.assertEqual(scheduler.get_num_unfinished_requests(), + len(requests) - i - 1) + + def test_schedule(self): + '''Test scheduling. + Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs + ''' + scheduler = self.create_scheduler() + scheduler.scheduler_config.chunked_prefill_enabled = True + requests = create_requests(num_requests=10) + for request in requests: + scheduler.add_request(request) + + # Test initial scheduling + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(output.finished_req_ids), 0) + # Verify all requests are scheduled. 
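+        # Each prompt is only 10 tokens, well within the token budget, so every
+        # request should be scheduled in full on the first pass.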
+ for req_id, num_tokens in output.num_scheduled_tokens.items(): + self.assertEqual(num_tokens, + len(requests[int(req_id)].prompt_token_ids)) + + # Verify requests moved from waiting to running + self.assertEqual(len(scheduler.waiting), 0) + self.assertEqual(len(scheduler.running), len(requests)) + for i, request in enumerate(requests): + self.assertEqual(scheduler.running[i], request) + + def test_schedule_multimodal_requests(self): + scheduler = self.create_scheduler() + scheduler.scheduler_config.chunked_prefill_enabled = True + mm_positions = [[PlaceholderRange(offset=i, length=10)] + for i in range(10)] + requests = create_requests( + num_requests=10, + mm_positions=mm_positions, + ) + for request in requests: + scheduler.add_request(request) + + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(output.finished_req_ids), 0) + for req_id, num_tokens in output.num_scheduled_tokens.items(): + assert num_tokens == len(requests[int(req_id)].prompt_token_ids) + + # Verify all requests are scheduled. + for req_id, num_tokens in output.num_scheduled_tokens.items(): + self.assertEqual(num_tokens, + len(requests[int(req_id)].prompt_token_ids)) + self.assertEqual(len(output.scheduled_encoder_inputs), len(requests)) + for req_id, encoder_input in output.scheduled_encoder_inputs.items(): + assert len(encoder_input) == 1 + + # Verify requests moved from waiting to running + self.assertEqual(len(scheduler.waiting), 0) + self.assertEqual(len(scheduler.running), len(requests)) + for i, request in enumerate(requests): + self.assertEqual(scheduler.running[i], request) + + def test_schedule_enable_prefix_caching(self): + '''Test scheduling. + Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs + ''' + global ENABLE_PREFIX_CACHING + ENABLE_PREFIX_CACHING = True + global PROMPT_LOGPROBS + PROMPT_LOGPROBS = 5 + scheduler = self.create_scheduler() + scheduler.scheduler_config.chunked_prefill_enabled = False + requests = create_requests(num_requests=10) + for request in requests: + scheduler.add_request(request) + + # Test initial scheduling + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) + self.assertEqual(len(output.finished_req_ids), 0) + # Verify all requests are scheduled. 
+ for req_id, num_tokens in output.num_scheduled_tokens.items(): + self.assertEqual(num_tokens, + len(requests[int(req_id)].prompt_token_ids)) + + # Verify requests moved from waiting to running + self.assertEqual(len(scheduler.waiting), 0) + self.assertEqual(len(scheduler.running), len(requests)) + for i, request in enumerate(requests): + self.assertEqual(scheduler.running[i], request) + + def test_stop_via_update_from_output(self): + """Test stopping behavior through update_from_output""" + global NUM_SPECULATIVE_TOKENS + NUM_SPECULATIVE_TOKENS = 1 + scheduler = self.create_scheduler() + + # Test case 1: Stop on EOS token + requests = create_requests(num_requests=2, max_tokens=10) + for req in requests: + req.num_computed_tokens = req.num_tokens + scheduler.requests[req.request_id] = req + scheduler.running.append(req) + req.status = RequestStatus.RUNNING + + scheduler_output = SchedulerOutput(scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={ + requests[0].request_id: 1, + requests[1].request_id: 2 + }, + total_num_scheduled_tokens=3, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: [], + requests[1].request_id: [10] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[req.request_id for req in requests], + req_id_to_index={ + req.request_id: i + for i, req in enumerate(requests) + }, + sampled_token_ids=[np.array([EOS_TOKEN_ID]), + np.array([10, 11]) + ], # First request hits EOS, second continues + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output, model_output) + + # Verify first request stopped, second continues + self.assertEqual(len(scheduler.running), 1) + self.assertEqual(scheduler.running[0].request_id, + requests[1].request_id) + self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) + self.assertIn(requests[0].request_id, scheduler.finished_req_ids) + self.assertEqual(list(requests[0].output_token_ids), [EOS_TOKEN_ID]) + self.assertEqual(list(requests[1].output_token_ids), [10, 11]) + + # Test case 2: Stop on custom stop token + NUM_SPECULATIVE_TOKENS = 2 + scheduler = self.create_scheduler() + requests = create_requests(num_requests=2, + max_tokens=10, + stop_token_ids=[42, 43]) + for req in requests: + req.num_computed_tokens = req.num_tokens + scheduler.requests[req.request_id] = req + scheduler.running.append(req) + req.status = RequestStatus.RUNNING + + scheduler_output = SchedulerOutput(scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={ + requests[0].request_id: 3, + requests[1].request_id: 2 + }, + total_num_scheduled_tokens=5, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: + [10, 42], + requests[1].request_id: [13] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[req.request_id for req in requests], + req_id_to_index={ + req.request_id: i + for i, req in enumerate(requests) + }, + sampled_token_ids=[np.array([10, 42, 12]), + np.array([13, 14]) + ], # First request hits stop token + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output, model_output) + + # Verify first request stopped on custom token + self.assertEqual(len(scheduler.running), 1) + self.assertEqual(scheduler.running[0].request_id, + requests[1].request_id) + 
self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) + self.assertEqual(requests[0].stop_reason, 42) + self.assertIn(requests[0].request_id, scheduler.finished_req_ids) + self.assertEqual(list(requests[0].output_token_ids), [10, 42]) + self.assertEqual(list(requests[1].output_token_ids), [13, 14]) + + # Test case 3: Stop on max tokens + NUM_SPECULATIVE_TOKENS = 2 + scheduler = self.create_scheduler() + requests = create_requests(num_requests=2, max_tokens=2) + for req in requests: + req.num_computed_tokens = req.num_tokens + scheduler.requests[req.request_id] = req + scheduler.running.append(req) + req.status = RequestStatus.RUNNING + + scheduler_output = SchedulerOutput(scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={ + requests[0].request_id: 3, + requests[1].request_id: 1 + }, + total_num_scheduled_tokens=4, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: + [10, 11], + requests[1].request_id: [] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[req.request_id for req in requests], + req_id_to_index={ + req.request_id: i + for i, req in enumerate(requests) + }, + sampled_token_ids=[np.array([10, 11, 12]), + np.array([13]) + ], # First request exceeds max_tokens + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + scheduler.update_from_output(scheduler_output, model_output) + + # Verify first request stopped due to length + self.assertEqual(len(scheduler.running), 1) + self.assertEqual(scheduler.running[0].request_id, + requests[1].request_id) + self.assertEqual(requests[0].status, + RequestStatus.FINISHED_LENGTH_CAPPED) + self.assertIn(requests[0].request_id, scheduler.finished_req_ids) + self.assertEqual(list(requests[0].output_token_ids), [10, 11]) + self.assertEqual(list(requests[1].output_token_ids), [13]) + + # Test case 4: Ignore EOS flag + scheduler = self.create_scheduler() + requests = create_requests(num_requests=1, max_tokens=10) + requests[0].sampling_params.ignore_eos = True + requests[0].num_computed_tokens = requests[0].num_tokens + scheduler.requests[requests[0].request_id] = requests[0] + scheduler.running.append(requests[0]) + + scheduler_output = SchedulerOutput( + scheduled_new_reqs=[], + scheduled_cached_reqs=[], + num_scheduled_tokens={requests[0].request_id: 3}, + total_num_scheduled_tokens=3, + scheduled_encoder_inputs={}, + scheduled_spec_decode_tokens={ + requests[0].request_id: [EOS_TOKEN_ID, 10] + }, + num_common_prefix_blocks=0, + finished_req_ids=set(), + free_encoder_mm_hashes=[]) + model_output = ModelRunnerOutput( + req_ids=[requests[0].request_id], + req_id_to_index={requests[0].request_id: 0}, + sampled_token_ids=[np.array([EOS_TOKEN_ID, 10, 11])], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output, model_output) + + # Verify request continues past EOS + self.assertEqual(len(scheduler.running), 1) + self.assertFalse(requests[0].is_finished()) + self.assertEqual(list(requests[0].output_token_ids), + [EOS_TOKEN_ID, 10, 11]) + + def test_schedule_concurrent_batches(self): + global MAX_NUM_BATCHED_TOKENS + global ENABLE_PREFIX_CACHING + global ENABLE_CHUNKED_PREFILL + global MAX_NUM_SEQS + global PROMPT_LOGPROBS + ENABLE_PREFIX_CACHING = None + MAX_NUM_BATCHED_TOKENS = 1024 + MAX_NUM_SEQS = 2 + ENABLE_CHUNKED_PREFILL = True + PROMPT_LOGPROBS = None + + enable_prefix_caching_list = [None, True] + 
prompt_logprobs_list = [None, 5] + + for i in range(len(enable_prefix_caching_list)): + ENABLE_PREFIX_CACHING = enable_prefix_caching_list[i] + PROMPT_LOGPROBS = prompt_logprobs_list[i] + scheduler = self.create_scheduler() + requests = create_requests( + num_requests=2, + num_tokens=512, + ) + + # Schedule the first request. + scheduler.add_request(requests[0]) + scheduler_output0 = scheduler.schedule() + self.assertEqual(len(scheduler_output0.scheduled_new_reqs), 1) + self.assertEqual( + scheduler_output0.num_scheduled_tokens[requests[0].request_id], + 512) + + # The first request is still running, so only schedule the second request. + scheduler.add_request(requests[1]) + scheduler_output1 = scheduler.schedule() + self.assertEqual(len(scheduler_output1.scheduled_new_reqs), 1) + self.assertEqual( + scheduler_output1.num_scheduled_tokens[requests[1].request_id], + 512) + + # Model output of the first request. + model_runner_output = ModelRunnerOutput( + req_ids=[requests[0].request_id], + req_id_to_index={requests[0].request_id: 0}, + sampled_token_ids=[np.array([0])], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output0, + model_runner_output) + + # Schedule the next step. + # The first request can be scheduled again while the second + # request is still running. + scheduler.schedule() + # Model output of the second request. + model_runner_output = ModelRunnerOutput( + req_ids=[requests[1].request_id], + req_id_to_index={requests[1].request_id: 0}, + sampled_token_ids=[np.array([0])], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + scheduler.update_from_output(scheduler_output1, + model_runner_output) + + def test_schedule_spec_decoding_stats(self): + """Test scheduling behavior with speculative decoding. + + This test verifies that: + 1. Speculated tokens get scheduled correctly + 2. 
Spec decoding stats properly count number of draft and accepted tokens + """ + spec_tokens_list: List[List[List[int]]] = [[[1, 2, 3]], [[1, 2, 3]], + [[1, 2], [3]], [[1]], [[]], + [[1, 2, 3], [4, 5, 6]]] + output_tokens_list: List[List[List[int]]] = [ + [np.array([1, 2, 3, 4])], [np.array([1, 5])], + [np.array([1, 2, 5]), np.array([3, 4])], [np.array([1, 2])], + [np.array([5])], [np.array([1, 2, 7]), + np.array([4, 8])] + ] + expected_list: List[Tuple[int, int, + int, List[int]]] = [(1, 3, 3, [1, 1, 1]), + (1, 3, 1, [1, 0, 0]), + (2, 3, 3, [2, 1]), + (1, 1, 1, [1]), + (0, 0, 0, [0]), + (2, 6, 3, [2, 1, 0])] + + global NUM_SPECULATIVE_TOKENS + for idx in range(len(spec_tokens_list)): + spec_tokens = spec_tokens_list[idx] + output_tokens = output_tokens_list[idx] + expected = expected_list[idx] + num_spec_tokens = max(1, max(len(t) for t in spec_tokens)) + NUM_SPECULATIVE_TOKENS = num_spec_tokens + scheduler = self.create_scheduler() + requests = create_requests(num_requests=len(spec_tokens), + num_tokens=1) + req_ids = [] + req_to_index = {} + for i, request in enumerate(requests): + scheduler.add_request(request) + req_ids.append(request.request_id) + req_to_index[request.request_id] = i + + # Schedule a decode, which will also draft speculative tokens + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), len(requests)) + self.assertEqual(output.total_num_scheduled_tokens, len(requests)) + for i in range(len(requests)): + req_id = requests[i].request_id + self.assertEqual(output.num_scheduled_tokens[req_id], 1) + self.assertNotIn(req_id, output.scheduled_spec_decode_tokens) + + model_runner_output = ModelRunnerOutput( + req_ids=req_ids, + req_id_to_index=req_to_index, + sampled_token_ids=[ + np.array([0]) for _ in range(len(requests)) + ], + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + draft_token_ids = DraftTokenIds(req_ids, spec_tokens) + + engine_core_outputs = scheduler.update_from_output( + output, model_runner_output) + scheduler.update_draft_token_ids(draft_token_ids) + + for i in range(len(requests)): + running_req = scheduler.running[i] + # The prompt token + self.assertEqual(running_req.num_computed_tokens, 1) + # The prompt token and the sampled token + self.assertEqual(running_req.num_tokens, 2) + # The prompt token, the sampled token, and the speculated tokens + self.assertEqual(running_req.num_tokens_with_spec, + 2 + len(spec_tokens[i])) + + # No draft or accepted tokens counted yet + self.assertTrue( + not engine_core_outputs + or (engine_core_outputs[0].scheduler_stats.spec_decoding_stats + is None)) + + # Schedule the speculated tokens for validation + output = scheduler.schedule() + self.assertEqual(len(output.scheduled_new_reqs), 0) + # The sampled token and speculated tokens + self.assertEqual( + output.total_num_scheduled_tokens, + len(requests) + sum(len(ids) for ids in spec_tokens)) + for i in range(len(requests)): + req_id = requests[i].request_id + self.assertEqual(output.num_scheduled_tokens[req_id], + 1 + len(spec_tokens[i])) + if spec_tokens[i]: + self.assertEqual( + len(output.scheduled_spec_decode_tokens[req_id]), + len(spec_tokens[i])) + else: + self.assertNotIn(req_id, + output.scheduled_spec_decode_tokens) + + model_runner_output = ModelRunnerOutput( + req_ids=req_ids, + req_id_to_index=req_to_index, + sampled_token_ids=output_tokens, + logprobs=None, + prompt_logprobs_dict={}, + pooler_output=[]) + + engine_core_outputs = scheduler.update_from_output( + output, model_runner_output) + + scheduler_stats = 
engine_core_outputs[0].scheduler_stats \
+                if engine_core_outputs else None
+            if expected[0] == 0:
+                self.assertIsNone(scheduler_stats.spec_decoding_stats)
+            else:
+                self.assertIsNotNone(scheduler_stats.spec_decoding_stats)
+                stats = scheduler_stats.spec_decoding_stats
+                self.assertEqual(stats.num_drafts, expected[0])
+                self.assertEqual(stats.num_draft_tokens, expected[1])
+                self.assertEqual(stats.num_accepted_tokens, expected[2])
+                self.assertEqual(stats.num_accepted_tokens_per_pos,
+                                 expected[3])
+
+    def assert_scheduler_empty(self, scheduler):
+        """Confirm the scheduler is "empty" - i.e. no leaks."""
+        # Scheduler Metadata.
+        self.assertEqual(len(scheduler.requests), 0)
+        self.assertEqual(len(scheduler.waiting), 0)
+        self.assertEqual(len(scheduler.running), 0)
+        self.assertEqual(len(scheduler.finished_req_ids), 0)
+
+        # EncoderCacheManager.
+        self.assertEqual(len(scheduler.encoder_cache_manager.freed), 0)
+        self.assertEqual(len(scheduler.encoder_cache_manager.cached), 0)
+
+        # KVCache Manager.
+        self.assertEqual(
+            len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
+                req_to_blocks), 0)
+        self.assertEqual(
+            len(scheduler.kv_cache_manager.coordinator.single_type_managers[0].
+                num_cached_block), 0)
+        num_free_blocks = (scheduler.kv_cache_manager.block_pool.
+                           free_block_queue.num_free_blocks)
+        self.assertEqual(
+            num_free_blocks,
+            scheduler.kv_cache_manager.block_pool.num_gpu_blocks - 1)
+
+        # NOTE(rob): just the ref count on blocks will be 0. The hash
+        # value, etc will remain since we lazily evict for prefix cache.
+        for block in scheduler.kv_cache_manager.block_pool.blocks:
+            self.assertEqual(block.ref_cnt, 0)
+
+    def test_memory_leak(self):
+        """Test that we do not have a memory leak."""
+        scheduler = self.create_scheduler()
+        NUM_REQUESTS = 5
+        NUM_TOKENS = 10
+        MAX_TOKENS = 10
+        requests = create_requests(num_requests=NUM_REQUESTS,
+                                   num_tokens=NUM_TOKENS,
+                                   max_tokens=MAX_TOKENS)
+
+        # Add each request.
+        for request in requests:
+            scheduler.add_request(request)
+            scheduler_output = scheduler.schedule()
+            model_runner_output = make_output(scheduler)
+            scheduler.update_from_output(scheduler_output, model_runner_output)
+
+        # Iterate until done.
+        while True:
+            scheduler_output = scheduler.schedule()
+            if len(scheduler.running) == 0:
+                break
+            model_runner_output = make_output(scheduler)
+            scheduler.update_from_output(scheduler_output, model_runner_output)
+
+        # Confirm no memory leak.
+        self.assert_scheduler_empty(scheduler)
diff --git a/tests/ut/ops/test_linear.py b/tests/ut/ops/test_linear.py
index 2f30e4f05c2..1b3a7268fc6 100644
--- a/tests/ut/ops/test_linear.py
+++ b/tests/ut/ops/test_linear.py
@@ -99,6 +99,7 @@ def test_oproj_tp(self):
         ascend_config._ASCEND_CONFIG = MagicMock()
         ascend_config._ASCEND_CONFIG.oproj_tensor_parallel_size = 2
+        ascend_config._ASCEND_CONFIG.ascend_scheduler_config.enabled = False
 
         linear = AscendRowParallelLinear(
             input_size=16,
diff --git a/tests/ut/ops/test_vocab_parallel_embedding.py b/tests/ut/ops/test_vocab_parallel_embedding.py
index 531df28140f..37ea1af11a6 100644
--- a/tests/ut/ops/test_vocab_parallel_embedding.py
+++ b/tests/ut/ops/test_vocab_parallel_embedding.py
@@ -209,7 +209,12 @@ def setUp(self):
                   return_value=torch.randn(1, self.vocab_size)),
             patch(
                 "vllm_ascend.ops.vocab_parallel_embedding.get_lmhead_tp_group.all_gather",
-                return_value=torch.randn(1, self.vocab_size))
+                return_value=torch.randn(1, self.vocab_size)),
+            patch(
+                "vllm_ascend.core.schedule_config.AscendSchedulerConfig.initialize_from_config",
+                return_value=MagicMock(max_num_batched_tokens=1000,
+                                       max_model_len=512,
+                                       enable_chunked_prefill=False))
         ]
 
         for p in self.patches:
diff --git a/tests/ut/quantization/test_w8a8_dynamic.py b/tests/ut/quantization/test_w8a8_dynamic.py
index 76d510dd7d7..f25192c28c0 100644
--- a/tests/ut/quantization/test_w8a8_dynamic.py
+++ b/tests/ut/quantization/test_w8a8_dynamic.py
@@ -33,6 +33,13 @@ def setUp(self, mock_get_ep_group, mock_get_ascend_config,
         mock_get_ep_group.return_value = mock_ep_group
 
         mock_ascend_config = Mock()
+        # Create a Mock object with concrete attributes to represent ascend_scheduler_config
+        mock_ascend_scheduler_config = Mock()
+        mock_ascend_scheduler_config.enabled = False
+        mock_ascend_scheduler_config.max_num_batched_tokens = 1024
+        mock_ascend_scheduler_config.max_model_len = 2048
+        mock_ascend_config.ascend_scheduler_config = mock_ascend_scheduler_config
+
         mock_ascend_config.torchair_graph_config = Mock(enabled=False)
         mock_ascend_config.enable_chunked_prefill = False
         mock_get_ascend_config.return_value = mock_ascend_config
diff --git a/tests/ut/test_ascend_config.py b/tests/ut/test_ascend_config.py
index be066179f1d..718bc85f1a2 100644
--- a/tests/ut/test_ascend_config.py
+++ b/tests/ut/test_ascend_config.py
@@ -56,6 +56,9 @@ def test_init_ascend_config_without_additional_config(self):
             self.assertTrue(torchair_graph_config.enable_frozen_parameter)
             self.assertFalse(torchair_graph_config.enable_kv_nz)
 
+            ascend_scheduler_config = ascend_config.ascend_scheduler_config
+            self.assertFalse(ascend_scheduler_config.enabled)
+
     @_clean_up_ascend_config
     def test_init_ascend_config_with_additional_config(self):
         test_vllm_config = VllmConfig()
@@ -71,6 +74,9 @@ def test_init_ascend_config_with_additional_config(self):
                 "enable_kv_nz": True
             },
             "multistream_overlap_shared_expert": True,
+            "ascend_scheduler_config": {
+                "enabled": True
+            },
             "expert_map_path": "test_expert_map_path",
             "refresh": True,
         }
@@ -88,6 +94,9 @@ def test_init_ascend_config_with_additional_config(self):
         self.assertTrue(torchair_graph_config.enable_frozen_parameter)
         self.assertTrue(torchair_graph_config.enable_kv_nz)
 
+        ascend_scheduler_config = ascend_config.ascend_scheduler_config
+        self.assertTrue(ascend_scheduler_config.enabled)
+
     @_clean_up_ascend_config
     def test_init_ascend_config_with_refresh(self):
         test_vllm_config = VllmConfig()
diff --git a/tests/ut/test_platform.py b/tests/ut/test_platform.py
index 6cc070b602e..5fe5cde3e80 100644
--- a/tests/ut/test_platform.py
+++
b/tests/ut/test_platform.py @@ -32,6 +32,7 @@ def mock_vllm_config(): def mock_vllm_ascend_config(): mock_ascend_config = MagicMock() mock_ascend_config.torchair_graph_config.enabled = False + mock_ascend_config.ascend_scheduler_config.enabled = False mock_ascend_config.enable_shared_expert_dp = False return mock_ascend_config @@ -521,6 +522,31 @@ def test_check_and_update_config_310p_no_custom_ops( self.platform.check_and_update_config(vllm_config) self.assertEqual(vllm_config.compilation_config.custom_ops, []) + @patch('vllm_ascend.utils.get_ascend_device_type', + return_value=AscendDeviceType._910_93) + @patch("vllm_ascend.ascend_config.check_ascend_config") + @patch("vllm_ascend.ascend_config.init_ascend_config") + @patch( + "vllm_ascend.core.recompute_schedule_config.RecomputeSchedulerConfig.initialize_from_config" + ) + def test_check_and_update_config_ascend_scheduler_config( + self, mock_init_recompute, mock_init_ascend, mock_check_ascend, + mock_soc_version): + mock_ascend_config = TestNPUPlatform.mock_vllm_ascend_config() + mock_ascend_config.ascend_scheduler_config.enabled = True + mock_init_ascend.return_value = mock_ascend_config + vllm_config = TestNPUPlatform.mock_vllm_config() + vllm_config.parallel_config.tensor_parallel_size = 1 + mock_init_recompute.return_value = MagicMock() + + with patch("vllm_ascend.core.schedule_config.AscendSchedulerConfig" + ) as mock_scheduler: + from vllm_ascend import platform + + importlib.reload(platform) + self.platform.check_and_update_config(vllm_config) + mock_scheduler.initialize_from_config.assert_called_once() + @patch('vllm_ascend.platform.get_ascend_config') def test_get_attn_backend_cls_use_v1_and_mla(self, mock_get_ascend_config): mock_config = MagicMock() diff --git a/tests/ut/test_utils.py b/tests/ut/test_utils.py index 8ff1419edbb..29ed7b444e2 100644 --- a/tests/ut/test_utils.py +++ b/tests/ut/test_utils.py @@ -253,10 +253,12 @@ def test_update_aclgraph_sizes(self): model_path = os.path.join(os.path.dirname(__file__), "fake_weight") test_model_config = ModelConfig(model=model_path, enforce_eager=True) test_parallel_config = ParallelConfig() + ascend_config = {"ascend_scheduler_config": {"enabled": False}} test_vllm_config = VllmConfig( model_config=test_model_config, compilation_config=test_compilation_config, - parallel_config=test_parallel_config) + parallel_config=test_parallel_config, + additional_config=ascend_config) utils.update_aclgraph_sizes(test_vllm_config) os.environ['HCCL_OP_EXPANSION_MODE'] = 'AIV' utils.update_aclgraph_sizes(test_vllm_config) diff --git a/tests/ut/torchair/models/test_torchair_deepseek_v2.py b/tests/ut/torchair/models/test_torchair_deepseek_v2.py index e1a5625bf9c..35e1bb99a87 100644 --- a/tests/ut/torchair/models/test_torchair_deepseek_v2.py +++ b/tests/ut/torchair/models/test_torchair_deepseek_v2.py @@ -235,6 +235,8 @@ def test_torchair_deepseek_v2_mlp(mock_distributed, base_config): hidden_act="silu", quant_config=None) assert isinstance(mlp.act_fn, TorchairDeepseekV2SiluAndMul) + ascend_config = MagicMock() + ascend_config._ASCEND_CONFIG.ascend_scheduler_config.enabled = False with patch( "vllm_ascend.torchair.models.torchair_deepseek_v2.QuantizationConfig" ) as mock_quant_config: diff --git a/vllm_ascend/ascend_config.py b/vllm_ascend/ascend_config.py index f3c3deeddf0..16d16a4d7c8 100644 --- a/vllm_ascend/ascend_config.py +++ b/vllm_ascend/ascend_config.py @@ -39,6 +39,11 @@ def __init__(self, vllm_config): self.torchair_graph_config = TorchairGraphConfig( torchair_graph_config, vllm_config, 
additional_config) + ascend_scheduler_config = additional_config.get( + "ascend_scheduler_config", {}) + self.ascend_scheduler_config = AscendSchedulerConfig( + ascend_scheduler_config) + # Dump / PrecisionDebugger configuration dump_config_path = additional_config.get("dump_config", None) self.dump_config = DumpConfig(dump_config_path) @@ -215,6 +220,20 @@ def __init__(self, torchair_graph_config, vllm_config, additional_config): ) +class AscendSchedulerConfig: + """ + Configuration Object for ascend_scheduler_config from additional_config + """ + + def __init__(self, ascend_scheduler_config: dict): + self.enabled = ascend_scheduler_config.get("enabled", False) + # Ascend scheduler is based on vllm v0 scheduler, so we should support + # all vllm v0 scheduler configs as well. + for k, v in ascend_scheduler_config.items(): + if not hasattr(self, k): + setattr(self, k, v) + + class DumpConfig: """ Configuration object for dump/PrecisionDebugger settings. diff --git a/vllm_ascend/core/schedule_config.py b/vllm_ascend/core/schedule_config.py new file mode 100644 index 00000000000..32d63cbc402 --- /dev/null +++ b/vllm_ascend/core/schedule_config.py @@ -0,0 +1,105 @@ +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# This file is a part of the vllm-ascend project. 
+# + +from dataclasses import dataclass, fields +from typing import Type, Union + +from vllm.config import SchedulerConfig + +MAX_INT = 2147483647 + + +@dataclass +class AscendSchedulerConfig(SchedulerConfig): + enable_chunked_prefill: bool = False + max_long_partial_prefills: int = 1 + long_prefill_token_threshold: int = MAX_INT + policy: str = "fcfs" + scheduler_cls: Union[str, Type[object]] = ( + "vllm_ascend.core.scheduler.AscendScheduler") + enable_pd_transfer: bool = False + decode_max_num_seqs: int = 0 + + @classmethod + def initialize_from_config( + cls, + vllm_scheduler_config: SchedulerConfig, + ascend_scheduler_config, + ): + scheduler_config = { + field.name: getattr(vllm_scheduler_config, field.name) + for field in fields(vllm_scheduler_config) if field.init + } + # Override default values into original SchedulerConfig + scheduler_config["enable_chunked_prefill"] = False + scheduler_config["max_long_partial_prefills"] = None + scheduler_config["long_prefill_token_threshold"] = None + scheduler_config["policy"] = "fcfs" + scheduler_config["scheduler_cls"] = ( + "vllm_ascend.core.scheduler.AscendScheduler") + scheduler_config["enable_pd_transfer"] = False + scheduler_config["decode_max_num_seqs"] = 0 + # Override params in original SchedulerConfig with params in ascend_scheduler_config + for k, _ in scheduler_config.items(): + if hasattr(ascend_scheduler_config, k): + scheduler_config[k] = getattr(ascend_scheduler_config, k) + return cls(**scheduler_config) + + def __post_init__(self, *args) -> None: + self.max_num_encoder_input_tokens = self.max_num_batched_tokens + self.encoder_cache_size = self.max_num_batched_tokens + self.chunked_prefill_enabled = self.enable_chunked_prefill + if (self.max_num_batched_tokens < self.max_model_len + and not self.chunked_prefill_enabled): + raise ValueError( + "Ascend scheduler is enabled without chunked prefill feature. " + f"Argument max_num_batched_tokens ({self.max_num_batched_tokens}) is " + f"smaller than max_model_len ({self.max_model_len}). " + "This effectively limits the maximum sequence length to " + "max_num_batched_tokens and makes vLLM reject longer " + "sequences. Please increase max_num_batched_tokens or " + "decrease max_model_len.") + # concurrent partial prefills. Default is 1 meaning not enabled. + if self.max_long_partial_prefills is None: + self.max_long_partial_prefills = 1 + self.long_prefill_token_threshold = MAX_INT + + if self.long_prefill_token_threshold is None or \ + self.long_prefill_token_threshold <= 0: + if self.max_model_len is None: + self.long_prefill_token_threshold = MAX_INT + else: + self.long_prefill_token_threshold = \ + max(1, int(self.max_model_len * 0.04)) + + if self.max_long_partial_prefills < 0: + raise ValueError( + f"max_long_partial_prefills must be non-negative, but got " + f"{self.max_long_partial_prefills}") + if self.long_prefill_token_threshold < 0: + raise ValueError( + f"long_prefill_token_threshold must be non-negative, but got " + f"{self.long_prefill_token_threshold}") + + if self.policy != "fcfs": + raise NotImplementedError( + f"currently AscendScheduler only supports fcfs policy, got {self.policy}" + ) + if getattr(self, "scheduler_delay_factor", 0) > 0: + raise NotImplementedError( + "currently AscendScheduler doesn't support scheduler_delay_factor." 
+ ) diff --git a/vllm_ascend/core/scheduler.py b/vllm_ascend/core/scheduler.py new file mode 100644 index 00000000000..800536d1568 --- /dev/null +++ b/vllm_ascend/core/scheduler.py @@ -0,0 +1,592 @@ +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# This file is a part of the vllm-ascend project. +# +import time +from collections import deque +from typing import Iterable, Optional, Union + +from vllm.config import VllmConfig +from vllm.distributed.kv_events import KVEventBatch +from vllm.logger import logger +from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry +from vllm.utils.math_utils import cdiv +from vllm.v1.core.kv_cache_manager import KVCacheBlocks +from vllm.v1.core.sched.output import NewRequestData, SchedulerOutput +from vllm.v1.core.sched.scheduler import Scheduler +from vllm.v1.engine import EngineCoreEventType, EngineCoreOutputs +from vllm.v1.kv_cache_interface import KVCacheConfig +from vllm.v1.outputs import ModelRunnerOutput +from vllm.v1.request import Request, RequestStatus +from vllm.v1.structured_output import StructuredOutputManager + + +class AscendScheduler(Scheduler): + """This Scheduler extends vllm's original v1 scheduler + with prefill-first scheduling strategy.""" + + def _initialize_common(self) -> None: + """Initialize common attributes shared across all versions.""" + self.scheduled_req_ids: set[str] = set() + self.running: list[Request] = [] + self.finished_prefill_reqs: deque[Request] = deque() + + enable_pd_transfer = getattr(self.scheduler_config, + 'enable_pd_transfer', False) + decode_max_num_seqs = getattr(self.scheduler_config, + 'decode_max_num_seqs', 0) + self.phase = "" if not enable_pd_transfer else "prefill" + self.decode_max_num_running_reqs = max(self.max_num_running_reqs, + decode_max_num_seqs) + + def __init__( + self, + vllm_config: VllmConfig, + kv_cache_config: KVCacheConfig, + structured_output_manager: StructuredOutputManager, + block_size: Optional[int] = None, + mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY, + include_finished_set: bool = False, + log_stats: bool = False, + ) -> None: + # Call the parent class's __init__ method + super().__init__(vllm_config, kv_cache_config, + structured_output_manager, block_size, mm_registry, + include_finished_set, log_stats) + + # Initialize common attributes + self._initialize_common() + + def schedule(self) -> SchedulerOutput: + if self.scheduler_config.chunked_prefill_enabled: + return super().schedule() + scheduled_new_reqs: list[Request] = [] + scheduled_resumed_reqs: list[Request] = [] + scheduled_running_reqs: list[Request] = [] + preempted_reqs: list[Request] = [] + + req_to_new_blocks: dict[str, KVCacheBlocks] = {} + num_scheduled_tokens: dict[str, int] = {} + token_budget = self.max_num_scheduled_tokens + + # Encoder-related. + scheduled_encoder_inputs: dict[str, list[int]] = {} + encoder_budget = self.max_num_encoder_input_tokens + + # Spec decode-related. 
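+        # Maps request_id -> draft token ids scheduled for verification in this step.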
+ scheduled_spec_decode_tokens: dict[str, list[int]] = {} + + # For logging. + scheduled_timestamp = time.monotonic() + + # Record scheduled LoRA requests. + scheduled_loras: set[int] = set() + + # Use a temporary deque to collect requests that need to be skipped + # and put back at the head of the waiting queue later + skipped_waiting_requests: deque[Request] = deque() + + if self.phase == "prefill": + remaining_running_reqs = [] + for request in self.running: + # move request has finished prefill to finished_prefill_reqs + if request.num_tokens > request.num_prompt_tokens: + self.finished_prefill_reqs.append(request) + else: + remaining_running_reqs.append(request) + self.running = remaining_running_reqs + # all request prefilled, change phase to decode + if not self.waiting and not self.running: + self.phase = "decode" + # Skip long prompt requests in prefill stage. + # long_prefill_budget is float('inf') if not use. + if self.vllm_config.scheduler_config.long_prefill_token_threshold == 0: + long_prefill_budget = float('inf') + long_prefill_token_threshold = float('inf') + else: + long_prefill_budget = self.vllm_config.scheduler_config.max_long_partial_prefills + long_prefill_token_threshold = self.vllm_config.scheduler_config.long_prefill_token_threshold + + # Schedule prefill requests first. + while self.waiting and token_budget > 0: + if len(self.running) == (self.decode_max_num_running_reqs + if self.phase == "decode" else + self.max_num_running_reqs): + + break + + request = self.waiting[0] + + def skip_cur_request(): + self.waiting.popleft() + skipped_waiting_requests.appendleft(request) + + # P/D: skip request if still waiting for remote kvs. + if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: + is_ready = self._update_waiting_for_remote_kv(request) + if is_ready: + request.status = RequestStatus.WAITING + else: + skip_cur_request() + continue + + # Check that adding the request still respects the max_loras + # constraint. + if (self.lora_config and request.lora_request and + (len(scheduled_loras) == self.lora_config.max_loras + and request.lora_request.lora_int_id not in scheduled_loras)): + # Scheduling would exceed max_loras, skip. + skip_cur_request() + continue + + num_external_computed_tokens = 0 + load_kv_async = False + + # Get already-cached tokens. + if request.num_computed_tokens == 0: + new_computed_blocks, num_new_local_computed_tokens = \ + self.kv_cache_manager.get_computed_blocks( + request) + + # Get externally-cached tokens if using a KVConnector. + if self.connector is not None: + num_external_computed_tokens, load_kv_async = ( + self.connector.get_num_new_matched_tokens( + request, num_new_local_computed_tokens)) + + # Total computed tokens (local + external). + num_computed_tokens = (num_new_local_computed_tokens + + num_external_computed_tokens) + else: + # P/D: skip checking prefix cache if loaded from remote kvs. + new_computed_blocks = ( + self.kv_cache_manager.create_empty_block_list()) + num_new_local_computed_tokens = 0 + num_computed_tokens = request.num_computed_tokens + + encoder_inputs_to_schedule = None + new_encoder_budget = encoder_budget + + # P/D: loading remote KV, do not allocate for new work. + if load_kv_async: + assert num_external_computed_tokens > 0 + num_new_tokens = 0 + blocks = None + # Number of tokens to be scheduled. + else: + prompt_limit = self._get_prompt_limit(request) + # We use `request.num_tokens` instead of + # `request.num_prompt_tokens` to consider the resumed + # requests, which have output tokens. 
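+                # For example, a resumed request with 100 prompt tokens, 3
+                # generated tokens, and 80 tokens already computed schedules
+                # 103 - 80 = 23 new tokens here.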
+ num_new_tokens = request.num_tokens - num_computed_tokens + max_tokens_in_kvcache = (self.kv_cache_config.num_blocks * + self.block_size) + prompt_limit = min(prompt_limit, max_tokens_in_kvcache) + + # Finish request that exceeds prompt_limit or kv cache size. + if num_new_tokens > prompt_limit: + logger.warning( + "Input prompt (%d tokens) is too long" + " and exceeds limit of %d", + num_new_tokens, + prompt_limit, + ) + request.status = RequestStatus.FINISHED_IGNORED + self.finished_req_ids.add( # type: ignore + request.request_id) # type: ignore + self.waiting.popleft() + continue + + if num_new_tokens > token_budget: + # Scheduling would exceed token_budget, skip. + skip_cur_request() + continue + assert num_new_tokens > 0 + blocks = new_computed_blocks.blocks[0] + + # Schedule encoder inputs. + if request.has_encoder_inputs: + (encoder_inputs_to_schedule, num_new_tokens, + new_encoder_budget, + _) = self._try_schedule_encoder_inputs( + request, num_computed_tokens, num_new_tokens, + encoder_budget) + if num_new_tokens == 0 or len( + encoder_inputs_to_schedule) == 0: + # The request cannot be scheduled. + break + + watermark = getattr(self.scheduler_config, "watermark", 0.01) + if not self._check_watermark_for_prefill(request, num_new_tokens, + blocks, watermark): + # Scheduling would exceed watermark, skip. + skip_cur_request() + continue + + if num_new_tokens > long_prefill_token_threshold \ + and long_prefill_budget <= 0: + skip_cur_request() + continue + + new_blocks = self.kv_cache_manager.allocate_slots( + request, + num_new_tokens + num_external_computed_tokens, + num_new_local_computed_tokens, + new_computed_blocks=new_computed_blocks, + num_lookahead_tokens=self.num_lookahead_tokens, + delay_cache_blocks=load_kv_async) + if new_blocks is None: + # The request cannot be scheduled. + break + + # KVConnector: update internal state after allocation. + # This information is used to determine if a load is + # needed for this request. + if self.connector is not None: + self.connector.update_state_after_alloc( + request, + new_computed_blocks + new_blocks, + num_external_computed_tokens, + ) + + self.waiting.popleft() + if load_kv_async: + # If loading async, allocate memory and put request + # into the WAITING_FOR_REMOTE_KV state. + skipped_waiting_requests.appendleft(request) + request.status = RequestStatus.WAITING_FOR_REMOTE_KVS + continue + + self.running.append(request) + if self.log_stats: + request.record_event(EngineCoreEventType.SCHEDULED, + scheduled_timestamp) + self.scheduled_req_ids.add(request.request_id) + # Check request status. + if request.status == RequestStatus.WAITING: + scheduled_new_reqs.append(request) + elif request.status == RequestStatus.PREEMPTED: + scheduled_resumed_reqs.append(request) + else: + raise RuntimeError(f"Invalid request status: {request.status}") + + if self.lora_config and request.lora_request: + scheduled_loras.add(request.lora_request.lora_int_id) + + req_to_new_blocks[ + request.request_id] = self.kv_cache_manager.get_blocks( + request.request_id) + # Update request info. + num_scheduled_tokens[request.request_id] = num_new_tokens + token_budget -= num_new_tokens + if num_new_tokens > long_prefill_token_threshold: + long_prefill_budget -= 1 + request.status = RequestStatus.RUNNING + request.num_computed_tokens = num_computed_tokens + # Count the number of prefix cached tokens. + if request.num_cached_tokens < 0: + request.num_cached_tokens = num_computed_tokens + + # Encoder-related. 
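+            # Record the scheduled encoder inputs for this request and reserve
+            # space for their outputs in the encoder cache.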
+ if encoder_inputs_to_schedule: + scheduled_encoder_inputs[request.request_id] = ( + encoder_inputs_to_schedule) + # Allocate the encoder cache. + for i in encoder_inputs_to_schedule: + self.encoder_cache_manager.allocate(request, i) + encoder_budget = new_encoder_budget + + # Put back any skipped requests at the head of the waiting queue + if skipped_waiting_requests: + self.waiting.extendleft(skipped_waiting_requests) + + if self.phase == "decode": + while len( + self.running + ) < self.decode_max_num_running_reqs and self.finished_prefill_reqs: + request = self.finished_prefill_reqs.popleft() + self.running.append(request) + + # If no prefill requests are scheduled, + # Schedule decode requests next. + if len(self.scheduled_req_ids) == 0: + req_index = 0 + while req_index < len(self.running) and token_budget > 0: + request = self.running[req_index] + if request.request_id in self.scheduled_req_ids: + # This request has already been scheduled. + req_index += 1 + continue + + num_new_tokens = (request.num_tokens_with_spec - + request.num_computed_tokens) + assert (request.num_tokens - request.num_computed_tokens) == 1 + num_new_tokens = min(num_new_tokens, token_budget) + # Make sure the input position does not exceed the max model len. + # This is necessary when using spec decoding. + num_new_tokens = min( + num_new_tokens, + self.max_model_len - request.num_computed_tokens) + + # Schedule encoder inputs. + encoder_inputs_to_schedule = None + new_encoder_budget = encoder_budget + if request.has_encoder_inputs: + (encoder_inputs_to_schedule, num_new_tokens, + new_encoder_budget) = self._try_schedule_encoder_inputs( + request, request.num_computed_tokens, num_new_tokens, + encoder_budget) + + # Check that adding the request still respects the max_loras + # constraint. + if self.lora_config and request.lora_request and ( + len(scheduled_loras) == self.lora_config.max_loras + and request.lora_request.lora_int_id + not in scheduled_loras): + # Scheduling would exceed max_loras, skip. + num_new_tokens = 0 + + if num_new_tokens == 0: + # The request cannot be scheduled because one of the following + # reason: + # 1. No new tokens to schedule. This may happen when PP>1 and + # we have already scheduled all prompt tokens but they are + # not finished yet. + # 2. Adding the request exceeds the max_loras constraint. + # NOTE(woosuk): Here, by doing `continue` instead of `break`, + # we do not strictly follow the FCFS scheduling policy and + # allow the lower-priority requests to be scheduled. + req_index += 1 + continue + + while True: + new_blocks = self.kv_cache_manager.allocate_slots( + request, + num_new_tokens, + num_lookahead_tokens=self.num_lookahead_tokens) + if new_blocks is None: + # The request cannot be scheduled. + # Preempt the lowest-priority request. + preempted_req = self.running.pop() + self.kv_cache_manager.free(preempted_req) + preempted_req.status = RequestStatus.PREEMPTED + preempted_req.num_computed_tokens = 0 + if self.log_stats: + preempted_req.record_event( + EngineCoreEventType.PREEMPTED, + scheduled_timestamp) + self.waiting.appendleft(preempted_req) + preempted_reqs.append(preempted_req) + if preempted_req == request: + # No more request to preempt. + can_schedule = False + break + else: + # The request can be scheduled. + can_schedule = True + break + if not can_schedule: + break + assert new_blocks is not None + + # Schedule the request. 
+ scheduled_running_reqs.append(request) + self.scheduled_req_ids.add(request.request_id) + req_to_new_blocks[request.request_id] = new_blocks + num_scheduled_tokens[request.request_id] = num_new_tokens + token_budget -= num_new_tokens + req_index += 1 + + # Speculative decode related. + if request.spec_token_ids: + num_scheduled_spec_tokens = (num_new_tokens + + request.num_computed_tokens - + request.num_tokens) + if num_scheduled_spec_tokens > 0: + # Trim spec_token_ids list to num_scheduled_spec_tokens. + del request.spec_token_ids[num_scheduled_spec_tokens:] + scheduled_spec_decode_tokens[request.request_id] = ( + request.spec_token_ids) + + # Encoder-related. + if encoder_inputs_to_schedule: + scheduled_encoder_inputs[request.request_id] = ( + encoder_inputs_to_schedule) + # Allocate the encoder cache. + for i in encoder_inputs_to_schedule: + self.encoder_cache_manager.allocate(request, i) + encoder_budget = new_encoder_budget + + # Record scheduled LoRA requests. + if self.lora_config and request.lora_request: + scheduled_loras.add(request.lora_request.lora_int_id) + + # Check if the scheduling constraints are satisfied. + total_num_scheduled_tokens = sum(num_scheduled_tokens.values()) + assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens + assert token_budget >= 0 + assert len( + self.running + ) <= self.decode_max_num_running_reqs if self.phase == "decode" else self.max_num_running_reqs + assert len(scheduled_new_reqs) + len(scheduled_resumed_reqs) + len( + scheduled_running_reqs) <= len(self.running) + + # Get the longest common prefix among all requests in the running queue. + # This can be potentially used for cascade attention. + num_common_prefix_blocks = [0] * len( + self.kv_cache_config.kv_cache_groups) + if self.running: + any_request = self.running[0] + num_common_prefix_blocks = ( + self.kv_cache_manager.get_num_common_prefix_blocks( + any_request.request_id)) + + # Construct the scheduler output. + new_reqs_data = [ + NewRequestData.from_request( + req, req_to_new_blocks[req.request_id].get_block_ids()) + for req in scheduled_new_reqs + ] + + cached_reqs_data = self._make_cached_request_data( + scheduled_running_reqs, scheduled_resumed_reqs, + num_scheduled_tokens, scheduled_spec_decode_tokens, + req_to_new_blocks) + scheduled_cached_reqs = cached_reqs_data + scheduler_output = SchedulerOutput( + scheduled_new_reqs=new_reqs_data, + scheduled_cached_reqs=scheduled_cached_reqs, + num_scheduled_tokens=num_scheduled_tokens, + total_num_scheduled_tokens=total_num_scheduled_tokens, + scheduled_spec_decode_tokens=scheduled_spec_decode_tokens, + scheduled_encoder_inputs=scheduled_encoder_inputs, + num_common_prefix_blocks=num_common_prefix_blocks, + # finished_req_ids is an existing state in the scheduler, + # instead of being newly scheduled in this step. + # It contains the request IDs that are finished in between + # the previous and the current steps. + finished_req_ids=self.finished_req_ids, # type: ignore + free_encoder_mm_hashes=self.encoder_cache_manager. + get_freed_mm_hashes(), + ) + # NOTE(Kuntai): this function is designed for multiple purposes: + # 1. Plan the KV cache store + # 2. Wrap up all the KV cache load / save ops into an opaque object + # 3. 
Clear the internal states of the connector + if self.connector is not None: + meta = self.connector.build_connector_meta(scheduler_output) + scheduler_output.kv_connector_metadata = meta + + events = self.kv_cache_manager.take_events() + if events: + batch = KVEventBatch(ts=time.time(), events=events) + self.kv_event_publisher.publish(batch) + + # Advance the number of computed tokens for the request AFTER + # the request is scheduled. + # 1. The scheduler_output of the current step has to include the + # original number of scheduled tokens to determine input IDs. + # 2. Advance the number of computed tokens here allowing us to + # schedule the prefill request again immediately in the next + # scheduling step. + # 3. If some tokens (e.g. spec tokens) are rejected later, the number of + # computed tokens will be adjusted in update_from_output. + for req_id, num_scheduled_token in num_scheduled_tokens.items(): + self.requests[req_id].num_computed_tokens += num_scheduled_token + + self.finished_req_ids = set() # type: ignore + return scheduler_output + + def _check_watermark_for_prefill(self, + request, + num_new_tokens, + computed_blocks, + watermark=0.01): + computed_blocks = computed_blocks or [] + watermark_blocks = self.kv_cache_config.num_blocks * watermark + num_computed_tokens = (request.num_computed_tokens + + len(computed_blocks) * self.block_size) + num_required_blocks = cdiv(num_new_tokens + num_computed_tokens, + self.block_size) + req_blocks = self.kv_cache_manager.coordinator.get_blocks( + request.request_id) + num_new_blocks = (num_required_blocks - len(req_blocks[0]) - + len(computed_blocks)) + num_evictable_computed_blocks = sum(1 for blk in computed_blocks + if blk.ref_cnt == 0) + # If number of free blocks is less than water mark after allocating, don't allocate. + if (self.kv_cache_manager.block_pool.get_num_free_blocks() - + num_evictable_computed_blocks - + num_new_blocks) < watermark_blocks: + return False + return True + + def _get_prompt_limit(self, request: Request) -> int: + if (self.scheduler_config.chunked_prefill_enabled + and not self.scheduler_config.is_multi_step): + prompt_limit = self.vllm_config.model_config.max_model_len + else: + prompt_limit = min( + self.vllm_config.model_config.max_model_len, + self.scheduler_config.max_num_batched_tokens, + ) + + # Model is fine tuned with long context. Return the fine tuned max_len. + if request.lora_request and request.lora_request.long_lora_max_len: + assert prompt_limit <= request.lora_request.long_lora_max_len + return request.lora_request.long_lora_max_len + else: + return prompt_limit + + def finish_requests( + self, + request_ids: Union[str, Iterable[str]], + finished_status: RequestStatus, + ) -> None: + """Handles the finish signal from outside the scheduler. + + For example, the API server can abort a request when the client + disconnects. + """ + for req_id in request_ids: + request = self.requests.get(req_id) + if request is None: + # Invalid request ID. + continue + if request.status == RequestStatus.RUNNING: + self.scheduled_req_ids.discard(request.request_id) + super().finish_requests(request_ids, finished_status) + + def update_from_output( + self, + scheduler_output: SchedulerOutput, + model_runner_output: ModelRunnerOutput, + ) -> EngineCoreOutputs: + num_scheduled_tokens = scheduler_output.num_scheduled_tokens + + # NOTE(woosuk): As len(self.running) can be up to 1K or more, the below + # loop can be a performance bottleneck. We should do our best to avoid + # expensive operations inside the loop. 
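+        # Drop the per-step bookkeeping for requests that were scheduled in
+        # this step so they become schedulable again on the next call.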
+        for request in self.running:
+            req_id = request.request_id
+            num_tokens_scheduled = num_scheduled_tokens.get(req_id, 0)
+            if num_tokens_scheduled == 0:
+                # The request was not scheduled in this step.
+                continue
+            if req_id in self.scheduled_req_ids:
+                self.scheduled_req_ids.remove(req_id)
+
+        return super().update_from_output(scheduler_output,
+                                          model_runner_output)
diff --git a/vllm_ascend/platform.py b/vllm_ascend/platform.py
index c7bbe390366..7cc84fc6ae3 100644
--- a/vllm_ascend/platform.py
+++ b/vllm_ascend/platform.py
@@ -153,6 +153,7 @@ def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
         model_config = vllm_config.model_config
         parallel_config = vllm_config.parallel_config
         cache_config = vllm_config.cache_config
+        ascend_scheduler_config = ascend_config.ascend_scheduler_config
         kv_cache_dtype = vllm_config.additional_config.get(
             "kv_cache_dtype", None)
@@ -290,23 +291,35 @@ def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
         if cache_config:
             if cache_config.block_size is None:
                 cache_config.block_size = 128
-            # ignore block size check if model is qwen3-next
-            # TODO(MengqingCao): Remove the model_type check, after resolving the hidden error in get_kv_cache_groups.
-            if not (model_config
-                    and model_config.hf_config.model_type == "qwen3_next"):
-                # we must set block size to 128 if prefix caching is enabled or chunked prefill is enabled
-                if cache_config.enable_prefix_caching or \
-                    (vllm_config.scheduler_config and vllm_config.scheduler_config.enable_chunked_prefill):
-                    if cache_config.block_size != 128:
-                        logger.warning(
-                            "block size must be set to 128 on NPU platform.")
-                        cache_config.block_size = 128
+
+            if cache_config.enable_prefix_caching or \
+                not ascend_scheduler_config.enabled or \
+                getattr(ascend_scheduler_config, "enable_chunked_prefill", False):
+                logger.warning(
+                    "If chunked prefill or prefix caching is enabled, block size must be set to 128."
+                )
+                origin_block_size = cache_config.block_size
+                cache_config.block_size = 128
+                # TODO(MengqingCao): Remove the model_type check, after resolving the hidden error in get_kv_cache_groups.
+                if model_config and model_config.hf_config.model_type == "qwen3_next":
+                    logger.warning(
+                        "When running a qwen3-next model, block_size needs to be restored to its original value."
+                    )
+                    cache_config.block_size = origin_block_size
         # Activate custom ops for v1, except on 310P
         if get_ascend_device_type() != AscendDeviceType._310P:
             compilation_config.custom_ops = ["all"]
-        if ascend_config.recompute_scheduler_enable:
+        # If ascend_scheduler_config is enabled,
+        # extend the original scheduler_config to use AscendScheduler.
+        if ascend_config.ascend_scheduler_config.enabled:
+            from vllm_ascend.core.schedule_config import AscendSchedulerConfig
+            ascend_scheduler_config = AscendSchedulerConfig.initialize_from_config(
+                vllm_config.scheduler_config,
+                ascend_config.ascend_scheduler_config)
+            vllm_config.scheduler_config = ascend_scheduler_config
+        elif ascend_config.recompute_scheduler_enable:
             from vllm_ascend.core.recompute_schedule_config import \
                 RecomputeSchedulerConfig
             recompute_scheduler_config = RecomputeSchedulerConfig.initialize_from_config(
diff --git a/vllm_ascend/profiling_config.py b/vllm_ascend/profiling_config.py
index 8e0dfadfded..b682593334f 100644
--- a/vllm_ascend/profiling_config.py
+++ b/vllm_ascend/profiling_config.py
@@ -44,6 +44,11 @@
   handler: msserviceprofiler.vllm_profiler.vllm_v1.batch_hookers:schedule
   name: batchFrameworkProcessing
 
+- symbol: vllm_ascend.core.scheduler:AscendScheduler.schedule
+  min_version: "0.9.1"
+  handler: msserviceprofiler.vllm_profiler.vllm_v1.batch_hookers:schedule
+  name: batchFrameworkProcessing
+
 - symbol: vllm.v1.core.sched.scheduler:Scheduler._free_request
   min_version: "0.9.1"
   handler: msserviceprofiler.vllm_profiler.vllm_v1.batch_hookers:free_request
diff --git a/vllm_ascend/torchair/torchair_attention.py b/vllm_ascend/torchair/torchair_attention.py
index 086a4dff14f..16fcb385c8d 100644
--- a/vllm_ascend/torchair/torchair_attention.py
+++ b/vllm_ascend/torchair/torchair_attention.py
@@ -451,7 +451,8 @@ def forward(
         else:
             raise NotImplementedError(
                 "Torchair graph mode with non-MLA attention backend is still experimental."
-                "v1 scheduler(chunked prefill) is not supported at this moment. "
-            )
+                " The v1 scheduler (chunked prefill) is not supported at this moment. Please"
+                " set 'ascend_scheduler_config': {'enabled': true} in additional_config"
+                " to use the ascend scheduler.")
 
         return output.view(num_tokens, self.hidden_size)
diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py
index 404192654f2..2e7c4ea299b 100644
--- a/vllm_ascend/worker/model_runner_v1.py
+++ b/vllm_ascend/worker/model_runner_v1.py
@@ -330,6 +330,10 @@ def __init__(self, vllm_config: VllmConfig, device: torch.device):
         # Ascend-specific configurations
         self.ascend_config = get_ascend_config()
+        if self.ascend_config.ascend_scheduler_config.enabled:
+            self.chunked_prefill_enabled = self.scheduler_config.chunked_prefill_enabled
+        else:
+            self.chunked_prefill_enabled = True
         self.weight_prefetch_method = WeightPrefetchMethod(
             self.ascend_config.weight_prefetch_config)
         # Dump / PrecisionDebugger configuration now comes from AscendConfig
@@ -1938,6 +1942,7 @@ def _generate_process_reqs_hidden_states(self, attn_metadata, with_prefill,
 
     def _build_attn_state(self, num_reqs, num_scheduled_tokens,
                           num_valid_tokens):
+        ascend_config = get_ascend_config()
         if np.array_equal(self.seq_lens_np[:num_reqs], num_scheduled_tokens):
             attn_state = AscendAttentionState.PrefillNoCache
         # We assume it is the decode stage, where prefill occurs but only one token is not hit in cache.
@@ -1954,7 +1959,7 @@ def _build_attn_state(self, num_reqs, num_scheduled_tokens,
         else:
             attn_state = AscendAttentionState.ChunkedPrefill
         # splitfuse
-        elif self.scheduler_config.enable_chunked_prefill:
+        elif not ascend_config.ascend_scheduler_config.enabled or self.chunked_prefill_enabled:
             attn_state = AscendAttentionState.ChunkedPrefill
         else:
             attn_state = AscendAttentionState.PrefillCacheHit
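Below is a minimal, self-contained sketch (not part of the patch; the block counts and token numbers are made up) of the free-block watermark arithmetic that AscendScheduler._check_watermark_for_prefill applies before admitting a prefill, under the simplifying assumption that the request holds no blocks yet and nothing is cached:

    from math import ceil

    block_size = 128
    num_kv_blocks = 1000                        # hypothetical KV cache capacity
    watermark_blocks = 0.01 * num_kv_blocks     # default watermark ratio is 0.01

    num_new_tokens = 4096                       # prompt tokens still to prefill
    num_computed_tokens = 0                     # nothing cached for this request
    num_required_blocks = ceil((num_new_tokens + num_computed_tokens) / block_size)
    num_new_blocks = num_required_blocks        # request holds no blocks yet

    num_free_blocks = 40                        # free blocks in the pool right now
    num_evictable_computed_blocks = 0           # cached blocks with ref_cnt == 0

    fits = (num_free_blocks - num_evictable_computed_blocks
            - num_new_blocks) >= watermark_blocks
    print(num_required_blocks, fits)            # 32 False -> the prefill is skipped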