Skip to content
50 changes: 27 additions & 23 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,31 @@ def get_physical_device_indices(devices):


@_nvml()
def check_gpu_memory_usage(devices: list[int]) -> dict[int, tuple[float, float]]:
    """Report (used, total) GPU memory in GiB for each requested device.

    Queries NVML (or amdsmi on ROCm) directly rather than going through
    torch, so the numbers are not skewed by this process's own CUDA context.
    Prints a one-line summary and returns a mapping of device index to
    ``(gib_used, gib_total)``.
    """
    usage_by_device: dict[int, tuple[float, float]] = {}
    for dev_idx in devices:
        if current_platform.is_rocm():
            handle = amdsmi_get_processor_handles()[dev_idx]
            vram = amdsmi_get_gpu_vram_usage(handle)
            # NOTE(review): divisor 2**10 implies amdsmi reports MiB — confirm.
            used_gib = vram["vram_used"] / 2**10
            total_gib = vram["vram_total"] / 2**10
        else:
            handle = nvmlDeviceGetHandleByIndex(dev_idx)
            info = nvmlDeviceGetMemoryInfo(handle)
            # NVML reports bytes; convert to GiB.
            used_gib = info.used / 2**30
            total_gib = info.total / 2**30
        usage_by_device[dev_idx] = (used_gib, total_gib)

    summary = "".join(
        f"{dev_idx}={used:.02f}/{total:.02f}; "
        for dev_idx, (used, total) in usage_by_device.items()
    )
    print(f"gpu memory used/total (GiB): {summary}")

    return usage_by_device


