Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 123 additions & 52 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,31 @@ def __init__(
)

self._start_server(model, vllm_serve_args, env_dict)
max_wait_seconds = max_wait_seconds or 360
self._wait_for_server(url=self.url_for("health"), timeout=max_wait_seconds)
max_wait_seconds = max_wait_seconds or 480
try:
self._wait_for_server(url=self.url_for("health"), timeout=max_wait_seconds)
except Exception:
# If the server never became healthy, we must still clean up
# the subprocess tree. Without this, a timeout in __init__
# leaks the server + EngineCore processes (and their GPU
# memory), because __exit__ is never called when __init__
# raises inside a ``with`` statement.
self._shutdown()
raise

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self._shutdown()

def _shutdown(self) -> None:
"""Kill the server process tree and wait for GPU memory release.

Called from both ``__exit__`` (normal path) and ``__init__``
(when the server fails to start). Must be safe to call even if
the process is already dead.
"""
pid = self.proc.pid

# Get the process group ID. Because we used
Expand Down Expand Up @@ -265,33 +283,92 @@ def __exit__(self, exc_type, exc_value, traceback):
self.proc.wait(timeout=10)
print(f"[RemoteOpenAIServer] Server {pid} killed")
except subprocess.TimeoutExpired:
# Phase 3: last resort - find and kill any orphaned children
self._kill_orphaned_children(pid)
pass

# Wait for GPU memory to actually be *freed*, not just
# After killing the root process, ensure all children in the
# process group (e.g. EngineCore workers) are also dead.
# On ROCm especially, surviving children hold GPU contexts and
# prevent VRAM from being reclaimed by the driver.
self._kill_process_group_survivors(pgid)

# Wait for GPU memory to actually be freed, not just
# "stabilized at whatever level it's at".
self._wait_for_gpu_memory_release()

def _kill_orphaned_children(self, parent_pid: int) -> None:
"""Best-effort cleanup of any lingering child processes."""
try:
import psutil
def _kill_process_group_survivors(
self, pgid: int | None, timeout: float = 15.0
) -> None:
"""SIGKILL any processes still in the server's process group
and wait for them to exit.

parent = psutil.Process(parent_pid)
children = parent.children(recursive=True)
for child in children:
print(
f"[RemoteOpenAIServer] Killing orphaned child "
f"pid={child.pid} name={child.name()}"
)
child.kill()
psutil.wait_procs(children, timeout=5)
except Exception as e:
# psutil may not be installed, or processes already gone
print(f"[RemoteOpenAIServer] Orphan cleanup failed: {e}")
# Fallback: try to kill by pgid one more time
with contextlib.suppress(ProcessLookupError, OSError):
os.killpg(parent_pid, signal.SIGKILL)
Because the server is launched with ``start_new_session=True``,
all its children (EngineCore, workers, etc.) share the same
pgid. After the root process is killed, stragglers -- especially
on ROCm where GPU contexts linger until the *process* exits --
must be reaped explicitly.

Uses ``/proc`` to scan for pgid members so this works even after
the parent has been reaped (unlike ``psutil.Process.children``).
"""
if pgid is None:
return

# Send SIGKILL to the entire process group one more time.
# This is cheap and harmless if everyone is already dead.
with contextlib.suppress(ProcessLookupError, OSError):
os.killpg(pgid, signal.SIGKILL)

# Collect surviving PIDs by scanning /proc for matching pgid.
# This works on Linux even after the parent has been waited on
# and is more reliable than psutil.Process(parent).children().
survivor_pids = self._find_pgid_members(pgid)

if not survivor_pids:
return

print(
f"[RemoteOpenAIServer] {len(survivor_pids)} process(es) still "
f"in pgid {pgid} after SIGKILL: {survivor_pids}"
)

# Wait for each survivor to actually exit so the GPU driver
# releases its VRAM.
deadline = time.time() + timeout
while survivor_pids and time.time() < deadline:
still_alive = []
for spid in survivor_pids:
try:
os.kill(spid, 0) # Check if still alive
still_alive.append(spid)
except (ProcessLookupError, OSError):
pass
survivor_pids = still_alive
if survivor_pids:
time.sleep(0.5)

