Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ if(VLLM_GPU_LANG STREQUAL "CUDA")
SET(CUTLASS_ENABLE_HEADERS_ONLY ON CACHE BOOL "Enable only the header library")

# Set CUTLASS_REVISION. Used for FetchContent. Also fixes some bogus messages when building.
set(CUTLASS_REVISION "v4.2.1")
set(CUTLASS_REVISION "v4.3.5")

# Use the specified CUTLASS source directory for compilation if VLLM_CUTLASS_SRC_DIR is provided
if (DEFINED ENV{VLLM_CUTLASS_SRC_DIR})
Expand Down
114 changes: 59 additions & 55 deletions tests/distributed/test_torchrun_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,66 @@
import torch.distributed as dist

from vllm import LLM, SamplingParams
from vllm.distributed import destroy_distributed_environment
from vllm.distributed.parallel_state import get_world_group

# Use a CPU-capable gloo process group so ranks can exchange Python objects
# for the consistency checks below (broadcast_object_list works over gloo).
dist.init_process_group(backend="gloo")

# Create prompts
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]

sampling_params = SamplingParams(temperature=0.8, top_p=0.95)

# set different `gpu_memory_utilization` and `swap_space` for different ranks,
# to test if all ranks agree on the same kv cache configuration.
llm = LLM(
model="facebook/opt-125m",
tensor_parallel_size=2,
pipeline_parallel_size=int(os.getenv("PP_SIZE", 1)),
distributed_executor_backend="external_launcher",
gpu_memory_utilization=random.uniform(0.7, 0.9),
swap_space=random.randint(1, 4),
seed=0,
)

# NOTE(review): with the "external_launcher" backend each torchrun rank
# presumably drives its own engine instance, so every rank runs the full
# generation — confirm against vLLM's external_launcher docs.
outputs = llm.generate(prompts, sampling_params)

# CPU-side group of the global vLLM world group, used for object broadcasts.
cpu_group = get_world_group().cpu_group

# This rank's index within that group; rank 0 acts as the broadcast source.
torch_rank = dist.get_rank(group=cpu_group)


def test_consistent_across_ranks(obj):
    """Broadcast rank 0's value and assert every other rank holds an equal one."""
    # Non-source ranks receive rank 0's object and compare it with their own.
    if torch_rank != 0:
        received = [None]
        dist.broadcast_object_list(received, src=0, group=cpu_group)
        assert received[0] == obj
    else:
        dist.broadcast_object_list([obj], src=0, group=cpu_group)


# Both negotiated KV-cache block counts must be identical on every rank.
cache_config = llm.llm_engine.vllm_config.cache_config
test_consistent_across_ranks(cache_config.num_cpu_blocks)
test_consistent_across_ranks(cache_config.num_gpu_blocks)

# make sure we can access the model parameters from the calling process
# of the `LLM` instance.
model = llm.llm_engine.model_executor.driver_worker.worker.model_runner.model
params = list(model.parameters())
test_consistent_across_ranks(len(params))

# all ranks should have the same outputs
for output in outputs:
    prompt = output.prompt
    generated_text = output.outputs[0].text
    test_consistent_across_ranks(prompt)
    test_consistent_across_ranks(generated_text)
    print(
        f"Rank {torch_rank}, Prompt: {prompt!r}, Generated text: {generated_text!r}"
    )
try:
    # Prompts shared by every rank; all ranks must produce identical outputs.
    prompts = [
        "Hello, my name is",
        "The president of the United States is",
        "The capital of France is",
        "The future of AI is",
    ]

    sampling_params = SamplingParams(temperature=0.8, top_p=0.95)

    # set different `gpu_memory_utilization` and `swap_space` for different ranks,
    # to test if all ranks agree on the same kv cache configuration.
    llm = LLM(
        model="facebook/opt-125m",
        tensor_parallel_size=2,
        pipeline_parallel_size=int(os.getenv("PP_SIZE", 1)),
        distributed_executor_backend="external_launcher",
        gpu_memory_utilization=random.uniform(0.7, 0.9),
        swap_space=random.randint(1, 4),
        seed=0,
    )

    outputs = llm.generate(prompts, sampling_params)

    # CPU-side world group for object broadcasts; rank 0 is the source.
    cpu_group = get_world_group().cpu_group
    torch_rank = dist.get_rank(group=cpu_group)

    def test_consistent_across_ranks(obj):
        # Rank 0 broadcasts its value; every other rank receives it and
        # asserts equality with its own local copy.
        if torch_rank != 0:
            received = [None]
            dist.broadcast_object_list(received, src=0, group=cpu_group)
            assert received[0] == obj
        else:
            dist.broadcast_object_list([obj], src=0, group=cpu_group)

    # All ranks must agree on the negotiated KV-cache configuration.
    cache_config = llm.llm_engine.vllm_config.cache_config
    test_consistent_across_ranks(cache_config.num_cpu_blocks)
    test_consistent_across_ranks(cache_config.num_gpu_blocks)

    # make sure we can access the model parameters from the calling process
    # of the `LLM` instance.
    params = list(
        llm.llm_engine.model_executor.driver_worker.worker.model_runner.model.parameters()
    )
    test_consistent_across_ranks(len(params))

    # all ranks should have the same outputs
    for output in outputs:
        prompt = output.prompt
        generated_text = output.outputs[0].text
        test_consistent_across_ranks(prompt)
        test_consistent_across_ranks(generated_text)
        print(
            f"Rank {torch_rank}, Prompt: {prompt!r}, Generated text: {generated_text!r}"
        )
