Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions vllm_omni/entrypoints/omni_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import asyncio
import fcntl
import importlib
import logging
import multiprocessing as mp
import os
import queue
Expand Down Expand Up @@ -63,14 +64,69 @@
logger = init_logger(__name__)


class _RepoUtilsLocalPathFilter(logging.Filter):
    """Filter out noisy HF Hub errors that always fire for local paths.

    A record is dropped when its formatted message contains any entry of
    ``_SUPPRESSED_FRAGMENTS``; every other record passes through unchanged.
    """

    # Substrings that identify the records to suppress.
    _SUPPRESSED_FRAGMENTS = (
        "Error retrieving file list",
        "Error retrieving safetensors",
    )

    def filter(self, record: logging.LogRecord) -> bool:
        """Return ``False`` to drop *record*, ``True`` to let it through."""
        try:
            message = record.getMessage()
        except Exception:
            # Logger-level filters run outside the handlers' error handling,
            # so an exception here (e.g. mismatched %-format arguments) would
            # propagate into whoever issued the log call. Keep the record and
            # let the handlers report the formatting problem as usual.
            return True
        return not any(fragment in message for fragment in self._SUPPRESSED_FRAGMENTS)


def _suppress_repo_utils_errors_for_local_path(model: str) -> None:
    """Install a filter to suppress harmless HF Hub errors for local model paths.

    Nothing is installed when *model* does not exist on the local filesystem.
    The call is idempotent: if a ``_RepoUtilsLocalPathFilter`` is already
    attached to the target logger, no additional instance is added.
    """
    if not os.path.exists(model):
        return

    repo_utils_logger = logging.getLogger("vllm.transformers_utils.repo_utils")
    # addFilter() only de-duplicates the very same object, so check by type to
    # avoid stacking a fresh filter instance on every call.
    if any(isinstance(existing, _RepoUtilsLocalPathFilter) for existing in repo_utils_logger.filters):
        return
    repo_utils_logger.addFilter(_RepoUtilsLocalPathFilter())


@contextmanager
def _sequential_init_lock(engine_args: dict[str, Any], stage_init_timeout: int = 300):
    """Acquire device locks for sequential init if NVML is unavailable.

    If process-scoped memory tracking is available (NVML works), stages can
    safely initialize concurrently — each measures only its own GPU memory.
    Otherwise, fall back to file-based locks to serialize initialization.

    A global file lock is always attempted first to serialize get_open_port()
    calls across all stages, preventing EADDRINUSE TOCTOU races. The global
    lock is best-effort: if the lock file cannot be opened or locked, a
    warning is logged and initialization proceeds without it.

    Args:
        engine_args: Forwarded unchanged to ``_sequential_init_lock_inner``.
        stage_init_timeout: Forwarded unchanged to
            ``_sequential_init_lock_inner``. It is not applied to the global
            lock, whose ``flock`` call blocks until the lock is available.
    """
    # Global lock to serialize engine initialization across all stages.
    # This prevents TOCTOU races where concurrent get_open_port() calls
    # in different stages (using different devices) return the same port.
    _GLOBAL_INIT_LOCK_PATH = "/tmp/vllm_omni_engine_init.lock"
    global_lock_fd = -1
    try:
        # Opening is inside the try so that an unopenable lock file (for
        # example one owned by another user) degrades to "no global lock"
        # instead of aborting engine initialization.
        global_lock_fd = os.open(_GLOBAL_INIT_LOCK_PATH, os.O_CREAT | os.O_RDWR, 0o644)
        logger.info("Waiting for global engine init lock (%s)...", _GLOBAL_INIT_LOCK_PATH)
        fcntl.flock(global_lock_fd, fcntl.LOCK_EX)
        logger.info("Acquired global engine init lock")
    except OSError as e:
        logger.warning("Failed to acquire global init lock: %s, proceeding anyway", e)
        if global_lock_fd >= 0:
            try:
                os.close(global_lock_fd)
            except OSError:
                pass
        # -1 marks "no global lock held" for the cleanup below.
        global_lock_fd = -1

    try:
        with _sequential_init_lock_inner(engine_args, stage_init_timeout):
            yield
    finally:
        if global_lock_fd >= 0:
            try:
                fcntl.flock(global_lock_fd, fcntl.LOCK_UN)
                logger.info("Released global engine init lock")
            except OSError:
                pass
            finally:
                # Always close the descriptor, even when the explicit unlock
                # failed, so the fd is never leaked.
                try:
                    os.close(global_lock_fd)
                except OSError:
                    pass


@contextmanager
def _sequential_init_lock_inner(engine_args: dict[str, Any], stage_init_timeout: int = 300):
"""Inner logic for sequential init locks (per-device granularity)."""
from vllm_omni.worker.gpu_memory_utils import is_process_scoped_memory_available

nvml_available = is_process_scoped_memory_available()
Expand Down Expand Up @@ -711,6 +767,7 @@ def _stage_worker(
from vllm_omni.plugins import load_omni_general_plugins

load_omni_general_plugins()
_suppress_repo_utils_errors_for_local_path(model)
# IMPORTANT: Ensure vLLM's internal multiprocessing workers (e.g., GPUARWorker /
# GPUARModelRunner) are spawned with a fork-safe method.
# Mooncake / gRPC / RDMA and CUDA/NCCL can deadlock under fork-with-threads.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setLevel(CRITICAL) suppresses all ERROR logs from transformers — that's pretty broad. Can you target just the specific message instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — updated in 9ebc167. Replaced setLevel(CRITICAL) with a logging.Filter that only suppresses the known-harmless "Error retrieving file list" and "Error retrieving safetensors" messages. All other ERROR logs (including retry failures) now pass through normally.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good now — the logging.Filter approach is much better than setLevel(CRITICAL). Only suppresses the known-harmless messages while keeping real errors visible. Thanks for the quick fix.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review!

Expand Down Expand Up @@ -1122,6 +1179,7 @@ async def _stage_worker_async(
from vllm_omni.plugins import load_omni_general_plugins

load_omni_general_plugins()
_suppress_repo_utils_errors_for_local_path(model)
# IMPORTANT: Ensure vLLM's internal multiprocessing workers (e.g., GPUARWorker /
# GPUARModelRunner) are spawned with a fork-safe method.
if _os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn":
Expand Down
Loading