diff --git a/docs/.nav.yml b/docs/.nav.yml index 7493e71e8af..911f0fbc9b8 100644 --- a/docs/.nav.yml +++ b/docs/.nav.yml @@ -46,7 +46,6 @@ nav: - contributing/model/adding_omni_model.md - contributing/model/adding_diffusion_model.md - CI: contributing/ci - - Tests: contributing/tests - Design Documents: - design/index.md - design/architecture_overview.md diff --git a/docs/contributing/ci/tests_markers.md b/docs/contributing/ci/tests_markers.md new file mode 100644 index 00000000000..bf56914f8da --- /dev/null +++ b/docs/contributing/ci/tests_markers.md @@ -0,0 +1,160 @@ +# Markers for Tests + +By adding markers before test functions, tests can later be executed uniformly by simply declaring the corresponding marker type. + +## Current Markers +Defined in `pyproject.toml`: + +| Marker | Description | +| ------------------ | ------------------------------------------------------- | +| `core_model` | Core model tests (run in each PR) | +| `diffusion` | Diffusion model tests | +| `omni` | Omni model tests | +| `cache` | Cache backend tests | +| `parallel` | Parallelism/distributed tests | +| `cpu` | Tests that run on CPU | +| `gpu` | Tests that run on GPU (auto-added) | +| `cuda` | Tests that run on CUDA (auto-added) | +| `rocm` | Tests that run on AMD/ROCm (auto-added) | +| `npu` | Tests that run on NPU/Ascend (auto-added) | +| `H100` | Tests that require H100 GPU | +| `L4` | Tests that require L4 GPU | +| `MI325` | Tests that require MI325 GPU (AMD/ROCm) | +| `A2` | Tests that require A2 NPU | +| `A3` | Tests that require A3 NPU | +| `distributed_cuda` | Tests that require multi cards on CUDA platform | +| `distributed_rocm` | Tests that require multi cards on ROCm platform | +| `distributed_npu` | Tests that require multi cards on NPU platform | +| `skipif_cuda` | Skip if the num of CUDA cards is less than the required | +| `skipif_rocm` | Skip if the num of ROCm cards is less than the required | +| `skipif_npu` | Skip if the num of NPU cards is less than the 
required | +| `slow` | Slow tests (may skip in quick CI) | +| `benchmark` | Benchmark tests | + +Markers shown as auto-added are applied automatically by the `@hardware_test` decorator. + +### Example usage for markers + +```python +from tests.utils import hardware_test + +@pytest.mark.core_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4", "rocm": "MI325", "npu": "A2"}, + num_cards=2, +) +@pytest.mark.parametrize("omni_server", test_params, indirect=True) +def test_video_to_audio(): + ... +``` +### Decorator: `@hardware_test` + +This decorator is intended to make hardware-aware, cross-platform test authoring easier and more robust for CI/CD environments. The `hardware_test` decorator in `vllm-omni/tests/utils.py` performs the following actions: + +1. **Applies platform and resource markers** + Adds the appropriate pytest markers for each specified hardware platform (e.g., `cuda`, `rocm`, `npu`) and resource type (e.g., `L4`, `H100`, `MI325`, `A2`, `A3`). + ``` + @pytest.mark.cuda + @pytest.mark.L4 + ``` +2. **Handles multi-card (distributed) scenarios** + For tests requiring multiple cards, it automatically adds distributed markers such as `distributed_cuda`, `distributed_rocm`, or `distributed_npu`. + ``` + @pytest.mark.distributed_cuda(num_cards=num_cards) + ``` +3. **Supports flexible card requirements** + Accepts `num_cards` as either a single integer for all platforms or as a dictionary with per-platform values. If not specified, defaults to 1 card per platform. + +4. **Integrates resource validation** + On CUDA, multi-card tests also get a `skipif_cuda` marker whose condition is true when the system has fewer devices than required. + Support for `skipif_rocm` and `skipif_npu` will be implemented later. + + +5. **Runs each test in a new process** + Automatically wraps the distributed test with a decorator (`@create_new_process_for_each_test`) to ensure isolation and compatibility with multi-process hardware backends. + +6.
**Works with pytest filtering** + Allows tests to be filtered and selected at runtime using standard pytest marker expressions (e.g., `-m "distributed_cuda and L4"`). + +#### Example usage for decorator +- Single call for multiple platforms: + ```python + @hardware_test( + res={"cuda": "L4", "rocm": "MI325", "npu": "A2"}, + num_cards={"cuda": 2, "rocm": 2, "npu": 2}, + ) + ``` + or + ```python + @hardware_test( + res={"cuda": "L4", "rocm": "MI325", "npu": "A2"}, + num_cards=2, + ) + ``` +- `res` must be a dict; supported resources: CUDA (L4/H100), ROCm (MI325), NPU (A2/A3) +- `num_cards` can be int (all platforms) or dict (per platform); defaults to 1 when missing +- `hardware_test` automatically applies `@create_new_process_for_each_test` for distributed tests. +- Distributed markers (`distributed_cuda`, `distributed_rocm`, `distributed_npu`) are auto-added for multi-card cases +- Filtering examples: + - CUDA only: `pytest -m "distributed_cuda and L4"` + - ROCm only: `pytest -m "distributed_rocm and MI325"` + - NPU only: `pytest -m "distributed_npu"` + +## Add Support for a New Platform + +If you want to add support for a new platform (e.g., "tpu" for a new accelerator), follow these steps: + +1. **Extend the marker list in your pytest config** so that platform/resource markers are defined: + ```toml + # In pyproject.toml or pytest.ini + [tool.pytest.ini_options] + markers = [ + # ... existing markers ... + "tpu: Tests that require TPU device", + "TPU_V3: Tests that require TPU v3 hardware", + "distributed_tpu: Tests that require multiple TPU devices", + ] + ``` +2. **Implement a marker construction function for your platform** in `vllm-omni/tests/utils.py`: + ```python + # In vllm-omni/tests/utils.py + + def tpu_marks(*, res: str, num_cards: int): + test_platform = pytest.mark.tpu + if res == "TPU_V3": + test_resource = pytest.mark.TPU_V3 + else: + raise ValueError( + f"Invalid TPU resource type: {res}. 
Supported: TPU_V3") + + if num_cards == 1: + return [test_platform, test_resource] + else: + test_distributed = pytest.mark.distributed_tpu(num_cards=num_cards) + # Optionally: add skipif_tpu when implemented + return [test_platform, test_resource, test_distributed] + ``` +3. **Update `hardware_test` to recognize your new platform**: + In the relevant place (see the `hardware_test` implementation), add the snippet below. Also register `"tpu"` in the platform validation at the top of `hardware_test`; unknown platforms are rejected with `ValueError`. + ```python + if platform == "tpu": + marks = tpu_marks(res=resource, num_cards=cards) + ``` +4. **(Recommended) Add a test using your new markers**: + ```python + @hardware_test( + res={"tpu": "TPU_V3"}, + num_cards=2, + ) + def test_my_tpu_feature(): + ... + ``` + +**Summary**: +- Add pytest markers for your new platform/resources +- Implement a marker function (`xxx_marks`) +- Plug into `hardware_test` +- You're done: tests decorated with `@hardware_test` using your platform now automatically get the correct markers, distribution, and isolation! + +See code in `vllm-omni/tests/utils.py` for existing examples (`cuda_marks`, `rocm_marks`, `npu_marks`). diff --git a/docs/contributing/tests/tests_style.md b/docs/contributing/ci/tests_style.md similarity index 94% rename from docs/contributing/tests/tests_style.md rename to docs/contributing/ci/tests_style.md index c88e17dee34..65c2b044346 100644 --- a/docs/contributing/tests/tests_style.md +++ b/docs/contributing/ci/tests_style.md @@ -139,7 +139,7 @@ vllm_omni/ tests/ 4. **Documentation**: Add docstrings to all test functions 5. **Environment variables**: Set uniformly in `conftest.py` or at the top of files 6. **Type annotations**: Add type annotations to all test function parameters -7. **Resources**, Using pytest tag to specify the computation resources the test required. +7. **Pytest Markers**: Add the necessary markers, such as `@pytest.mark.core_model`, and use `@hardware_test` to declare hardware requirements (see [Markers for Tests](tests_markers.md) for details).
### Template #### E2E - Online serving @@ -155,6 +155,7 @@ from pathlib import Path import pytest import openai +from tests.utils import hardware_test # Optional: set process start method for workers os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" @@ -184,6 +185,12 @@ def base64_encoded_video() -> str: def dummy_messages_from_video_data(video_data_url: str, content_text: str) -> str: xxx +@pytest.mark.core_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4", "rocm": "MI325", "npu": "A2"}, + num_cards={"cuda": 2, "rocm": 2, "npu": 4}, +) @pytest.mark.parametrize("omni_server", test_params, indirect=True) def test_video_to_audio( client: openai.OpenAI, @@ -226,6 +233,7 @@ from pathlib import Path import pytest from vllm.assets.video import VideoAsset +from tests.utils import hardware_test from ..multi_stages.conftest import OmniRunner # Optional: set process start method for workers @@ -239,7 +247,12 @@ test_params = [(model, stage_config) for model in models for stage_config in sta # function name: test_{input_modality}_to_{output_modality} # modality candidate: text, image, audio, video, mixed_modalities -@pytest.mark.gpu_mem_high # requires high-memory GPU node +@pytest.mark.core_model +@pytest.mark.omni +@hardware_test( + res={"cuda": "L4", "rocm": "MI325", "npu": "A2"}, + num_cards=2, +) @pytest.mark.parametrize("test_config", test_params) def test_video_to_audio(omni_runner: type[OmniRunner], model: str) -> None: """Offline inference: video input, audio output.""" @@ -263,4 +276,5 @@ def test_video_to_audio(omni_runner: type[OmniRunner], model: str) -> None: 1. The file is saved in an appropriate place and the file name is clear. 2. The coding style follows the requirements outlined above. -3. For e2e model test, please ensure the test is configured under the `./buildkite/` folder. +3. **All test functions have appropriate pytest markers.** +4. For tests that need to run in CI, please ensure the test is configured under the `./buildkite/` folder.
diff --git a/docs/contributing/model/adding_diffusion_model.md b/docs/contributing/model/adding_diffusion_model.md index 70fdc6a0817..7eb56d5f5bc 100644 --- a/docs/contributing/model/adding_diffusion_model.md +++ b/docs/contributing/model/adding_diffusion_model.md @@ -140,7 +140,7 @@ Key point for writing the example: + Save or display the generated results so users can validate the integration. # Testing -For comprehensive testing guidelines, please refer to the [Test File Structure and Style Guide](../tests/tests_style.md). +For comprehensive testing guidelines, please refer to the [Test File Structure and Style Guide](../ci/tests_style.md). ## Adding a Model Recipe diff --git a/docs/contributing/model/adding_omni_model.md b/docs/contributing/model/adding_omni_model.md index 2a91a305091..81499118623 100644 --- a/docs/contributing/model/adding_omni_model.md +++ b/docs/contributing/model/adding_omni_model.md @@ -572,7 +572,7 @@ def talker2code2wav( ## Testing -For comprehensive testing guidelines, please refer to the [Test File Structure and Style Guide](../tests/tests_style.md). +For comprehensive testing guidelines, please refer to the [Test File Structure and Style Guide](../ci/tests_style.md). 
## Adding a Model Recipe diff --git a/pyproject.toml b/pyproject.toml index 209a085bf87..4833b117487 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -151,11 +151,34 @@ addopts = [ "--cov-report=xml", ] markers = [ - "unit: Unit tests", - "integration: Integration tests", + # ci/cd required + "core_model: Core model tests (run in each PR)", + # function module markers + "diffusion: Diffusion model tests", + "omni: Omni model tests", + "cache: Cache backend tests", + "parallel: Parallelism/distributed tests", + # platform markers + "cpu: Tests that run on CPU", + "gpu: Tests that run on GPU (auto-added)", + "cuda: Tests that run on CUDA (auto-added)", + "rocm: Tests that run on AMD/ROCm (auto-added)", + "npu: Tests that run on NPU/Ascend (auto-added)", + # specified computation resources marks (auto-added) + "H100: Tests that require H100 GPU", + "L4: Tests that require L4 GPU", + "MI325: Tests that require MI325 GPU (AMD/ROCm)", + "A2: Tests that require A2 NPU", + "A3: Tests that require A3 NPU", + "distributed_cuda: Tests that require multi cards on CUDA platform", + "distributed_rocm: Tests that require multi cards on ROCm platform", + "distributed_npu: Tests that require multi cards on NPU platform", + "skipif_cuda: Skip if the num of CUDA cards is less than the required", + "skipif_rocm: Skip if the num of ROCm cards is less than the required", + "skipif_npu: Skip if the num of NPU cards is less than the required", + # more detailed markers + "slow: Slow tests (may skip in quick CI)", "benchmark: Benchmark tests", - "slow: Slow tests", - "core_model: enable this model test in each PR instead of only nightly", ] [tool.typos.default] diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index 8fb4beb9755..00000000000 --- a/pytest.ini +++ /dev/null @@ -1,3 +0,0 @@ -[pytest] -markers = - gpu_mem_high: needs high VRAM diff --git a/tests/utils.py b/tests/utils.py index aba734501eb..2a2dca238a8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,11 
+1,24 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project - +# Some functions are copied from vllm/tests/utils.py +import functools import os +import signal +import subprocess +import sys +import tempfile import time -from contextlib import contextmanager +from collections.abc import Callable +from contextlib import ExitStack, contextmanager, suppress +from typing import Any, Literal +import cloudpickle +import pytest +from typing_extensions import ParamSpec from vllm.platforms import current_platform +from vllm.utils.torch_utils import cuda_device_count_stateless + +_P = ParamSpec("_P") if current_platform.is_rocm(): from amdsmi import ( @@ -90,10 +103,16 @@ def wait_for_gpu_memory_to_clear( print("") if threshold_bytes is not None: - is_free = lambda used, total: used <= threshold_bytes / 2**30 # noqa E731 + + def is_free(used, total): + return used <= threshold_bytes / 2**30 # noqa E731 + threshold = f"{threshold_bytes / 2**30} GiB" else: - is_free = lambda used, total: used / total <= threshold_ratio # noqa E731 + + def is_free(used, total): + return used / total <= threshold_ratio # noqa E731 + threshold = f"{threshold_ratio:.2f}" dur_s = time.time() - start_time @@ -105,3 +124,353 @@ def wait_for_gpu_memory_to_clear( raise ValueError(f"Memory of devices {devices=} not free after {dur_s=:.02f} ({threshold=})") time.sleep(5) + + +def fork_new_process_for_each_test(func: Callable[_P, None]) -> Callable[_P, None]: + """Decorator to fork a new process for each test function. + See https://github.com/vllm-project/vllm/issues/7053 for more details. + """ + + @functools.wraps(func) + def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None: + # Make the process the leader of its own process group + # to avoid sending SIGTERM to the parent process + os.setpgrp() + from _pytest.outcomes import Skipped + + # Create a unique temporary file to store exception info from child + # process. 
Use test function name and process ID to avoid collisions. + with ( + tempfile.NamedTemporaryFile( + delete=False, mode="w+b", prefix=f"vllm_test_{func.__name__}_{os.getpid()}_", suffix=".exc" + ) as exc_file, + ExitStack() as delete_after, + ): + exc_file_path = exc_file.name + delete_after.callback(os.remove, exc_file_path) + + pid = os.fork() + print(f"Fork a new process to run a test {pid}") + if pid == 0: + # Parent process responsible for deleting, don't delete + # in child. + delete_after.pop_all() + try: + func(*args, **kwargs) + except Skipped as e: + # convert Skipped to exit code 0 + print(str(e)) + os._exit(0) + except Exception as e: + import traceback + + tb_string = traceback.format_exc() + + # Try to serialize the exception object first + exc_to_serialize: dict[str, Any] + try: + # First, try to pickle the actual exception with + # its traceback. + exc_to_serialize = {"pickled_exception": e} + # Test if it can be pickled + cloudpickle.dumps(exc_to_serialize) + except (Exception, KeyboardInterrupt): + # Fall back to string-based approach. + exc_to_serialize = { + "exception_type": type(e).__name__, + "exception_msg": str(e), + "traceback": tb_string, + } + try: + with open(exc_file_path, "wb") as f: + cloudpickle.dump(exc_to_serialize, f) + except Exception: + # Fallback: just print the traceback. 
+ print(tb_string) + os._exit(1) + else: + os._exit(0) + else: + pgid = os.getpgid(pid) + _pid, _exitcode = os.waitpid(pid, 0) + # ignore SIGTERM signal itself + old_signal_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) + # kill all child processes + os.killpg(pgid, signal.SIGTERM) + # restore the signal handler + signal.signal(signal.SIGTERM, old_signal_handler) + if _exitcode != 0: + # Try to read the exception from the child process + exc_info = {} + if os.path.exists(exc_file_path): + with suppress(Exception), open(exc_file_path, "rb") as f: + exc_info = cloudpickle.load(f) + + if (original_exception := exc_info.get("pickled_exception")) is not None: + # Re-raise the actual exception object if it was + # successfully pickled. + assert isinstance(original_exception, Exception) + raise original_exception + + if (original_tb := exc_info.get("traceback")) is not None: + # Use string-based traceback for fallback case + raise AssertionError( + f"Test {func.__name__} failed when called with" + f" args {args} and kwargs {kwargs}" + f" (exit code: {_exitcode}):\n{original_tb}" + ) from None + + # Fallback to the original generic error + raise AssertionError( + f"function {func.__name__} failed when called with" + f" args {args} and kwargs {kwargs}" + f" (exit code: {_exitcode})" + ) from None + + return wrapper + + +def spawn_new_process_for_each_test(f: Callable[_P, None]) -> Callable[_P, None]: + """Decorator to spawn a new process for each test function.""" + + @functools.wraps(f) + def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None: + # Check if we're already in a subprocess + if os.environ.get("RUNNING_IN_SUBPROCESS") == "1": + # If we are, just run the function directly + return f(*args, **kwargs) + + import torch.multiprocessing as mp + + with suppress(RuntimeError): + mp.set_start_method("spawn") + + # Get the module + module_name = f.__module__ + + # Create a process with environment variable set + env = os.environ.copy() + 
env["RUNNING_IN_SUBPROCESS"] = "1" + + with tempfile.TemporaryDirectory() as tempdir: + output_filepath = os.path.join(tempdir, "new_process.tmp") + + # `cloudpickle` allows pickling complex functions directly + input_bytes = cloudpickle.dumps((f, output_filepath)) + + cmd = [sys.executable, "-m", f"{module_name}"] + + returned = subprocess.run(cmd, input=input_bytes, capture_output=True, env=env) + + # check if the subprocess is successful + try: + returned.check_returncode() + except Exception as e: + # wrap raised exception to provide more information + raise RuntimeError(f"Error raised in subprocess:\n{returned.stderr.decode()}") from e + + return wrapper + + +def create_new_process_for_each_test( + method: Literal["spawn", "fork"] | None = None, +) -> Callable[[Callable[_P, None]], Callable[_P, None]]: + """Creates a decorator that runs each test function in a new process. + + Args: + method: The process creation method. Can be either "spawn" or "fork". + If not specified, it defaults to "spawn" on ROCm and XPU + platforms and "fork" otherwise. + + Returns: + A decorator to run test functions in separate processes. + """ + if method is None: + # TODO: Spawn is not working correctly on ROCm + # The test content will not run and tests passed immediately. + # For now, using `fork` for ROCm as it can run with `fork` + # and tests are running correctly. + use_spawn = current_platform.is_xpu() + method = "spawn" if use_spawn else "fork" + + assert method in ["spawn", "fork"], "Method must be either 'spawn' or 'fork'" + + if method == "fork": + return fork_new_process_for_each_test + + return spawn_new_process_for_each_test + + +def cuda_marks(*, res: str, num_cards: int): + """ + Get a collection of pytest marks to apply for `@cuda_test`. + + Args: + res: Resource type, e.g., "L4" or "H100". + num_cards: Number of GPU cards required. + + Returns: + List of pytest marks to apply. 
+ """ + test_platform_detail = pytest.mark.cuda + + if res == "L4": + test_resource = pytest.mark.L4 + elif res == "H100": + test_resource = pytest.mark.H100 + else: + raise ValueError(f"Invalid CUDA resource type: {res}. Supported: L4, H100") + + marks = [test_resource, test_platform_detail] + + if num_cards == 1: + return marks + else: + test_distributed = pytest.mark.distributed_cuda(num_cards=num_cards) + test_skipif = pytest.mark.skipif_cuda( + cuda_device_count_stateless() < num_cards, + reason=f"Need at least {num_cards} CUDA GPUs to run the test.", + ) + return marks + [test_distributed, test_skipif] + + +def rocm_marks(*, res: str, num_cards: int): + """ + Get a collection of pytest marks to apply for `@rocm_test`. + + Args: + res: Resource type, e.g., "MI325". + num_cards: Number of GPU cards required. + + Returns: + List of pytest marks to apply. + """ + test_platform_detail = pytest.mark.rocm + + if res == "MI325": + test_resource = pytest.mark.MI325 + else: + raise ValueError(f"Invalid ROCm resource type: {res}. Supported: MI325") + + marks = [test_resource, test_platform_detail] + + if num_cards == 1: + return marks + else: + test_distributed = pytest.mark.distributed_rocm(num_cards=num_cards) + # TODO: add ROCm support for `skipif_rocm` marker + return marks + [test_distributed] + + +def gpu_marks(*, res: str, num_cards: int): + """ + Get a collection of pytest marks to apply for `@gpu_test`. + Platform is automatically determined based on resource type. + + Args: + res: Resource type, e.g., "L4", "H100" for CUDA, or "MI325" for ROCm. + num_cards: Number of GPU cards required. + + Returns: + List of pytest marks to apply. + """ + test_platform = pytest.mark.gpu + if res in ("L4", "H100"): + return [test_platform] + cuda_marks(res=res, num_cards=num_cards) + if res == "MI325": + return [test_platform] + rocm_marks(res=res, num_cards=num_cards) + raise ValueError(f"Invalid resource type: {res}. 
Supported: L4, H100, MI325") + + +def npu_marks(*, res: str, num_cards: int): + """Get a collection of pytest marks to apply for `@npu_test`.""" + test_platform = pytest.mark.npu + if res == "A2": + test_resource = pytest.mark.A2 + elif res == "A3": + test_resource = pytest.mark.A3 + else: + # TODO: Currently we don't have various NPU card types defined + # Use None to skip resource-specific marking for unknown types + test_resource = None + + if num_cards == 1: + return [mark for mark in [test_platform, test_resource] if mark is not None] + else: + # Multiple cards scenario needs distributed_npu mark + test_distributed = pytest.mark.distributed_npu(num_cards=num_cards) + # TODO: add NPU support for `skipif_npu` marker + return [mark for mark in [test_platform, test_resource, test_distributed] if mark is not None] + + +def hardware_test(*, res: dict[str, str], num_cards: int | dict[str, int] = 1): + """ + Decorate a test for multiple hardware platforms with a single call. + Automatically wraps the test with @create_new_process_for_each_test() for distributed tests. + + Args: + res: Mapping from platform to resource type. Supported platforms/resources: + - cuda: L4, H100 + - rocm: MI325 + - npu: A2, A3 + num_cards: Number of cards required. Can be: + - int: same card count for all platforms (default: 1) + - dict: per-platform card count, e.g., {"cuda": 2, "rocm": 2} + + Example: + @hardware_test( + res={"cuda": "L4", "rocm": "MI325", "npu": "A2"}, + num_cards={"cuda": 2, "rocm": 2, "npu": 2}, + ) + def test_multi_platform(): + ... 
+ """ + # Validate platforms + # Don't validate platform details in this decorator + for platform, _ in res.items(): + if platform not in ("cuda", "rocm", "npu"): + raise ValueError(f"Unsupported platform: {platform}") + + # Normalize num_cards + if isinstance(num_cards, int): + num_cards_dict = {platform: num_cards for platform in res.keys()} + else: + num_cards_dict = num_cards + for platform in num_cards_dict.keys(): + if platform not in res: + raise ValueError( + f"Platform '{platform}' in num_cards but not in res. Available platforms: {list(res.keys())}" + ) + for platform in res.keys(): + if platform not in num_cards_dict: + num_cards_dict[platform] = 1 + + # Collect marks from all platforms + all_marks: list[Callable[[Callable[_P, None]], Callable[_P, None]]] = [] + for platform, resource in res.items(): + cards = num_cards_dict[platform] + if platform == "cuda" or platform == "rocm": + marks = gpu_marks(res=resource, num_cards=cards) + elif platform == "npu": + marks = npu_marks(res=resource, num_cards=cards) + else: + raise ValueError(f"Unsupported platform: {platform}") + all_marks.extend(marks) + + create_new_process_flag = False + for cards in num_cards_dict.values(): + if cards > 1: + create_new_process_flag = True + break + + def wrapper(f: Callable[_P, None]) -> Callable[_P, None]: + if create_new_process_flag: + # only for distributed tests + func = create_new_process_for_each_test()(f) + else: + func = f + for mark in reversed(all_marks): + func = mark(func) + return func + + return wrapper diff --git a/tools/pre_commit/check_pickle_imports.py b/tools/pre_commit/check_pickle_imports.py index 562999d7e58..db45f29900d 100644 --- a/tools/pre_commit/check_pickle_imports.py +++ b/tools/pre_commit/check_pickle_imports.py @@ -18,6 +18,7 @@ ALLOWED_FILES = { "vllm_omni/entrypoints/omni_llm.py", "tests/e2e/offline_inference/utils.py", + "tests/utils.py", "vllm_omni/diffusion/distributed/group_coordinator.py", 
"tests/diffusion/attention/test_sequence_parallel.py", }