finally:
    # Always tear down the distributed state, even if a check above failed,
    # so the torchrun job does not hang on dangling process groups.
    destroy_distributed_environment()
29 changes: 19 additions & 10 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import asyncio
import contextlib
import copy
import functools
import importlib
Expand Down Expand Up @@ -977,13 +976,15 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
signal.signal(signal.SIGTERM, old_signal_handler)
if _exitcode != 0:
# Try to read the exception from the child process
exc_info = {}
if os.path.exists(exc_file_path):
with (
contextlib.suppress(Exception),
open(exc_file_path, "rb") as f,
):
exc_info = cloudpickle.load(f)
exc_info: dict[str, Any] = {}
exc_file_existed = os.path.exists(exc_file_path)
load_error: str | None = None
if exc_file_existed:
try:
with open(exc_file_path, "rb") as f:
exc_info = cloudpickle.load(f)
except Exception as e:
load_error = f"{type(e).__name__}: {e}"

if (
original_exception := exc_info.get("pickled_exception")
Expand All @@ -1001,11 +1002,19 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
f" (exit code: {_exitcode}):\n{original_tb}"
) from None

# Fallback to the original generic error
# Fallback: no recoverable exception; include diagnostics
diag = (
f" exc_file_path={exc_file_path!r}"
f" exc_file_existed={exc_file_existed}"
)
if load_error is not None:
diag += f" load_error={load_error!r}"
raise AssertionError(
f"function {func.__name__} failed when called with"
f" args {args} and kwargs {kwargs}"
f" (exit code: {_exitcode})"
f" (exit code: {_exitcode});"
f" exception could not be recovered from child"
f"{diag}"
) from None

return wrapper
Expand Down
8 changes: 8 additions & 0 deletions tests/v1/engine/test_abort_final_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@

TEXT_PROMPT = "Hello"

# Register the test connector so it can be used by the engine
# This must be done before any test uses the connector
KVConnectorFactory.register_connector(
"DummyKVConnector",
"tests.v1.engine.test_abort_final_step",
"DummyKVConnector",
)


class DummyKVConnectorMetadata(KVConnectorMetadata):
"""Dummy metadata for the test connector."""
Expand Down
12 changes: 9 additions & 3 deletions tests/v1/kv_connector/unit/test_multi_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,14 @@ def test_multi_example_connector_consistency():

events = get_connector_events()
# get_num_new_matched_tokens and update_state_after_alloc will be called
# on each connector in turn.
assert events["storage1-SCHEDULER"][:3] == [
# on each connector in turn. With shared prefix (PROMPT_CONTEXT) between
# the two prompts, the second request gets a prefix cache hit, causing an
# extra get_num_new_matched_tokens call with the cached token count.
assert events["storage1-SCHEDULER"][:5] == [
"get_num_new_matched_tokens 0",
"update_state_after_alloc num_blocks=[0] 0",
"get_num_new_matched_tokens 96",
"update_state_after_alloc num_blocks=[0] 0",
"build_connector_meta",
]
assert events["storage1-WORKER"][:5] == [
Expand All @@ -204,9 +208,11 @@ def test_multi_example_connector_consistency():
"wait_for_layer_load",
"save_kv_layer",
]
assert events["storage2-SCHEDULER"][:3] == [
assert events["storage2-SCHEDULER"][:5] == [
"get_num_new_matched_tokens 0",
"update_state_after_alloc num_blocks=[0] 0",
"get_num_new_matched_tokens 96",
"update_state_after_alloc num_blocks=[0] 0",
"build_connector_meta",
]
assert events["storage2-WORKER"][:5] == [
Expand Down
5 changes: 5 additions & 0 deletions vllm/utils/system_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import signal
import sys
import threading
from collections.abc import Callable, Iterator
from pathlib import Path
from typing import TextIO
Expand Down Expand Up @@ -119,6 +120,10 @@ def _maybe_force_spawn():
return

reasons = []
if threading.active_count() > 1:
reasons.append(
"Process is multi-threaded; fork() can cause deadlocks in the child"
)
if is_in_ray_actor():
# even if we choose to spawn, we need to pass the ray address
# to the subprocess so that it knows how to connect to the ray cluster.
Expand Down