def wait_for_gpu_memory_to_clear(
*,
devices: list[int],
Expand All @@ -808,31 +833,10 @@ def wait_for_gpu_memory_to_clear(
timeout_s: float = 120,
) -> None:
assert threshold_bytes is not None or threshold_ratio is not None
# Use nvml instead of pytorch to reduce measurement error from torch cuda
# context.
devices = get_physical_device_indices(devices)
start_time = time.time()
while True:
output: dict[int, str] = {}
output_raw: dict[int, tuple[float, float]] = {}
for device in devices:
if current_platform.is_rocm():
dev_handle = amdsmi_get_processor_handles()[device]
mem_info = amdsmi_get_gpu_vram_usage(dev_handle)
gb_used = mem_info["vram_used"] / 2**10
gb_total = mem_info["vram_total"] / 2**10
else:
dev_handle = nvmlDeviceGetHandleByIndex(device)
mem_info = nvmlDeviceGetMemoryInfo(dev_handle)
gb_used = mem_info.used / 2**30
gb_total = mem_info.total / 2**30
output_raw[device] = (gb_used, gb_total)
output[device] = f"{gb_used:.02f}/{gb_total:.02f}"

print("gpu memory used/total (GiB): ", end="")
for k, v in output.items():
print(f"{k}={v}; ", end="")
print("")
usage_by_device = check_gpu_memory_usage(devices)

if threshold_bytes is not None:
is_free = lambda used, total: used <= threshold_bytes / 2**30
Expand All @@ -842,7 +846,7 @@ def wait_for_gpu_memory_to_clear(
threshold = f"{threshold_ratio:.2f}"

dur_s = time.time() - start_time
if all(is_free(used, total) for used, total in output_raw.values()):
if all(is_free(used, total) for used, total in usage_by_device.values()):
print(
f"Done waiting for free GPU memory on devices {devices=} "
f"({threshold=}) {dur_s=:.02f}"
Expand Down
91 changes: 63 additions & 28 deletions tests/v1/shutdown/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

import pytest

from tests.utils import wait_for_gpu_memory_to_clear
from tests.utils import (
check_gpu_memory_usage,
create_new_process_for_each_test,
wait_for_gpu_memory_to_clear,
)
from tests.v1.shutdown.utils import (
SHUTDOWN_TEST_THRESHOLD_BYTES,
SHUTDOWN_TEST_TIMEOUT_SEC,
Expand All @@ -19,7 +23,6 @@


@pytest.mark.asyncio
@pytest.mark.timeout(SHUTDOWN_TEST_TIMEOUT_SEC)
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("tensor_parallel_size", [2, 1])
@pytest.mark.parametrize("send_one_request", [False, True])
Expand All @@ -37,6 +40,9 @@ async def test_async_llm_delete(
if cuda_device_count_stateless() < tensor_parallel_size:
pytest.skip(reason="Not enough CUDA devices")

devices = list(range(tensor_parallel_size))
check_gpu_memory_usage(devices)

engine_args = AsyncEngineArgs(
model=model, enforce_eager=True, tensor_parallel_size=tensor_parallel_size
)
Expand All @@ -57,52 +63,81 @@ async def test_async_llm_delete(

# Confirm all the processes are cleaned up.
wait_for_gpu_memory_to_clear(
devices=list(range(tensor_parallel_size)),
devices=devices,
threshold_bytes=SHUTDOWN_TEST_THRESHOLD_BYTES,
timeout_s=SHUTDOWN_TEST_TIMEOUT_SEC,
)


def _test_llm_delete(
    model: str,
    tensor_parallel_size: int,
    send_one_request: bool,
) -> None:
    """Create an LLM, optionally run one request, delete it, then verify
    that all worker processes release their GPU memory.

    Args:
        model: model under test.
        tensor_parallel_size: degree of tensor parallelism.
        send_one_request: issue a single generate() call before deletion to
            force any lazily-deferred initialization to complete.
    """
    device_ids = list(range(tensor_parallel_size))
    check_gpu_memory_usage(device_ids)

    llm = LLM(
        model=model,
        enforce_eager=True,
        tensor_parallel_size=tensor_parallel_size,
    )
    if send_one_request:
        llm.generate("Hello my name is", sampling_params=SamplingParams(max_tokens=1))
    # Drop the only reference; shutdown is driven by garbage collection.
    del llm

    # Confirm all the processes are cleaned up.
    wait_for_gpu_memory_to_clear(
        devices=device_ids,
        threshold_bytes=SHUTDOWN_TEST_THRESHOLD_BYTES,
        timeout_s=SHUTDOWN_TEST_TIMEOUT_SEC,
    )


@pytest.mark.timeout(SHUTDOWN_TEST_TIMEOUT_SEC)
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("tensor_parallel_size", [2, 1])
@pytest.mark.parametrize("enable_multiprocessing", [True])
@pytest.mark.parametrize("send_one_request", [False, True])
def test_llm_delete(
monkeypatch,
model: str,
tensor_parallel_size: int,
enable_multiprocessing: bool,
send_one_request: bool,
) -> None:
"""Test that LLM frees GPU memory upon deletion.
TODO(andy) - LLM without multiprocessing.

Args:
model: model under test
tensor_parallel_size: degree of tensor parallelism
enable_multiprocessing: enable workers in separate process(es)
send_one_request: send one request to engine before deleting
"""
if cuda_device_count_stateless() < tensor_parallel_size:
pytest.skip(reason="Not enough CUDA devices")

with monkeypatch.context() as m:
MP_VALUE = "1" if enable_multiprocessing else "0"
m.setenv("VLLM_ENABLE_V1_MULTIPROCESSING", MP_VALUE)

# Instantiate LLM; make request to complete any deferred
# initialization; then delete instance
llm = LLM(
model=model, enforce_eager=True, tensor_parallel_size=tensor_parallel_size
)
if send_one_request:
llm.generate(
"Hello my name is", sampling_params=SamplingParams(max_tokens=1)
)
del llm

# Confirm all the processes are cleaned up.
wait_for_gpu_memory_to_clear(
devices=list(range(tensor_parallel_size)),
threshold_bytes=SHUTDOWN_TEST_THRESHOLD_BYTES,
)
monkeypatch.setenv("VLLM_ENABLE_V1_MULTIPROCESSING", "1")

_test_llm_delete(model, tensor_parallel_size, send_one_request)


@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("tensor_parallel_size", [2, 1])
@pytest.mark.parametrize("send_one_request", [False, True])
@create_new_process_for_each_test()  # Avoid initing CUDA in this process with TP=1
def test_llm_delete_without_multiprocessing(
    monkeypatch,
    model: str,
    tensor_parallel_size: int,
    send_one_request: bool,
) -> None:
    """Verify LLM deletion frees GPU memory with in-process (non-MP) engine.

    Args:
        model: model under test.
        tensor_parallel_size: degree of tensor parallelism.
        send_one_request: send one request to the engine before deleting.
    """
    # Guard: this parametrization may request more GPUs than are available.
    if cuda_device_count_stateless() < tensor_parallel_size:
        pytest.skip(reason="Not enough CUDA devices")

    # Force the engine core to run in this process rather than a worker.
    monkeypatch.setenv("VLLM_ENABLE_V1_MULTIPROCESSING", "0")

    _test_llm_delete(model, tensor_parallel_size, send_one_request)
78 changes: 53 additions & 25 deletions tests/v1/shutdown/test_forward_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@

import pytest

from tests.utils import wait_for_gpu_memory_to_clear
from tests.utils import (
check_gpu_memory_usage,
create_new_process_for_each_test,
wait_for_gpu_memory_to_clear,
)
from tests.v1.shutdown.utils import (
SHUTDOWN_TEST_THRESHOLD_BYTES,
SHUTDOWN_TEST_TIMEOUT_SEC,
assert_mp_fork_context,
)
from vllm import LLM, AsyncEngineArgs, SamplingParams
from vllm.distributed import get_tensor_model_parallel_rank
Expand Down Expand Up @@ -51,7 +56,11 @@ async def test_async_llm_model_error(
if cuda_device_count_stateless() < tensor_parallel_size:
pytest.skip(reason="Not enough CUDA devices")

devices = list(range(tensor_parallel_size))
check_gpu_memory_usage(devices)

# Monkeypatch an error in the model.
assert_mp_fork_context()
monkeypatch.setattr(LlamaForCausalLM, "forward", evil_forward)

engine_args = AsyncEngineArgs(
Expand Down Expand Up @@ -89,7 +98,7 @@ async def generate(request_id: str):

# Confirm all the processes are cleaned up.
wait_for_gpu_memory_to_clear(
devices=list(range(tensor_parallel_size)),
devices=devices,
threshold_bytes=2 * 2**30,
timeout_s=60,
)
Expand All @@ -99,36 +108,55 @@ async def generate(request_id: str):
async_llm.shutdown()


def _test_llm_model_error(
    monkeypatch, tensor_parallel_size: int, model: str, expected_exception: type
):
    """Patch the model's forward pass to raise, then verify the error
    surfaces as ``expected_exception`` and GPU memory is reclaimed.

    Args:
        monkeypatch: pytest monkeypatch fixture, used to inject the failure.
        tensor_parallel_size: degree of tensor parallelism.
        model: model under test.
        expected_exception: exception type generate() is expected to raise.
    """
    device_ids = list(range(tensor_parallel_size))
    check_gpu_memory_usage(device_ids)

    # Monkeypatch an error into the model's forward pass; fork start method
    # is required so workers inherit the patched class.
    assert_mp_fork_context()
    monkeypatch.setattr(LlamaForCausalLM, "forward", evil_forward)

    llm = LLM(
        model=model,
        enforce_eager=True,
        tensor_parallel_size=tensor_parallel_size,
    )

    with pytest.raises(expected_exception):
        llm.generate("Hello my name is Robert and I")

    # Confirm all the processes are cleaned up.
    wait_for_gpu_memory_to_clear(
        devices=device_ids,
        threshold_bytes=SHUTDOWN_TEST_THRESHOLD_BYTES,
    )


@pytest.mark.timeout(SHUTDOWN_TEST_TIMEOUT_SEC)
@pytest.mark.parametrize("enable_multiprocessing", [True])
@pytest.mark.parametrize("tensor_parallel_size", [2, 1])
@pytest.mark.parametrize("model", MODELS)
def test_llm_model_error(
monkeypatch, tensor_parallel_size: int, enable_multiprocessing: bool, model: str
) -> None:
"""Test that LLM propagates a forward pass error and frees memory.
TODO(andy) - LLM without multiprocessing; LLM with multiprocessing
and >1 rank
"""
def test_llm_model_error(monkeypatch, tensor_parallel_size: int, model: str) -> None:
"""Test that LLM propagates a forward pass error and frees memory."""
if cuda_device_count_stateless() < tensor_parallel_size:
pytest.skip(reason="Not enough CUDA devices")

with monkeypatch.context() as m:
MP_VALUE = "1" if enable_multiprocessing else "0"
m.setenv("VLLM_ENABLE_V1_MULTIPROCESSING", MP_VALUE)
monkeypatch.setenv("VLLM_ENABLE_V1_MULTIPROCESSING", "1")

# Monkeypatch an error in the model.
m.setattr(LlamaForCausalLM, "forward", evil_forward)
_test_llm_model_error(monkeypatch, tensor_parallel_size, model, EngineDeadError)

llm = LLM(
model=model, enforce_eager=True, tensor_parallel_size=tensor_parallel_size
)

with pytest.raises(EngineDeadError if enable_multiprocessing else Exception):
llm.generate("Hello my name is Robert and I")
@pytest.mark.skip(reason="FIXME - this is currently broken")
@pytest.mark.timeout(SHUTDOWN_TEST_TIMEOUT_SEC)
@pytest.mark.parametrize("tensor_parallel_size", [2, 1])
@pytest.mark.parametrize("model", MODELS)
@create_new_process_for_each_test() # Avoid initing CUDA in this process with TP=1
def test_llm_model_error_without_multiprocessing(
monkeypatch, tensor_parallel_size: int, model: str
) -> None:
"""Test that LLM forward pass error, but without multiprocessing."""
if cuda_device_count_stateless() < tensor_parallel_size:
pytest.skip(reason="Not enough CUDA devices")

# Confirm all the processes are cleaned up.
wait_for_gpu_memory_to_clear(
devices=list(range(tensor_parallel_size)),
threshold_bytes=SHUTDOWN_TEST_THRESHOLD_BYTES,
)
monkeypatch.setenv("VLLM_ENABLE_V1_MULTIPROCESSING", "0")

_test_llm_model_error(monkeypatch, tensor_parallel_size, model, Exception)
Loading