diff --git a/tests/utils.py b/tests/utils.py
index e4b6a6ff6e70..81ac521d802c 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -1414,52 +1414,65 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
 
 
 def spawn_new_process_for_each_test(f: Callable[_P, None]) -> Callable[_P, None]:
-    """Decorator to spawn a new process for each test function."""
+    """Decorator to spawn a new process for each test function.
+
+    Uses subprocess with cloudpickle to serialize the test function and
+    propagates exceptions back to the parent, so test failures are never
+    silently swallowed (fixes https://github.com/vllm-project/vllm/issues/41415).
+    """
 
     @functools.wraps(f)
     def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
-        # Check if we're already in a subprocess
-        if os.environ.get("RUNNING_IN_SUBPROCESS") == "1":
-            # If we are, just run the function directly
-            return f(*args, **kwargs)
-
-        import torch.multiprocessing as mp
-
-        with suppress(RuntimeError):
-            mp.set_start_method("spawn")
-
-        # Get the module
-        module_name = f.__module__
-
-        # Create a process with environment variable set
-        env = os.environ.copy()
-        env["RUNNING_IN_SUBPROCESS"] = "1"
-
-        with tempfile.TemporaryDirectory() as tempdir:
-            output_filepath = os.path.join(tempdir, "new_process.tmp")
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".tb", mode="wb") as tmp:
+            tb_file = tmp.name
 
-            # `cloudpickle` allows pickling complex functions directly
-            input_bytes = cloudpickle.dumps((f, output_filepath))
+        try:
+            # Serialize the function + args with cloudpickle so closures work
+            payload = cloudpickle.dumps((f, args, kwargs, tb_file))
+
+            child_script = (
+                "import sys, cloudpickle, traceback\n"
+                "try:\n"
+                "    from _pytest.outcomes import Skipped\n"
+                "except ImportError:\n"
+                "    class Skipped(BaseException): pass\n"
+                "f, args, kwargs, tb_file = "
+                "cloudpickle.loads(sys.stdin.buffer.read())\n"
+                "try:\n"
+                "    f(*args, **kwargs)\n"
+                "except Skipped:\n"
+                "    sys.exit(0)\n"
+                "except BaseException:\n"
+                "    open(tb_file, 'w').write(traceback.format_exc())\n"
+                "    sys.exit(1)\n"
+            )
 
             repo_root = str(VLLM_PATH.resolve())
-
-            env = dict(env or os.environ)
+            env = os.environ.copy()
             env["PYTHONPATH"] = repo_root + os.pathsep + env.get("PYTHONPATH", "")
 
-            cmd = [sys.executable, "-m", f"{module_name}"]
-
-            returned = subprocess.run(
-                cmd, input=input_bytes, capture_output=True, env=env
+            result = subprocess.run(
+                [sys.executable, "-c", child_script],
+                input=payload,
+                capture_output=True,
+                env=env,
             )
 
-            # check if the subprocess is successful
-            try:
-                returned.check_returncode()
-            except Exception as e:
-                # wrap raised exception to provide more information
+            if result.returncode != 0:
+                # Read the traceback written by the child; fall back to stderr
+                tb = ""
+                if os.path.exists(tb_file) and os.path.getsize(tb_file) > 0:
+                    with open(tb_file) as fp:
+                        tb = fp.read()
+                else:
+                    tb = result.stderr.decode()
                 raise RuntimeError(
-                    f"Error raised in subprocess:\n{returned.stderr.decode()}"
-                ) from e
+                    f"Test subprocess '{f.__name__}' failed "
+                    f"(exit code {result.returncode}):\n{tb}"
+                )
+        finally:
+            with suppress(OSError):
+                os.remove(tb_file)
 
     return wrapper
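Note on the mechanism above: the child process is a bare `python -c`
interpreter that unpickles the test function from stdin, so the old
RUNNING_IN_SUBPROCESS re-import trick is no longer needed. A minimal,
self-contained sketch of the same round-trip (the `run_in_child` helper is
illustrative only, not part of the patch; it assumes `cloudpickle` is
installed):

import subprocess
import sys

import cloudpickle


def run_in_child(fn, *args, **kwargs) -> int:
    # Parent side: pickle the callable plus its arguments and ship the
    # bytes over stdin, mirroring what the patched wrapper does.
    payload = cloudpickle.dumps((fn, args, kwargs))
    child = (
        "import sys, cloudpickle\n"
        "fn, args, kwargs = cloudpickle.loads(sys.stdin.buffer.read())\n"
        "fn(*args, **kwargs)\n"
    )
    return subprocess.run([sys.executable, "-c", child], input=payload).returncode


if __name__ == "__main__":
    assert run_in_child(print, "hello from the child") == 0

The real wrapper adds one more channel on top of the return code: the child
writes its traceback to a temp file, which the parent reads and re-raises
as a RuntimeError.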
diff --git a/tests/utils_/test_spawn_decorator.py b/tests/utils_/test_spawn_decorator.py
new file mode 100644
index 000000000000..1740ea30de94
--- /dev/null
+++ b/tests/utils_/test_spawn_decorator.py
@@ -0,0 +1,33 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""Tests for spawn_new_process_for_each_test decorator."""
+
+import pytest
+
+from tests.utils import spawn_new_process_for_each_test
+
+
+@spawn_new_process_for_each_test
+def test_spawn_decorator_passing():
+    """Passing function should complete normally."""
+    assert 1 + 1 == 2
+
+
+@pytest.mark.xfail(raises=RuntimeError, strict=True)
+@spawn_new_process_for_each_test
+def test_spawn_decorator_failure_is_caught():
+    """Failing function should raise RuntimeError, never silently pass."""
+    raise ValueError("intentional failure")
+
+
+@spawn_new_process_for_each_test
+def test_spawn_decorator_skip():
+    """pytest.skip inside the subprocess must not be reported as a failure."""
+    pytest.skip("intentional skip")
+
+
+@spawn_new_process_for_each_test
+@pytest.mark.parametrize("x,y,expected", [(1, 2, 3), (0, 0, 0)])
+def test_spawn_decorator_parametrized(x, y, expected):
+    """Args and kwargs must be forwarded correctly to the subprocess."""
+    assert x + y == expected
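The `strict=True` on the xfail mark is what makes this a real regression
test: a strict xfail turns an unexpected pass (XPASS) into a suite failure,
so if the decorator ever went back to swallowing child exceptions,
`test_spawn_decorator_failure_is_caught` would fail the run instead of
silently passing. A standalone illustration of that pytest behavior:

import pytest


# With strict=True, pytest reports an unexpected pass as a failure.
# If the body below did NOT raise, the test session itself would go red.
@pytest.mark.xfail(raises=RuntimeError, strict=True)
def test_strict_xfail_semantics():
    raise RuntimeError("expected to fail")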
diff --git a/tests/v1/cudagraph/test_cudagraph_dispatch.py b/tests/v1/cudagraph/test_cudagraph_dispatch.py
index 66e6d7dd4605..97b5fd46a2eb 100644
--- a/tests/v1/cudagraph/test_cudagraph_dispatch.py
+++ b/tests/v1/cudagraph/test_cudagraph_dispatch.py
@@ -371,193 +371,200 @@ def test_bypass_on_mode_none(self):
         assert not wrapper.concrete_cudagraph_entries
 
 
-@pytest.mark.skipif(not current_platform.is_cuda(), reason="Skip if not cuda")
-class TestCudagraphIntegration:
-    def setup_method(self):
-        # only FULL mode for non-uniform batches
-        self.comp_config = CompilationConfig(
-            mode=CompilationMode.VLLM_COMPILE,
-            cudagraph_mode="FULL",
-            cudagraph_capture_sizes=[10, 20],
-        )
-        self.vllm_config = _create_vllm_config(self.comp_config)
-        self.dispatcher = CudagraphDispatcher(self.vllm_config)
-        self.dispatcher.initialize_cudagraph_keys(
-            self.comp_config.cudagraph_mode, uniform_decode_query_len=1
-        )
-
-    def _run_and_monitor_call(
-        self, wrapper, input_tensor, runtime_mode, batch_descriptor
+def _run_and_monitor_call(
+    wrapper, input_tensor, runtime_mode, batch_descriptor, vllm_config
+):
+    """Helper to run a single call and monitor the action."""
+
+    with (
+        patch("torch.cuda.graph", wraps=torch.cuda.graph) as mock_graph_context,
+        patch.object(wrapper, "runnable", wraps=wrapper.runnable) as mock_runnable,
     ):
-        """Helper to run a single call and monitor the action."""
-
-        with (
-            patch("torch.cuda.graph", wraps=torch.cuda.graph) as mock_graph_context,
-            patch.object(wrapper, "runnable", wraps=wrapper.runnable) as mock_runnable,
-        ):
-            entry = wrapper.concrete_cudagraph_entries.get(batch_descriptor, None)
-
-            context = set_forward_context(
-                attn_metadata=None,
-                vllm_config=self.vllm_config,
-                cudagraph_runtime_mode=runtime_mode,
-                batch_descriptor=batch_descriptor,
-            )
-            mock_replay = MagicMock()
-            if entry and entry.cudagraph:
-                with (
-                    context,
-                    patch.object(
-                        entry.cudagraph, "replay", new_callable=MagicMock
-                    ) as mock_replay,
-                ):
-                    wrapper(input_tensor)
-            else:
-                with context:
-                    wrapper(input_tensor)
-
-            if mock_graph_context.called:
-                # note that this is globally mocked, so it will be detected
-                # even whether called by the inner or outer wrapper
-                return "capture_global"
-            if mock_replay.called:
-                # only for outer wrapper
-                return "replay"
-            if mock_runnable.call_count > 0:
-                # only for outer wrapper
-                return "bypass"
-            return "unknown"
-
-    @create_new_process_for_each_test("spawn")
-    def test_capture_replay_bypass_logic(self):
-        model = SimpleMLP().to(DEVICE_TYPE)
-        full_wrapper = CUDAGraphWrapper(model, self.vllm_config, CUDAGraphMode.FULL)
-        max_bs = 16
-        persistent_input_buffer = torch.zeros(max_bs, 10, device=DEVICE_TYPE)
-        input_1 = persistent_input_buffer[:1]
-        input_2 = persistent_input_buffer[:2]
-        input_3 = persistent_input_buffer[:3]
-
-        desc_1 = BatchDescriptor(num_tokens=1)
-        desc_2 = BatchDescriptor(num_tokens=2)
-        desc_3_unseen = BatchDescriptor(num_tokens=3)
+        entry = wrapper.concrete_cudagraph_entries.get(batch_descriptor, None)
 
-        # 0. global warmup
-        with set_forward_context(
+        context = set_forward_context(
             attn_metadata=None,
-            vllm_config=self.vllm_config,
-            cudagraph_runtime_mode=CUDAGraphMode.NONE,
-            batch_descriptor=None,
-        ):
-            full_wrapper(input_1)
+            vllm_config=vllm_config,
+            cudagraph_runtime_mode=runtime_mode,
+            batch_descriptor=batch_descriptor,
+        )
+        mock_replay = MagicMock()
+        if entry and entry.cudagraph:
+            with (
+                context,
+                patch.object(
+                    entry.cudagraph, "replay", new_callable=MagicMock
+                ) as mock_replay,
+            ):
+                wrapper(input_tensor)
+        else:
+            with context:
+                wrapper(input_tensor)
+
+        if mock_graph_context.called:
+            # note that this is globally mocked, so it will be detected
+            # whether called by the inner or outer wrapper
+            return "capture_global"
+        if mock_replay.called:
+            # only for outer wrapper
+            return "replay"
+        if mock_runnable.call_count > 0:
+            # only for outer wrapper
+            return "bypass"
+        return "unknown"
+
+
+@create_new_process_for_each_test("spawn")
+@pytest.mark.skipif(not current_platform.is_cuda(), reason="Skip if not cuda")
+def test_capture_replay_bypass_logic():
+    comp_config = CompilationConfig(
+        mode=CompilationMode.VLLM_COMPILE,
+        cudagraph_mode="FULL",
+        cudagraph_capture_sizes=[1, 2],
+    )
+    vllm_config = _create_vllm_config(comp_config)
+    dispatcher = CudagraphDispatcher(vllm_config)
+    dispatcher.initialize_cudagraph_keys(
+        comp_config.cudagraph_mode, uniform_decode_query_len=1
+    )
+    model = SimpleMLP().to(DEVICE_TYPE)
+    full_wrapper = CUDAGraphWrapper(model, vllm_config, CUDAGraphMode.FULL)
+    max_bs = 16
+    persistent_input_buffer = torch.zeros(max_bs, 10, device=DEVICE_TYPE)
+    input_1 = persistent_input_buffer[:1]
+    input_2 = persistent_input_buffer[:2]
+    input_3 = persistent_input_buffer[:3]
+
+    desc_1 = BatchDescriptor(num_tokens=1)
+    desc_2 = BatchDescriptor(num_tokens=2)
+    desc_3_unseen = BatchDescriptor(num_tokens=3)
+
+    # 0. global warmup
+    with set_forward_context(
+        attn_metadata=None,
+        vllm_config=vllm_config,
+        cudagraph_runtime_mode=CUDAGraphMode.NONE,
+        batch_descriptor=None,
+    ):
+        full_wrapper(input_1)
 
-        rt_mode, key = self.dispatcher.dispatch(num_tokens=desc_1.num_tokens)
-        # 1. Capture first shape
-        action = self._run_and_monitor_call(full_wrapper, input_1, rt_mode, key)
-        assert action == "capture_global"
+    rt_mode, key = dispatcher.dispatch(num_tokens=desc_1.num_tokens)
+    # 1. Capture first shape
+    action = _run_and_monitor_call(full_wrapper, input_1, rt_mode, key, vllm_config)
+    assert action == "capture_global"
 
-        # 2. Replay first shape
-        action = self._run_and_monitor_call(full_wrapper, input_1, rt_mode, key)
-        assert action == "replay"
+    # 2. Replay first shape
+    action = _run_and_monitor_call(full_wrapper, input_1, rt_mode, key, vllm_config)
+    assert action == "replay"
 
-        rt_mode, key = self.dispatcher.dispatch(num_tokens=desc_2.num_tokens)
-        # 3. Capture second shape
-        action = self._run_and_monitor_call(full_wrapper, input_2, rt_mode, key)
-        assert action == "capture_global"
+    rt_mode, key = dispatcher.dispatch(num_tokens=desc_2.num_tokens)
+    # 3. Capture second shape
+    action = _run_and_monitor_call(full_wrapper, input_2, rt_mode, key, vllm_config)
+    assert action == "capture_global"
 
-        # 4. Replay second shape
-        action = self._run_and_monitor_call(
-            full_wrapper, input_2, CUDAGraphMode.FULL, desc_2
+    # 4. Replay second shape
+    action = _run_and_monitor_call(
+        full_wrapper, input_2, CUDAGraphMode.FULL, key, vllm_config
+    )
+    assert action == "replay"
+
+    # 5. Bypass if no key match
+    rt_mode, key = dispatcher.dispatch(num_tokens=desc_3_unseen.num_tokens)
+    assert rt_mode == CUDAGraphMode.NONE
+    action = _run_and_monitor_call(full_wrapper, input_3, rt_mode, key, vllm_config)
+    assert action == "bypass"
+
+    # capturing an unseen shape is not allowed once capturing is disabled
+    set_cudagraph_capturing_enabled(False)
+    with pytest.raises(RuntimeError):
+        _run_and_monitor_call(
+            full_wrapper, input_3, CUDAGraphMode.FULL, desc_3_unseen, vllm_config
         )
-        assert action == "replay"
+    set_cudagraph_capturing_enabled(True)
 
-        # 5. Bypass if no key match
-        rt_mode, key = self.dispatcher.dispatch(num_tokens=desc_3_unseen.num_tokens)
-        assert rt_mode == CUDAGraphMode.NONE
-        action = self._run_and_monitor_call(full_wrapper, input_3, rt_mode, key)
-        assert action == "bypass"
-
-        # capture unseen shape is not allowed after disable
-        set_cudagraph_capturing_enabled(False)
-        with pytest.raises(RuntimeError):
-            self._run_and_monitor_call(
-                full_wrapper, input_3, CUDAGraphMode.FULL, desc_3_unseen
-            )
-        set_cudagraph_capturing_enabled(True)
-
-    @create_new_process_for_each_test("spawn")
-    def test_nested_wrappers(self):
-        """Tests a scenario with a PIECEWISE wrapper inside a FULL one."""
-        model = SimpleMLP().to(DEVICE_TYPE)
-        full_wrapper = CUDAGraphWrapper(model, self.vllm_config, CUDAGraphMode.FULL)
-        input_1 = torch.randn(1, 10, device=DEVICE_TYPE)
-
-        # Setup: Inner model is wrapped with PIECEWISE, outer with FULL
-        inner_model = SimpleMLP().to(DEVICE_TYPE)
-        piecewise_wrapper = CUDAGraphWrapper(
-            inner_model, self.vllm_config, CUDAGraphMode.PIECEWISE
-        )
-        inner_model.forward = MagicMock(wraps=inner_model.forward)
-        outer_model = SimpleMLP().to(DEVICE_TYPE)
-        # When outer model is called, it calls the piecewise_wrapper
-        outer_model.forward = MagicMock(
-            wraps=outer_model.forward, side_effect=piecewise_wrapper
-        )
-        full_wrapper = CUDAGraphWrapper(
-            outer_model, self.vllm_config, CUDAGraphMode.FULL
-        )
 
-        desc_1 = BatchDescriptor(num_tokens=1)
+@create_new_process_for_each_test("spawn")
+@pytest.mark.skipif(not current_platform.is_cuda(), reason="Skip if not cuda")
+def test_nested_wrappers():
+    """Tests a scenario with a PIECEWISE wrapper inside a FULL one."""
+    comp_config = CompilationConfig(
+        mode=CompilationMode.VLLM_COMPILE,
+        cudagraph_mode="FULL",
+        cudagraph_capture_sizes=[1],
+    )
+    vllm_config = _create_vllm_config(comp_config)
+    dispatcher = CudagraphDispatcher(vllm_config)
+    dispatcher.initialize_cudagraph_keys(
+        comp_config.cudagraph_mode, uniform_decode_query_len=1
+    )
+    model = SimpleMLP().to(DEVICE_TYPE)
+    full_wrapper = CUDAGraphWrapper(model, vllm_config, CUDAGraphMode.FULL)
+    input_1 = torch.randn(1, 10, device=DEVICE_TYPE)
+
+    # Setup: Inner model is wrapped with PIECEWISE, outer with FULL
+    inner_model = SimpleMLP().to(DEVICE_TYPE)
+    piecewise_wrapper = CUDAGraphWrapper(
+        inner_model, vllm_config, CUDAGraphMode.PIECEWISE
+    )
+    inner_model.forward = MagicMock(wraps=inner_model.forward)
+    outer_model = SimpleMLP().to(DEVICE_TYPE)
+    # When outer model is called, it calls the piecewise_wrapper
+    outer_model.forward = MagicMock(
+        wraps=outer_model.forward, side_effect=piecewise_wrapper
+    )
+    full_wrapper = CUDAGraphWrapper(outer_model, vllm_config, CUDAGraphMode.FULL)
 
-        # 0. global warmup
-        with set_forward_context(
-            attn_metadata=None,
-            vllm_config=self.vllm_config,
-            cudagraph_runtime_mode=CUDAGraphMode.NONE,
-            batch_descriptor=None,
-        ):
-            full_wrapper(input_1)
-
-        # --- Test runtime mode FULL---
-        # Run with FULL mode context. Expect outer wrapper to capture.
-        # The inner mock should be called once inside the graph capture.
-        outer_model.forward.reset_mock()
-        inner_model.forward.reset_mock()
-        action = self._run_and_monitor_call(
-            full_wrapper, input_1, CUDAGraphMode.FULL, desc_1
-        )
-        assert action == "capture_global"
-        assert outer_model.forward.call_count == 1
-        assert inner_model.forward.call_count == 1
-
-        # Run again. Expect outer wrapper to replay.
-        # The outer model should NOT be called because the whole graph
-        # is replayed.
-        action = self._run_and_monitor_call(
-            full_wrapper, input_1, CUDAGraphMode.FULL, desc_1
-        )
-        assert action == "replay"
-        assert outer_model.forward.call_count == 1  # No new call
-        assert inner_model.forward.call_count == 1
-
-        # --- Test runtime mode PIECEWISE ---
-        outer_model.forward.reset_mock()
-        inner_model.forward.reset_mock()
-        # Run with PIECEWISE mode context.
-        # Expect outer wrapper to bypass and call inner wrapper.
-        # Inner wrapper should capture.
-        action = self._run_and_monitor_call(
-            full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1
-        )
-        assert action == "capture_global"
-        assert outer_model.forward.call_count == 1
-        assert inner_model.forward.call_count == 1
-
-        # Run again with PIECEWISE.
-        # Outer bypasses, inner replays.
-        action = self._run_and_monitor_call(
-            full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1
-        )
-        assert action == "bypass"
-        assert outer_model.forward.call_count == 2
-        assert inner_model.forward.call_count == 1
+    desc_1 = BatchDescriptor(num_tokens=1)
+
+    # 0. global warmup
+    with set_forward_context(
+        attn_metadata=None,
+        vllm_config=vllm_config,
+        cudagraph_runtime_mode=CUDAGraphMode.NONE,
+        batch_descriptor=None,
+    ):
+        full_wrapper(input_1)
+
+    # --- Test runtime mode FULL ---
+    # Run with FULL mode context. Expect outer wrapper to capture.
+    # The inner mock should be called once inside the graph capture.
+    outer_model.forward.reset_mock()
+    inner_model.forward.reset_mock()
+    action = _run_and_monitor_call(
+        full_wrapper, input_1, CUDAGraphMode.FULL, desc_1, vllm_config
+    )
+    assert action == "capture_global"
+    assert outer_model.forward.call_count == 1
+    assert inner_model.forward.call_count == 1
+
+    # Run again. Expect outer wrapper to replay.
+    # The outer model should NOT be called because the whole graph
+    # is replayed.
+    action = _run_and_monitor_call(
+        full_wrapper, input_1, CUDAGraphMode.FULL, desc_1, vllm_config
+    )
+    assert action == "replay"
+    assert outer_model.forward.call_count == 1  # No new call
+    assert inner_model.forward.call_count == 1
+
+    # --- Test runtime mode PIECEWISE ---
+    outer_model.forward.reset_mock()
+    inner_model.forward.reset_mock()
+    # Run with PIECEWISE mode context.
+    # Expect outer wrapper to bypass and call inner wrapper.
+    # Inner wrapper should capture.
+    action = _run_and_monitor_call(
+        full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1, vllm_config
+    )
+    assert action == "capture_global"
+    assert outer_model.forward.call_count == 1
+    assert inner_model.forward.call_count == 1
+
+    # Run again with PIECEWISE.
+    # Outer bypasses, inner replays.
+    action = _run_and_monitor_call(
+        full_wrapper, input_1, CUDAGraphMode.PIECEWISE, desc_1, vllm_config
+    )
+    assert action == "bypass"
+    assert outer_model.forward.call_count == 2
+    assert inner_model.forward.call_count == 1
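The `_run_and_monitor_call` helper classifies each call as capture, replay,
or bypass purely by spying on existing objects with `wraps=`, which keeps
the real behavior intact while recording invocations. A minimal sketch of
that mock pattern in isolation (the `Runner` class is hypothetical, used
only to demonstrate the technique):

from unittest.mock import patch


class Runner:
    def run(self):
        return "ran"


r = Runner()
# `wraps=` delegates to the real method but counts the calls, which is how
# the helper above detects whether the wrapper fell through to `runnable`.
with patch.object(r, "run", wraps=r.run) as spy:
    assert r.run() == "ran"
    assert spy.call_count == 1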
diff --git a/tests/v1/logits_processors/test_custom_online.py b/tests/v1/logits_processors/test_custom_online.py
index 3dc6b8979015..93825c65bc92 100644
--- a/tests/v1/logits_processors/test_custom_online.py
+++ b/tests/v1/logits_processors/test_custom_online.py
@@ -120,12 +120,11 @@ async def client(server):
 
 
 @create_new_process_for_each_test()
-@pytest.mark.asyncio
 @pytest.mark.parametrize(
     "model_name",
     [MODEL_NAME],
 )
-async def test_custom_logitsprocs(client: openai.AsyncOpenAI, model_name: str):
+def test_custom_logitsprocs(server, model_name: str):
     """Test custom logitsprocs when starting OpenAI server from CLI
 
     Launch vLLM OpenAI-compatible server, configured to load a custom logitproc
@@ -139,36 +138,45 @@
     token
     """
-    use_dummy_logitproc = True
-    for prompt in prompts:
-        # Build request arguments
-        request_keyword_args: dict[str, Any] = {
-            **api_keyword_args,
-        }
-        if use_dummy_logitproc:
-            # 50% of requests pass target_token custom arg
-            target_token = random.choice([128, 67])
-            # For requests which activate the dummy logitproc, choose one of
-            # two `target_token` values which are known not to be EOS tokens
-            request_keyword_args["extra_body"] = {
-                "vllm_xargs": {DUMMY_LOGITPROC_ARG: target_token}
-            }
-        batch = await client.completions.create(
-            model=model_name,
-            prompt=prompt,
-            **request_keyword_args,
-        )
+    import asyncio
 
-        if use_dummy_logitproc:
-            # Only for requests which activate dummy logitproc - validate that
-            # output token is repeated
-            choices: openai.types.CompletionChoice = batch.choices
-            toks = choices[0].logprobs.tokens
-            if not all([x == toks[0] for x in toks]):
-                raise AssertionError(f"Generated {toks} should all be {toks[0]}")
+    async def _async_main(srv):
+        async with srv.get_async_client() as client:
+            await _run(client)
 
-        # Alternate whether to activate dummy logitproc for each request
-        use_dummy_logitproc = not use_dummy_logitproc
+    async def _run(client):
+        use_dummy_logitproc = True
+        for prompt in prompts:
+            # Build request arguments
+            request_keyword_args: dict[str, Any] = {
+                **api_keyword_args,
+            }
+            if use_dummy_logitproc:
+                # 50% of requests pass target_token custom arg
+                target_token = random.choice([128, 67])
+                # For requests which activate the dummy logitproc, choose one of
+                # two `target_token` values which are known not to be EOS tokens
+                request_keyword_args["extra_body"] = {
+                    "vllm_xargs": {DUMMY_LOGITPROC_ARG: target_token}
+                }
+            batch = await client.completions.create(
+                model=model_name,
+                prompt=prompt,
+                **request_keyword_args,
+            )
+
+            if use_dummy_logitproc:
+                # Only for requests which activate dummy logitproc - validate that
+                # output token is repeated
+                choices: openai.types.CompletionChoice = batch.choices
+                toks = choices[0].logprobs.tokens
+                if not all(x == toks[0] for x in toks):
+                    raise AssertionError(f"Generated {toks} should all be {toks[0]}")
+
+            # Alternate whether to activate dummy logitproc for each request
+            use_dummy_logitproc = not use_dummy_logitproc
+
+    asyncio.run(_async_main(server))
 
 
 @pytest.mark.asyncio
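Since the spawn decorator can only execute a synchronous callable in the
child process, the reworked test drops `@pytest.mark.asyncio` and instead
drives its await-based body with `asyncio.run()`. A minimal sketch of that
sync-driver pattern (the coroutine here is a stand-in for the real awaited
client calls):

import asyncio


async def _body() -> str:
    await asyncio.sleep(0)  # placeholder for the awaited OpenAI client calls
    return "ok"


def test_sync_driver():
    # asyncio.run() creates and tears down an event loop per call, so the
    # whole async flow completes inside an ordinary sync test function.
    assert asyncio.run(_body()) == "ok"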