if survivor_pids:
print(
f"[RemoteOpenAIServer] WARNING: processes {survivor_pids} "
f"in pgid {pgid} could not be killed within {timeout}s"
)

@staticmethod
def _find_pgid_members(pgid: int) -> list[int]:
"""Return PIDs of all living processes whose pgid matches."""
members: list[int] = []
proc_path = Path("/proc")
if not proc_path.is_dir():
return members
for entry in proc_path.iterdir():
if not entry.name.isdigit():
continue
pid = int(entry.name)
try:
if os.getpgid(pid) == pgid:
members.append(pid)
except OSError:
continue
return members

def _get_gpu_memory_used(self) -> float | None:
"""Get total GPU memory used across all visible devices in bytes."""
Expand All @@ -318,13 +395,16 @@ def _get_gpu_memory_used(self) -> float | None:
return None
return None

def _wait_for_gpu_memory_release(self, timeout: float = 60.0):
def _wait_for_gpu_memory_release(
self, timeout: float = 120.0, log_interval: float = 10.0
):
"""Wait for GPU memory to drop back toward pre-server levels.

Two-phase strategy:
1. Try to wait for memory to return close to pre-server baseline.
2. If that doesn't happen, fall back to waiting for stabilization
and log a warning (the next server might still OOM).
Waits the full timeout for memory to return close to the
pre-server baseline. Does NOT fall back to a "stabilization"
heuristic -- if memory is still held when the timeout expires,
the test fails so the problem is surfaced immediately rather
than causing cascading OOM failures in every subsequent test.
"""
baseline = self._pre_server_gpu_memory
if baseline is None:
Expand All @@ -337,8 +417,7 @@ def _wait_for_gpu_memory_release(self, timeout: float = 60.0):
target = baseline + headroom_bytes

start = time.time()
last_used: float | None = None
stable_count = 0
next_log_time = start + log_interval

while time.time() - start < timeout:
used = self._get_gpu_memory_used()
Expand All @@ -350,7 +429,6 @@ def _wait_for_gpu_memory_release(self, timeout: float = 60.0):
target_gb = target / 1e9
elapsed = time.time() - start

# Phase 1: memory dropped to near baseline - we're done.
if used <= target:
print(
f"[RemoteOpenAIServer] GPU memory released to "
Expand All @@ -359,28 +437,19 @@ def _wait_for_gpu_memory_release(self, timeout: float = 60.0):
)
return

# Phase 2 (after 40s): fall back to stabilization check.
# This handles cases where another process is using GPU memory
# and we'll never reach baseline.
if elapsed > 40.0 and last_used is not None:
delta = abs(used - last_used)
if delta < 200 * 1024 * 1024: # 200 MB
stable_count += 1
if stable_count >= 3:
print(
f"[RemoteOpenAIServer] WARNING: GPU memory "
f"stabilized at {used_gb:.2f} GB "
f"(target was {target_gb:.2f} GB). "
f"Proceeding - next server may OOM."
)
return
else:
stable_count = 0
now = time.time()
if now >= next_log_time:
print(
f"[RemoteOpenAIServer] Waiting for GPU memory release: "
f"{used_gb:.2f} GB (target: {target_gb:.2f} GB) "
f"[{elapsed:.0f}s/{timeout:.0f}s]"
)
next_log_time = now + log_interval

last_used = used
time.sleep(1.0)

# Timeout - log clearly so CI failures are diagnosable
# Timeout -- raise so the current test fails with a clear
# message instead of silently poisoning subsequent tests.
final_used = self._get_gpu_memory_used()
final_gb = final_used / 1e9 if final_used else 0.0
raise RuntimeError(
Expand Down Expand Up @@ -534,7 +603,9 @@ def _pre_download_model(self, model: str, args) -> None:
revision=model_config.tokenizer_revision,
)

def _wait_for_gpu_memory_release(self, timeout: float = 30.0):
def _wait_for_gpu_memory_release(
self, timeout: float = 30.0, log_interval: float = 10.0
):
pass # No GPU used


Expand Down
Loading