From 67199afddcf498a4c848b52d9122164df4594def Mon Sep 17 00:00:00 2001
From: Isotr0py <2037008807@qq.com>
Date: Sun, 15 Jun 2025 01:56:45 +0800
Subject: [PATCH 1/4] fix tp with flex attn

Signed-off-by: Isotr0py <2037008807@qq.com>
---
 vllm/v1/engine/core.py       | 2 ++
 vllm/v1/worker/gpu_worker.py | 5 +++++
 vllm/v1/worker/tpu_worker.py | 5 +++++
 3 files changed, 12 insertions(+)

diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index f36a491a1970..e150c5f1346d 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -84,6 +84,8 @@ def __init__(self,
 
         vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks
         vllm_config.cache_config.num_cpu_blocks = num_cpu_blocks
+        self.collective_rpc("initialize_cache",
+                            args=(num_gpu_blocks, num_cpu_blocks))
 
         self.structured_output_manager = StructuredOutputManager(vllm_config)
 
diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py
index b7d244f27045..58795e3fe292 100644
--- a/vllm/v1/worker/gpu_worker.py
+++ b/vllm/v1/worker/gpu_worker.py
@@ -112,6 +112,11 @@ def wake_up(self, tags: Optional[list[str]] = None) -> None:
                     buffer.data.copy_(self._sleep_saved_buffers[name].data)
             self._sleep_saved_buffers = {}
 
+    def initialize_cache(self, num_gpu_blocks: int,
+                         num_cpu_blocks: int) -> None:
+        self.cache_config.num_gpu_blocks = num_gpu_blocks
+        self.cache_config.num_cpu_blocks = num_cpu_blocks
+
     def init_device(self):
         if self.device_config.device.type == "cuda":
             # torch.distributed.all_reduce does not free the input tensor until
diff --git a/vllm/v1/worker/tpu_worker.py b/vllm/v1/worker/tpu_worker.py
index 5da481baeeea..87af8e476707 100644
--- a/vllm/v1/worker/tpu_worker.py
+++ b/vllm/v1/worker/tpu_worker.py
@@ -93,6 +93,11 @@ def __init__(
         if self.model_config.seed is None:
             self.model_config.seed = 0
 
+    def initialize_cache(self, num_gpu_blocks: int,
+                         num_cpu_blocks: int) -> None:
+        self.cache_config.num_gpu_blocks = num_gpu_blocks
+        self.cache_config.num_cpu_blocks = num_cpu_blocks
+
     def init_device(self):
         os.environ["PJRT_DEVICE"] = "TPU"
         # Note: Currently the XLA compiler wrongly uses 2D ring strategy on 1D

From 55d41b71fde8c7efa752d4896ece2fdc378c233b Mon Sep 17 00:00:00 2001
From: Isotr0py <2037008807@qq.com>
Date: Sun, 15 Jun 2025 02:05:36 +0800
Subject: [PATCH 2/4] disable compile for tp

Signed-off-by: Isotr0py <2037008807@qq.com>
---
 vllm/v1/attention/backends/flex_attention.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/vllm/v1/attention/backends/flex_attention.py b/vllm/v1/attention/backends/flex_attention.py
index c8cb1481c8b4..7415d1867cbc 100644
--- a/vllm/v1/attention/backends/flex_attention.py
+++ b/vllm/v1/attention/backends/flex_attention.py
@@ -13,6 +13,7 @@
 from vllm.attention.backends.abstract import (AttentionBackend, AttentionImpl,
                                               AttentionMetadata, AttentionType,
                                               is_quantized_kv_cache)
+from vllm.distributed import get_tensor_model_parallel_world_size
 from vllm.logger import init_logger
 from vllm.platforms import current_platform
 from vllm.v1.attention.backends.utils import (AttentionMetadataBuilder,
@@ -236,7 +237,12 @@ def final_mask_mod(
 
     def build_block_mask(self) -> BlockMask:
         assert self.mask_mod is not None
-        return create_block_mask_compiled(
+        # FIXME: With TP>1, create_block_mask_compiled will raise
+        # CUDA error: an illegal memory access was encountered
+        create_block_mask_fn = (create_block_mask_compiled
+                                if get_tensor_model_parallel_world_size() == 1
+                                else create_block_mask)
+        return create_block_mask_fn(
             self.mask_mod,
             None,
             None,

From b4f110f6403b34cd315d9f9b2dbbacbb4af52118 Mon Sep 17 00:00:00 2001
From: Isotr0py <2037008807@qq.com>
Date: Sun, 15 Jun 2025 16:38:08 +0800
Subject: [PATCH 3/4] add engine core tp test

Signed-off-by: Isotr0py <2037008807@qq.com>
---
 tests/v1/engine/test_engine_core.py | 32 ++++++++++++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git a/tests/v1/engine/test_engine_core.py b/tests/v1/engine/test_engine_core.py
index fbbfc630d27d..031cb13057cb 100644
--- a/tests/v1/engine/test_engine_core.py
+++ b/tests/v1/engine/test_engine_core.py
@@ -19,7 +19,7 @@
 from vllm.v1.kv_cache_interface import KVCacheConfig
 from vllm.v1.outputs import ModelRunnerOutput
 
-from ...utils import create_new_process_for_each_test
+from ...utils import create_new_process_for_each_test, multi_gpu_test
 
 if not current_platform.is_cuda():
     pytest.skip(reason="V1 currently only supported on CUDA.",
@@ -378,3 +378,33 @@ def shutdown(self):
             # Odd steps schedules a new batch.
             assert output is None
         step += 1
+
+
+@multi_gpu_test(num_gpus=2)
+def test_engine_core_tp(monkeypatch: pytest.MonkeyPatch):
+    """
+    Test engine can initialize worker in tp properly
+    """
+
+    with monkeypatch.context() as m:
+        m.setenv("VLLM_USE_V1", "1")
+        """Setup the EngineCore."""
+        engine_args = EngineArgs(
+            model=MODEL_NAME,
+            tensor_parallel_size=2,
+            # Reduce startup time.
+            enforce_eager=True,
+        )
+        vllm_config = engine_args.create_engine_config()
+        executor_class = Executor.get_class(vllm_config)
+
+        with set_default_torch_num_threads(1):
+            engine_core = EngineCore(vllm_config=vllm_config,
+                                     executor_class=executor_class,
+                                     log_stats=True)
+
+        def get_worker_num_gpu_blocks(worker):
+            return worker.cache_config.num_gpu_blocks
+
+        num_gpu_blocks = engine_core.collective_rpc(get_worker_num_gpu_blocks)
+        assert all(x is not None for x in num_gpu_blocks)

From 30b4f9211109d60af7fef7469c59207527aa1d95 Mon Sep 17 00:00:00 2001
From: Isotr0py <2037008807@qq.com>
Date: Sun, 15 Jun 2025 16:47:53 +0800
Subject: [PATCH 4/4] check cpu num blocks as well

Signed-off-by: Isotr0py <2037008807@qq.com>
---
 tests/v1/engine/test_engine_core.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/tests/v1/engine/test_engine_core.py b/tests/v1/engine/test_engine_core.py
index 031cb13057cb..bc7894e92814 100644
--- a/tests/v1/engine/test_engine_core.py
+++ b/tests/v1/engine/test_engine_core.py
@@ -403,8 +403,12 @@ def test_engine_core_tp(monkeypatch: pytest.MonkeyPatch):
                                      executor_class=executor_class,
                                      log_stats=True)
 
-        def get_worker_num_gpu_blocks(worker):
-            return worker.cache_config.num_gpu_blocks
+        def get_worker_cache_config_field(worker, key: str):
+            return getattr(worker.cache_config, key)
 
-        num_gpu_blocks = engine_core.collective_rpc(get_worker_num_gpu_blocks)
+        num_gpu_blocks = engine_core.collective_rpc(
+            get_worker_cache_config_field, args=("num_gpu_blocks", ))
+        num_cpu_blocks = engine_core.collective_rpc(
+            get_worker_cache_config_field, args=("num_cpu_blocks", ))
         assert all(x is not None for x in num_gpu_blocks)
+        assert all(x is not None for x in num_cpu_blocks)
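
Note on the pattern exercised by this series: the engine core pushes the computed
KV-cache sizing to every worker via a collective RPC ("initialize_cache"), and the
test then reads each worker's cache_config back through a second collective RPC to
confirm the values arrived on all ranks. The sketch below mirrors that round-trip
with hypothetical stand-in classes (_CacheConfig, _Worker, _EngineCore); it is a
minimal, self-contained illustration of the pattern, not vLLM's actual
implementation.

    from dataclasses import dataclass
    from typing import Any, Callable, Optional, Union


    @dataclass
    class _CacheConfig:
        num_gpu_blocks: Optional[int] = None
        num_cpu_blocks: Optional[int] = None


    class _Worker:
        def __init__(self) -> None:
            self.cache_config = _CacheConfig()

        def initialize_cache(self, num_gpu_blocks: int,
                             num_cpu_blocks: int) -> None:
            # Same idea as the worker-side method added in PATCH 1/4:
            # record the engine-computed sizing on this worker's config.
            self.cache_config.num_gpu_blocks = num_gpu_blocks
            self.cache_config.num_cpu_blocks = num_cpu_blocks


    class _EngineCore:
        def __init__(self, tp_size: int) -> None:
            self.workers = [_Worker() for _ in range(tp_size)]

        def collective_rpc(self,
                           method: Union[str, Callable[..., Any]],
                           args: tuple = ()) -> list[Any]:
            # Toy stand-in for the executor's collective RPC: invoke either a
            # named worker method or an arbitrary callable on every worker and
            # gather the per-rank results.
            if isinstance(method, str):
                return [getattr(w, method)(*args) for w in self.workers]
            return [method(w, *args) for w in self.workers]


    engine = _EngineCore(tp_size=2)
    engine.collective_rpc("initialize_cache", args=(8192, 1024))

    # Round-trip check analogous to the test in PATCH 3/4 and 4/4.
    blocks = engine.collective_rpc(
        lambda w, key: getattr(w.cache_config, key), args=("num_gpu_blocks", ))
    assert all(b == 8192 for b in blocks)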