diff --git a/CMakeLists.txt b/CMakeLists.txt index 0000b6d32be6..c609b5257e16 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -308,7 +308,7 @@ if(VLLM_GPU_LANG STREQUAL "CUDA") SET(CUTLASS_ENABLE_HEADERS_ONLY ON CACHE BOOL "Enable only the header library") # Set CUTLASS_REVISION. Used for FetchContent. Also fixes some bogus messages when building. - set(CUTLASS_REVISION "v4.2.1") + set(CUTLASS_REVISION "v4.3.5") # Use the specified CUTLASS source directory for compilation if VLLM_CUTLASS_SRC_DIR is provided if (DEFINED ENV{VLLM_CUTLASS_SRC_DIR}) diff --git a/tests/distributed/test_torchrun_example.py b/tests/distributed/test_torchrun_example.py index f415409d7b37..f449b8b36f98 100644 --- a/tests/distributed/test_torchrun_example.py +++ b/tests/distributed/test_torchrun_example.py @@ -8,62 +8,66 @@ import torch.distributed as dist from vllm import LLM, SamplingParams +from vllm.distributed import destroy_distributed_environment from vllm.distributed.parallel_state import get_world_group dist.init_process_group(backend="gloo") -# Create prompts -prompts = [ - "Hello, my name is", - "The president of the United States is", - "The capital of France is", - "The future of AI is", -] - -sampling_params = SamplingParams(temperature=0.8, top_p=0.95) - -# set different `gpu_memory_utilization` and `swap_space` for different ranks, -# to test if all ranks agree on the same kv cache configuration. -llm = LLM( - model="facebook/opt-125m", - tensor_parallel_size=2, - pipeline_parallel_size=int(os.getenv("PP_SIZE", 1)), - distributed_executor_backend="external_launcher", - gpu_memory_utilization=random.uniform(0.7, 0.9), - swap_space=random.randint(1, 4), - seed=0, -) - -outputs = llm.generate(prompts, sampling_params) - -cpu_group = get_world_group().cpu_group - -torch_rank = dist.get_rank(group=cpu_group) - - -def test_consistent_across_ranks(obj): - if torch_rank == 0: - dist.broadcast_object_list([obj], src=0, group=cpu_group) - else: - container = [None] - dist.broadcast_object_list(container, src=0, group=cpu_group) - assert container[0] == obj - - -test_consistent_across_ranks(llm.llm_engine.vllm_config.cache_config.num_cpu_blocks) -test_consistent_across_ranks(llm.llm_engine.vllm_config.cache_config.num_gpu_blocks) - -# make sure we can access the model parameters from the calling process -# of the `LLM` instance. -params = list( - llm.llm_engine.model_executor.driver_worker.worker.model_runner.model.parameters() -) -test_consistent_across_ranks(len(params)) - -# all ranks should have the same outputs -for output in outputs: - prompt = output.prompt - generated_text = output.outputs[0].text - test_consistent_across_ranks(prompt) - test_consistent_across_ranks(generated_text) - print(f"Rank {torch_rank}, Prompt: {prompt!r}, Generated text: {generated_text!r}") +try: + # Create prompts + prompts = [ + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", + ] + + sampling_params = SamplingParams(temperature=0.8, top_p=0.95) + + # set different `gpu_memory_utilization` and `swap_space` for different ranks, + # to test if all ranks agree on the same kv cache configuration. + llm = LLM( + model="facebook/opt-125m", + tensor_parallel_size=2, + pipeline_parallel_size=int(os.getenv("PP_SIZE", 1)), + distributed_executor_backend="external_launcher", + gpu_memory_utilization=random.uniform(0.7, 0.9), + swap_space=random.randint(1, 4), + seed=0, + ) + + outputs = llm.generate(prompts, sampling_params) + + cpu_group = get_world_group().cpu_group + + torch_rank = dist.get_rank(group=cpu_group) + + def test_consistent_across_ranks(obj): + if torch_rank == 0: + dist.broadcast_object_list([obj], src=0, group=cpu_group) + else: + container = [None] + dist.broadcast_object_list(container, src=0, group=cpu_group) + assert container[0] == obj + + test_consistent_across_ranks(llm.llm_engine.vllm_config.cache_config.num_cpu_blocks) + test_consistent_across_ranks(llm.llm_engine.vllm_config.cache_config.num_gpu_blocks) + + # make sure we can access the model parameters from the calling process + # of the `LLM` instance. + params = list( + llm.llm_engine.model_executor.driver_worker.worker.model_runner.model.parameters() + ) + test_consistent_across_ranks(len(params)) + + # all ranks should have the same outputs + for output in outputs: + prompt = output.prompt + generated_text = output.outputs[0].text + test_consistent_across_ranks(prompt) + test_consistent_across_ranks(generated_text) + print( + f"Rank {torch_rank}, Prompt: {prompt!r}, Generated text: {generated_text!r}" + ) +finally: + destroy_distributed_environment() diff --git a/tests/utils.py b/tests/utils.py index efeceba63bcc..d0cc7955c455 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,7 +2,6 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import asyncio -import contextlib import copy import functools import importlib @@ -977,13 +976,15 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None: signal.signal(signal.SIGTERM, old_signal_handler) if _exitcode != 0: # Try to read the exception from the child process - exc_info = {} - if os.path.exists(exc_file_path): - with ( - contextlib.suppress(Exception), - open(exc_file_path, "rb") as f, - ): - exc_info = cloudpickle.load(f) + exc_info: dict[str, Any] = {} + exc_file_existed = os.path.exists(exc_file_path) + load_error: str | None = None + if exc_file_existed: + try: + with open(exc_file_path, "rb") as f: + exc_info = cloudpickle.load(f) + except Exception as e: + load_error = f"{type(e).__name__}: {e}" if ( original_exception := exc_info.get("pickled_exception") @@ -1001,11 +1002,19 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None: f" (exit code: {_exitcode}):\n{original_tb}" ) from None - # Fallback to the original generic error + # Fallback: no recoverable exception; include diagnostics + diag = ( + f" exc_file_path={exc_file_path!r}" + f" exc_file_existed={exc_file_existed}" + ) + if load_error is not None: + diag += f" load_error={load_error!r}" raise AssertionError( f"function {func.__name__} failed when called with" f" args {args} and kwargs {kwargs}" - f" (exit code: {_exitcode})" + f" (exit code: {_exitcode});" + f" exception could not be recovered from child" + f"{diag}" ) from None return wrapper diff --git a/tests/v1/engine/test_abort_final_step.py b/tests/v1/engine/test_abort_final_step.py index 560c5c2b1e30..b7e826350771 100644 --- a/tests/v1/engine/test_abort_final_step.py +++ b/tests/v1/engine/test_abort_final_step.py @@ -43,6 +43,14 @@ TEXT_PROMPT = "Hello" +# Register the test connector so it can be used by the engine +# This must be done before any test uses the connector +KVConnectorFactory.register_connector( + "DummyKVConnector", + "tests.v1.engine.test_abort_final_step", + "DummyKVConnector", +) + class DummyKVConnectorMetadata(KVConnectorMetadata): """Dummy metadata for the test connector.""" diff --git a/tests/v1/kv_connector/unit/test_multi_connector.py b/tests/v1/kv_connector/unit/test_multi_connector.py index 60f4002e0783..130dcc8c5815 100644 --- a/tests/v1/kv_connector/unit/test_multi_connector.py +++ b/tests/v1/kv_connector/unit/test_multi_connector.py @@ -191,10 +191,14 @@ def test_multi_example_connector_consistency(): events = get_connector_events() # get_num_new_matched_tokens and update_state_after_alloc will be called - # on each connector in turn. - assert events["storage1-SCHEDULER"][:3] == [ + # on each connector in turn. With shared prefix (PROMPT_CONTEXT) between + # the two prompts, the second request gets a prefix cache hit, causing an + # extra get_num_new_matched_tokens call with the cached token count. + assert events["storage1-SCHEDULER"][:5] == [ "get_num_new_matched_tokens 0", "update_state_after_alloc num_blocks=[0] 0", + "get_num_new_matched_tokens 96", + "update_state_after_alloc num_blocks=[0] 0", "build_connector_meta", ] assert events["storage1-WORKER"][:5] == [ @@ -204,9 +208,11 @@ def test_multi_example_connector_consistency(): "wait_for_layer_load", "save_kv_layer", ] - assert events["storage2-SCHEDULER"][:3] == [ + assert events["storage2-SCHEDULER"][:5] == [ "get_num_new_matched_tokens 0", "update_state_after_alloc num_blocks=[0] 0", + "get_num_new_matched_tokens 96", + "update_state_after_alloc num_blocks=[0] 0", "build_connector_meta", ] assert events["storage2-WORKER"][:5] == [ diff --git a/vllm/utils/system_utils.py b/vllm/utils/system_utils.py index 180a8d08b731..9c20ca65afd9 100644 --- a/vllm/utils/system_utils.py +++ b/vllm/utils/system_utils.py @@ -8,6 +8,7 @@ import os import signal import sys +import threading from collections.abc import Callable, Iterator from pathlib import Path from typing import TextIO @@ -119,6 +120,10 @@ def _maybe_force_spawn(): return reasons = [] + if threading.active_count() > 1: + reasons.append( + "Process is multi-threaded; fork() can cause deadlocks in the child" + ) if is_in_ray_actor(): # even if we choose to spawn, we need to pass the ray address # to the subprocess so that it knows how to connect to the ray cluster.