diff --git a/tests/utils.py b/tests/utils.py
index e8fd3f1e8152..41202aa19481 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -19,7 +19,7 @@
 import time
 import warnings
 from collections.abc import Callable, Iterable, Sequence
-from contextlib import ExitStack, contextmanager
+from contextlib import ExitStack, contextmanager, suppress
 from multiprocessing import Process
 from pathlib import Path
 from typing import Any, Literal
@@ -1512,65 +1512,52 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
 
 
 def spawn_new_process_for_each_test(f: Callable[_P, None]) -> Callable[_P, None]:
-    """Decorator to spawn a new process for each test function.
-
-    Uses subprocess with cloudpickle to serialize the test function and
-    propagates exceptions back to the parent, so test failures are never
-    silently swallowed (fixes https://github.com/vllm-project/vllm/issues/41415).
-    """
+    """Decorator to spawn a new process for each test function."""
 
     @functools.wraps(f)
     def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".tb", mode="wb") as tmp:
-            tb_file = tmp.name
+        # Check if we're already in a subprocess
+        if os.environ.get("RUNNING_IN_SUBPROCESS") == "1":
+            # If we are, just run the function directly
+            return f(*args, **kwargs)
 
-        try:
-            # Serialize the function + args with cloudpickle so closures work
-            payload = cloudpickle.dumps((f, args, kwargs, tb_file))
-
-            child_script = (
-                "import sys, cloudpickle, traceback\n"
-                "try:\n"
-                "    from _pytest.outcomes import Skipped\n"
-                "except ImportError:\n"
-                "    class Skipped(BaseException): pass\n"
-                "f, args, kwargs, tb_file = "
-                "cloudpickle.loads(sys.stdin.buffer.read())\n"
-                "try:\n"
-                "    f(*args, **kwargs)\n"
-                "except Skipped:\n"
-                "    sys.exit(0)\n"
-                "except BaseException:\n"
-                "    open(tb_file, 'w').write(traceback.format_exc())\n"
-                "    sys.exit(1)\n"
-            )
+        import torch.multiprocessing as mp
+
+        with suppress(RuntimeError):
+            mp.set_start_method("spawn")
+
+        # Get the module that defines the test function
+        module_name = f.__module__
+
+        # Prepare the subprocess environment with the marker variable set
+        env = os.environ.copy()
+        env["RUNNING_IN_SUBPROCESS"] = "1"
+
+        with tempfile.TemporaryDirectory() as tempdir:
+            output_filepath = os.path.join(tempdir, "new_process.tmp")
+
+            # `cloudpickle` allows pickling complex functions directly
+            input_bytes = cloudpickle.dumps((f, output_filepath))
 
             repo_root = str(VLLM_PATH.resolve())
-            env = os.environ.copy()
+
+            env = dict(env or os.environ)
             env["PYTHONPATH"] = repo_root + os.pathsep + env.get("PYTHONPATH", "")
 
-            result = subprocess.run(
-                [sys.executable, "-c", child_script],
-                input=payload,
-                capture_output=True,
-                env=env,
+            cmd = [sys.executable, "-m", module_name]
+
+            returned = subprocess.run(
+                cmd, input=input_bytes, capture_output=True, env=env
             )
 
-            if result.returncode != 0:
-                # Read traceback written by child, fall back to stderr
-                tb = ""
-                if os.path.exists(tb_file) and os.path.getsize(tb_file) > 0:
-                    with open(tb_file) as fp:
-                        tb = fp.read()
-                else:
-                    tb = result.stderr.decode()
+            # Check whether the subprocess exited successfully
+            try:
+                returned.check_returncode()
+            except Exception as e:
+                # Wrap the raised exception to provide more information
                 raise RuntimeError(
-                    f"Test subprocess '{f.__name__}' failed "
-                    f"(exit code {result.returncode}):\n{tb}"
-                )
-        finally:
-            with contextlib.suppress(OSError):
-                os.remove(tb_file)
+                    f"Error raised in subprocess:\n{returned.stderr.decode()}"
+                ) from e
 
     return wrapper
diff --git a/tests/utils_/test_spawn_decorator.py b/tests/utils_/test_spawn_decorator.py
deleted file mode 100644
index 1740ea30de94..000000000000
--- a/tests/utils_/test_spawn_decorator.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# SPDX-License-Identifier: Apache-2.0
-# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
-"""Tests for spawn_new_process_for_each_test decorator."""
-
-import pytest
-
-from tests.utils import spawn_new_process_for_each_test
-
-
-@spawn_new_process_for_each_test
-def test_spawn_decorator_passing():
-    """Passing function should complete normally."""
-    assert 1 + 1 == 2
-
-
-@pytest.mark.xfail(raises=RuntimeError, strict=True)
-@spawn_new_process_for_each_test
-def test_spawn_decorator_failure_is_caught():
-    """Failing function should raise RuntimeError, never silently pass."""
-    raise ValueError("intentional failure")
-
-
-@spawn_new_process_for_each_test
-def test_spawn_decorator_skip():
-    """pytest.skip inside subprocess should propagate correctly."""
-    pytest.skip("intentional skip")
-
-
-@spawn_new_process_for_each_test
-@pytest.mark.parametrize("x,y,expected", [(1, 2, 3), (0, 0, 0)])
-def test_spawn_decorator_parametrized(x, y, expected):
-    """Args and kwargs must be forwarded correctly to subprocess."""
-    assert x + y == expected
diff --git a/tests/v1/cudagraph/test_cudagraph_dispatch.py b/tests/v1/cudagraph/test_cudagraph_dispatch.py
index 97b5fd46a2eb..66e6d7dd4605 100644
--- a/tests/v1/cudagraph/test_cudagraph_dispatch.py
+++ b/tests/v1/cudagraph/test_cudagraph_dispatch.py
@@ -371,200 +371,193 @@ def test_bypass_on_mode_none(self):
         assert not wrapper.concrete_cudagraph_entries
 
 
-def _run_and_monitor_call(
-    wrapper, input_tensor, runtime_mode, batch_descriptor, vllm_config
-):
-    """Helper to run a single call and monitor the action."""
-
-    with (
-        patch("torch.cuda.graph", wraps=torch.cuda.graph) as mock_graph_context,
-        patch.object(wrapper, "runnable", wraps=wrapper.runnable) as mock_runnable,
+@pytest.mark.skipif(not current_platform.is_cuda(), reason="Skip if not cuda")
+class TestCudagraphIntegration:
+    def setup_method(self):
+        # only FULL mode for non-uniform batches
+        self.comp_config = CompilationConfig(
+            mode=CompilationMode.VLLM_COMPILE,
+            cudagraph_mode="FULL",
+            cudagraph_capture_sizes=[10, 20],
+        )
+        self.vllm_config = _create_vllm_config(self.comp_config)
+        self.dispatcher = CudagraphDispatcher(self.vllm_config)
+        self.dispatcher.initialize_cudagraph_keys(
+            self.comp_config.cudagraph_mode, uniform_decode_query_len=1
+        )
+
+    def _run_and_monitor_call(
+        self, wrapper, input_tensor, runtime_mode, batch_descriptor
     ):
-        entry = wrapper.concrete_cudagraph_entries.get(batch_descriptor, None)
+        """Helper to run a single call and monitor the action."""
+
+        with (
+            patch("torch.cuda.graph", wraps=torch.cuda.graph) as mock_graph_context,
+            patch.object(wrapper, "runnable", wraps=wrapper.runnable) as mock_runnable,
+        ):
+            entry = wrapper.concrete_cudagraph_entries.get(batch_descriptor, None)
+
+            context = set_forward_context(
+                attn_metadata=None,
+                vllm_config=self.vllm_config,
+                cudagraph_runtime_mode=runtime_mode,
+                batch_descriptor=batch_descriptor,
+            )
+            mock_replay = MagicMock()
+            if entry and entry.cudagraph:
+                with (
+                    context,
+                    patch.object(
+                        entry.cudagraph, "replay", new_callable=MagicMock
+                    ) as mock_replay,
+                ):
+                    wrapper(input_tensor)
+            else:
+                with context:
+                    wrapper(input_tensor)
+
+            if mock_graph_context.called:
+                # note that this is globally mocked, so it will be detected
+                # whether it is called by the inner or outer wrapper
+                return "capture_global"
+            if mock_replay.called:
+                # only for outer wrapper
+                return "replay"
+            if mock_runnable.call_count > 0:
+                # only for outer wrapper
+                return "bypass"
+            return "unknown"
+
+    @create_new_process_for_each_test("spawn")
+    def test_capture_replay_bypass_logic(self):
+        model = SimpleMLP().to(DEVICE_TYPE)
+        full_wrapper = CUDAGraphWrapper(model, self.vllm_config, CUDAGraphMode.FULL)
+        max_bs = 16
+        persistent_input_buffer = torch.zeros(max_bs, 10, device=DEVICE_TYPE)
+        input_1 = persistent_input_buffer[:1]
+        input_2 = persistent_input_buffer[:2]
+        input_3 = persistent_input_buffer[:3]
+
+        desc_1 = BatchDescriptor(num_tokens=1)
+        desc_2 = BatchDescriptor(num_tokens=2)
+        desc_3_unseen = BatchDescriptor(num_tokens=3)
 
-        context = set_forward_context(
+        # 0. global warmup
+        with set_forward_context(
             attn_metadata=None,
-            vllm_config=vllm_config,
-            cudagraph_runtime_mode=runtime_mode,
-            batch_descriptor=batch_descriptor,
-        )
-        mock_replay = MagicMock()
-        if entry and entry.cudagraph:
-            with (
-                context,
-                patch.object(
-                    entry.cudagraph, "replay", new_callable=MagicMock
-                ) as mock_replay,
-            ):
-                wrapper(input_tensor)
-        else:
-            with context:
-                wrapper(input_tensor)
-
-        if mock_graph_context.called:
-            # note that this is globally mocked, so it will be detected
-            # even whether called by the inner or outer wrapper
-            return "capture_global"
-        if mock_replay.called:
-            # only for outer wrapper
-            return "replay"
-        if mock_runnable.call_count > 0:
-            # only for outer wrapper
-            return "bypass"
-        return "unknown"
-
-
-@create_new_process_for_each_test("spawn")
-@pytest.mark.skipif(not current_platform.is_cuda(), reason="Skip if not cuda")
-def test_capture_replay_bypass_logic():
-    comp_config = CompilationConfig(
-        mode=CompilationMode.VLLM_COMPILE,
-        cudagraph_mode="FULL",
-        cudagraph_capture_sizes=[1, 2],
-    )
-    vllm_config = _create_vllm_config(comp_config)
-    dispatcher = CudagraphDispatcher(vllm_config)
-    dispatcher.initialize_cudagraph_keys(
-        comp_config.cudagraph_mode, uniform_decode_query_len=1
-    )
-    model = SimpleMLP().to(DEVICE_TYPE)
-    full_wrapper = CUDAGraphWrapper(model, vllm_config, CUDAGraphMode.FULL)
-    max_bs = 16
-    persistent_input_buffer = torch.zeros(max_bs, 10, device=DEVICE_TYPE)
-    input_1 = persistent_input_buffer[:1]
-    input_2 = persistent_input_buffer[:2]
-    input_3 = persistent_input_buffer[:3]
-
-    desc_1 = BatchDescriptor(num_tokens=1)
-    desc_2 = BatchDescriptor(num_tokens=2)
-    desc_3_unseen = BatchDescriptor(num_tokens=3)
-
-    # 0. global warmup
-    with set_forward_context(
-        attn_metadata=None,
-        vllm_config=vllm_config,
-        cudagraph_runtime_mode=CUDAGraphMode.NONE,
-        batch_descriptor=None,
-    ):
-        full_wrapper(input_1)
+            vllm_config=self.vllm_config,
+            cudagraph_runtime_mode=CUDAGraphMode.NONE,
+            batch_descriptor=None,
+        ):
+            full_wrapper(input_1)
 
-    rt_mode, key = dispatcher.dispatch(num_tokens=desc_1.num_tokens)
-    # 1. Capture first shape
-    action = _run_and_monitor_call(full_wrapper, input_1, rt_mode, key, vllm_config)
-    assert action == "capture_global"
+        rt_mode, key = self.dispatcher.dispatch(num_tokens=desc_1.num_tokens)
+        # 1. Capture first shape
+        action = self._run_and_monitor_call(full_wrapper, input_1, rt_mode, key)
+        assert action == "capture_global"
 
-    # 2. Replay first shape
-    action = _run_and_monitor_call(full_wrapper, input_1, rt_mode, key, vllm_config)
-    assert action == "replay"
+        # 2. Replay first shape
+        action = self._run_and_monitor_call(full_wrapper, input_1, rt_mode, key)
+        assert action == "replay"
 
-    rt_mode, key = dispatcher.dispatch(num_tokens=desc_2.num_tokens)
-    # 3. Capture second shape
-    action = _run_and_monitor_call(full_wrapper, input_2, rt_mode, key, vllm_config)
-    assert action == "capture_global"
+        rt_mode, key = self.dispatcher.dispatch(num_tokens=desc_2.num_tokens)
+        # 3. Capture second shape
+        action = self._run_and_monitor_call(full_wrapper, input_2, rt_mode, key)
+        assert action == "capture_global"
 
-    # 4. Replay second shape
-    action = _run_and_monitor_call(
-        full_wrapper, input_2, CUDAGraphMode.FULL, key, vllm_config
-    )
-    assert action == "replay"
-
-    # 5. Bypass if no key match
-    rt_mode, key = dispatcher.dispatch(num_tokens=desc_3_unseen.num_tokens)
-    assert rt_mode == CUDAGraphMode.NONE
-    action = _run_and_monitor_call(full_wrapper, input_3, rt_mode, key, vllm_config)
-    assert action == "bypass"
-
-    # capture unseen shape is not allowed after disable
-    set_cudagraph_capturing_enabled(False)
-    with pytest.raises(RuntimeError):
-        _run_and_monitor_call(
-            full_wrapper, input_3, CUDAGraphMode.FULL, desc_3_unseen, vllm_config
+        # 4. Replay second shape
+        action = self._run_and_monitor_call(
+            full_wrapper, input_2, CUDAGraphMode.FULL, desc_2
         )
-    set_cudagraph_capturing_enabled(True)
+        assert action == "replay"
 
+        # 5. Bypass if no key match
+        rt_mode, key = self.dispatcher.dispatch(num_tokens=desc_3_unseen.num_tokens)
+        assert rt_mode == CUDAGraphMode.NONE
+        action = self._run_and_monitor_call(full_wrapper, input_3, rt_mode, key)
+        assert action == "bypass"
+
+        # capturing an unseen shape is not allowed after capturing is disabled
+        set_cudagraph_capturing_enabled(False)
+        with pytest.raises(RuntimeError):
+            self._run_and_monitor_call(
+                full_wrapper, input_3, CUDAGraphMode.FULL, desc_3_unseen
+            )
+        set_cudagraph_capturing_enabled(True)
+
+    @create_new_process_for_each_test("spawn")
+    def test_nested_wrappers(self):
+        """Tests a scenario with a PIECEWISE wrapper inside a FULL one."""
+        model = SimpleMLP().to(DEVICE_TYPE)
+        full_wrapper = CUDAGraphWrapper(model, self.vllm_config, CUDAGraphMode.FULL)
+        input_1 = torch.randn(1, 10, device=DEVICE_TYPE)
+
+        # Setup: Inner model is wrapped with PIECEWISE, outer with FULL
+        inner_model = SimpleMLP().to(DEVICE_TYPE)
+        piecewise_wrapper = CUDAGraphWrapper(
+            inner_model, self.vllm_config, CUDAGraphMode.PIECEWISE
+        )
+        inner_model.forward = MagicMock(wraps=inner_model.forward)
+        outer_model = SimpleMLP().to(DEVICE_TYPE)
+        # When outer model is called, it calls the piecewise_wrapper
+        outer_model.forward = MagicMock(
+            wraps=outer_model.forward, side_effect=piecewise_wrapper
+        )
+        full_wrapper = CUDAGraphWrapper(
+            outer_model, self.vllm_config, CUDAGraphMode.FULL
+        )
 
-@create_new_process_for_each_test("spawn")
-@pytest.mark.skipif(not current_platform.is_cuda(), reason="Skip if not cuda")
-def test_nested_wrappers():
-    """Tests a scenario with a PIECEWISE wrapper inside a FULL one."""
-    comp_config = CompilationConfig(
-        mode=CompilationMode.VLLM_COMPILE,
-        cudagraph_mode="FULL",
-        cudagraph_capture_sizes=[1],
-    )
-    vllm_config = _create_vllm_config(comp_config)
-    dispatcher = CudagraphDispatcher(vllm_config)
-    dispatcher.initialize_cudagraph_keys(
-        comp_config.cudagraph_mode, uniform_decode_query_len=1
-    )
-    model = SimpleMLP().to(DEVICE_TYPE)
-    full_wrapper = CUDAGraphWrapper(model, vllm_config, CUDAGraphMode.FULL)
-    input_1 = torch.randn(1, 10, device=DEVICE_TYPE)
-
-    # Setup: Inner model is wrapped with PIECEWISE, outer with FULL
-    inner_model = SimpleMLP().to(DEVICE_TYPE)
-    piecewise_wrapper = CUDAGraphWrapper(
-        inner_model, vllm_config, CUDAGraphMode.PIECEWISE
-    )
-    inner_model.forward = MagicMock(wraps=inner_model.forward)
-    outer_model = SimpleMLP().to(DEVICE_TYPE)
-    # When outer model is called, it calls the piecewise_wrapper
-    outer_model.forward = MagicMock(
-        wraps=outer_model.forward, side_effect=piecewise_wrapper
-    )
-    full_wrapper = CUDAGraphWrapper(outer_model, vllm_config, CUDAGraphMode.FULL)
-
-    desc_1 = BatchDescriptor(num_tokens=1)
+        desc_1 = BatchDescriptor(num_tokens=1)
 
-    # 0. global warmup
-    with set_forward_context(
-        attn_metadata=None,
-        vllm_config=vllm_config,
-        cudagraph_runtime_mode=CUDAGraphMode.NONE,
-        batch_descriptor=None,
-    ):
-        full_wrapper(input_1)
-
-    # --- Test runtime mode FULL---
-    # Run with FULL mode context. Expect outer wrapper to capture.
-    # The inner mock should be called once inside the graph capture.
-    outer_model.forward.reset_mock()
-    inner_model.forward.reset_mock()
-    action = _run_and_monitor_call(
-        full_wrapper, input_1, CUDAGraphMode.FULL, desc_1, vllm_config
-    )
-    assert action == "capture_global"
-    assert outer_model.forward.call_count == 1
-    assert inner_model.forward.call_count == 1
-
-    # Run again. Expect outer wrapper to replay.
-    # The outer model should NOT be called because the whole graph
-    # is replayed.
-    action = _run_and_monitor_call(
-        full_wrapper, input_1, CUDAGraphMode.FULL, desc_1, vllm_config
-    )
-    assert action == "replay"
-    assert outer_model.forward.call_count == 1  # No new call
-    assert inner_model.forward.call_count == 1
-
-    # --- Test runtime mode PIECEWISE ---
-    outer_model.forward.reset_mock()
-    inner_model.forward.reset_mock()
-    # Run with PIECEWISE mode context.
-    # Expect outer wrapper to bypass and call inner wrapper.
-    # Inner wrapper should capture.
-    action = _run_and_monitor_call(
-        full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1, vllm_config
-    )
-    assert action == "capture_global"
-    assert outer_model.forward.call_count == 1
-    assert inner_model.forward.call_count == 1
-
-    # Run again with PIECEWISE.
-    # Outer bypasses, inner replays.
-    action = _run_and_monitor_call(
-        full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1, vllm_config
-    )
-    assert action == "bypass"
-    assert outer_model.forward.call_count == 2
-    assert inner_model.forward.call_count == 1
+        # 0. global warmup
+        with set_forward_context(
+            attn_metadata=None,
+            vllm_config=self.vllm_config,
+            cudagraph_runtime_mode=CUDAGraphMode.NONE,
+            batch_descriptor=None,
+        ):
+            full_wrapper(input_1)
+
+        # --- Test runtime mode FULL ---
+        # Run with FULL mode context. Expect outer wrapper to capture.
+        # The inner mock should be called once inside the graph capture.
+        outer_model.forward.reset_mock()
+        inner_model.forward.reset_mock()
+        action = self._run_and_monitor_call(
+            full_wrapper, input_1, CUDAGraphMode.FULL, desc_1
+        )
+        assert action == "capture_global"
+        assert outer_model.forward.call_count == 1
+        assert inner_model.forward.call_count == 1
+
+        # Run again. Expect outer wrapper to replay.
+        # The outer model should NOT be called because the whole graph
+        # is replayed.
+        action = self._run_and_monitor_call(
+            full_wrapper, input_1, CUDAGraphMode.FULL, desc_1
+        )
+        assert action == "replay"
+        assert outer_model.forward.call_count == 1  # No new call
+        assert inner_model.forward.call_count == 1
+
+        # --- Test runtime mode PIECEWISE ---
+        outer_model.forward.reset_mock()
+        inner_model.forward.reset_mock()
+        # Run with PIECEWISE mode context.
+        # Expect outer wrapper to bypass and call inner wrapper.
+        # Inner wrapper should capture.
+        action = self._run_and_monitor_call(
+            full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1
+        )
+        assert action == "capture_global"
+        assert outer_model.forward.call_count == 1
+        assert inner_model.forward.call_count == 1
+
+        # Run again with PIECEWISE.
+        # Outer bypasses, inner replays.
+        action = self._run_and_monitor_call(
+            full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1
+        )
+        assert action == "bypass"
+        assert outer_model.forward.call_count == 2
+        assert inner_model.forward.call_count == 1
diff --git a/tests/v1/logits_processors/test_custom_online.py b/tests/v1/logits_processors/test_custom_online.py
index 93825c65bc92..3dc6b8979015 100644
--- a/tests/v1/logits_processors/test_custom_online.py
+++ b/tests/v1/logits_processors/test_custom_online.py
@@ -120,11 +120,12 @@ async def client(server):
 @create_new_process_for_each_test()
+@pytest.mark.asyncio
 @pytest.mark.parametrize(
     "model_name",
     [MODEL_NAME],
 )
-def test_custom_logitsprocs(server, model_name: str):
+async def test_custom_logitsprocs(client: openai.AsyncOpenAI, model_name: str):
     """Test custom logitsprocs when starting OpenAI server from CLI
 
     Launch vLLM OpenAI-compatible server, configured to load a custom logitproc
@@ -138,45 +139,36 @@ def test_custom_logitsprocs(server, model_name: str):
     token
     """
 
-    import asyncio
+    use_dummy_logitproc = True
+    for prompt in prompts:
+        # Build request arguments
+        request_keyword_args: dict[str, Any] = {
+            **api_keyword_args,
+        }
+        if use_dummy_logitproc:
+            # 50% of requests pass target_token custom arg
+            target_token = random.choice([128, 67])
+            # For requests which activate the dummy logitproc, choose one of
+            # two `target_token` values which are known not to be EOS tokens
+            request_keyword_args["extra_body"] = {
+                "vllm_xargs": {DUMMY_LOGITPROC_ARG: target_token}
+            }
+        batch = await client.completions.create(
+            model=model_name,
+            prompt=prompt,
+            **request_keyword_args,
+        )
 
-    async def _async_main(srv, mn):
-        async with srv.get_async_client() as client:
-            await _run(client)
+        if use_dummy_logitproc:
+            # Only for requests which activate dummy logitproc - validate that
+            # output token is repeated
+            choices: list[openai.types.CompletionChoice] = batch.choices
+            toks = choices[0].logprobs.tokens
+            if not all([x == toks[0] for x in toks]):
+                raise AssertionError(f"Generated {toks} should all be {toks[0]}")
 
-    async def _run(client):
-        use_dummy_logitproc = True
-        for prompt in prompts:
-            # Build request arguments
-            request_keyword_args: dict[str, Any] = {
-                **api_keyword_args,
-            }
-            if use_dummy_logitproc:
-                # 50% of requests pass target_token custom arg
-                target_token = random.choice([128, 67])
-                # For requests which activate the dummy logitproc, choose one of
-                # two `target_token` values which are known not to be EOS tokens
-                request_keyword_args["extra_body"] = {
-                    "vllm_xargs": {DUMMY_LOGITPROC_ARG: target_token}
-                }
-            batch = await client.completions.create(
-                model=model_name,
-                prompt=prompt,
-                **request_keyword_args,
-            )
-
-            if use_dummy_logitproc:
-                # Only for requests which activate dummy logitproc - validate that
-                # output token is repeated
-                choices: openai.types.CompletionChoice = batch.choices
-                toks = choices[0].logprobs.tokens
-                if not all([x == toks[0] for x in toks]):
-                    raise AssertionError(f"Generated {toks} should all be {toks[0]}")
-
-            # Alternate whether to activate dummy logitproc for each request
-            use_dummy_logitproc = not use_dummy_logitproc
-
-    asyncio.run(_async_main(server, model_name))
+        # Alternate whether to activate dummy logitproc for each request
+        use_dummy_logitproc = not use_dummy_logitproc
 
 
 @pytest.mark.asyncio
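
Reviewer note: the `tests/utils.py` hunk above only shows the parent side of the spawn protocol. A minimal sketch of the child side it appears to rely on — the test module is re-launched via `python -m <module>` with `RUNNING_IN_SUBPROCESS=1` and a cloudpickled `(function, output_filepath)` pair on stdin. The exact `__main__` hook vLLM's test modules use is not shown in this diff, so the hook below is an assumption, not the project's actual code:

```python
# Illustrative sketch only -- not part of this diff. Hypothetical child-side
# hook in a test module run via `python -m <module>`.
import os
import sys

import cloudpickle

if __name__ == "__main__" and os.environ.get("RUNNING_IN_SUBPROCESS") == "1":
    # The payload mirrors what the decorator pickles: the wrapped test
    # function and a scratch output path inside a TemporaryDirectory.
    f, output_filepath = cloudpickle.loads(sys.stdin.buffer.read())
    # Any exception exits nonzero; the parent's check_returncode() then
    # raises, and the RuntimeError wrapper surfaces the captured stderr.
    f()
```

Because the wrapper also short-circuits (`return f(*args, **kwargs)`) when the marker variable is already set, importing the module inside the subprocess does not recurse into another spawn.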