From 69d235cc3b0c152e5a037d0296f32bc056857fa1 Mon Sep 17 00:00:00 2001 From: Lidang-Jiang Date: Wed, 4 Mar 2026 20:11:46 +0800 Subject: [PATCH 1/3] [Bugfix] Suppress harmless repo_utils ERROR in stage workers for local model paths Only suppress repo_utils logger when model is a local path (os.path.exists). For HF repo IDs, keep logging enabled so real errors (auth failures, network issues) remain visible. Signed-off-by: Lidang-Jiang --- vllm_omni/entrypoints/omni_stage.py | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 098cfa15d88..a0fa9645775 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -711,6 +711,24 @@ def _stage_worker( from vllm_omni.plugins import load_omni_general_plugins load_omni_general_plugins() + # Suppress only known-harmless repo_utils ERROR messages for local model + # paths; keep the logger level intact so real errors are still visible. + if _os.path.exists(model): + import logging as _logging + + class _RepoUtilsLocalPathFilter(_logging.Filter): + """Filter out noisy HF Hub errors that always fire for local paths.""" + + _SUPPRESSED_FRAGMENTS = ( + "Error retrieving file list", + "Error retrieving safetensors", + ) + + def filter(self, record: _logging.LogRecord) -> bool: + msg = record.getMessage() + return not any(f in msg for f in self._SUPPRESSED_FRAGMENTS) + + _logging.getLogger("vllm.transformers_utils.repo_utils").addFilter(_RepoUtilsLocalPathFilter()) # IMPORTANT: Ensure vLLM's internal multiprocessing workers (e.g., GPUARWorker / # GPUARModelRunner) are spawned with a fork-safe method. # Mooncake / gRPC / RDMA and CUDA/NCCL can deadlock under fork-with-threads. @@ -1122,6 +1140,24 @@ async def _stage_worker_async( from vllm_omni.plugins import load_omni_general_plugins load_omni_general_plugins() + # Suppress only known-harmless repo_utils ERROR messages for local model + # paths; keep the logger level intact so real errors are still visible. + if _os.path.exists(model): + import logging as _logging + + class _RepoUtilsLocalPathFilter(_logging.Filter): + """Filter out noisy HF Hub errors that always fire for local paths.""" + + _SUPPRESSED_FRAGMENTS = ( + "Error retrieving file list", + "Error retrieving safetensors", + ) + + def filter(self, record: _logging.LogRecord) -> bool: + msg = record.getMessage() + return not any(f in msg for f in self._SUPPRESSED_FRAGMENTS) + + _logging.getLogger("vllm.transformers_utils.repo_utils").addFilter(_RepoUtilsLocalPathFilter()) # IMPORTANT: Ensure vLLM's internal multiprocessing workers (e.g., GPUARWorker / # GPUARModelRunner) are spawned with a fork-safe method. if _os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn": From 725e41ef9184bad38d02e2d7b95f64918a6af2b3 Mon Sep 17 00:00:00 2001 From: Lidang-Jiang Date: Thu, 5 Mar 2026 16:15:22 +0800 Subject: [PATCH 2/3] [Bugfix] Add global file lock to prevent EADDRINUSE in concurrent stage init When multiple stages initialize concurrently on different GPUs, their get_open_port() calls can race (TOCTOU) and return the same port, causing EADDRINUSE errors. Add a global file lock that serializes engine initialization across all stages before the existing per-device lock. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Lidang-Jiang --- vllm_omni/entrypoints/omni_stage.py | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index a0fa9645775..5d5469707de 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -70,7 +70,43 @@ def _sequential_init_lock(engine_args: dict[str, Any], stage_init_timeout: int = If process-scoped memory tracking is available (NVML works), stages can safely initialize concurrently — each measures only its own GPU memory. Otherwise, fall back to file-based locks to serialize initialization. + + A global file lock is always acquired first to serialize get_open_port() + calls across all stages, preventing EADDRINUSE TOCTOU races. """ + # Global lock to serialize engine initialization across all stages. + # This prevents TOCTOU races where concurrent get_open_port() calls + # in different stages (using different devices) return the same port. + _GLOBAL_INIT_LOCK_PATH = "/tmp/vllm_omni_engine_init.lock" + global_lock_fd = os.open(_GLOBAL_INIT_LOCK_PATH, os.O_CREAT | os.O_RDWR, 0o644) + try: + logger.info("Waiting for global engine init lock (%s)...", _GLOBAL_INIT_LOCK_PATH) + fcntl.flock(global_lock_fd, fcntl.LOCK_EX) + logger.info("Acquired global engine init lock") + except OSError as e: + logger.warning("Failed to acquire global init lock: %s, proceeding anyway", e) + try: + os.close(global_lock_fd) + except OSError: + pass + global_lock_fd = -1 + + try: + with _sequential_init_lock_inner(engine_args, stage_init_timeout): + yield + finally: + if global_lock_fd >= 0: + try: + fcntl.flock(global_lock_fd, fcntl.LOCK_UN) + os.close(global_lock_fd) + logger.info("Released global engine init lock") + except OSError: + pass + + +@contextmanager +def _sequential_init_lock_inner(engine_args: dict[str, Any], stage_init_timeout: int = 300): + """Inner logic for sequential init locks (per-device granularity).""" from vllm_omni.worker.gpu_memory_utils import is_process_scoped_memory_available nvml_available = is_process_scoped_memory_available() From f83a622353e0747b3bf889d29b125a6fcfa7d48e Mon Sep 17 00:00:00 2001 From: Lidang-Jiang Date: Thu, 12 Mar 2026 21:23:01 +0800 Subject: [PATCH 3/3] [Bugfix] Deduplicate _RepoUtilsLocalPathFilter to module level Move the duplicated _RepoUtilsLocalPathFilter class and guard logic from both _stage_worker() and _stage_worker_async() to a single module-level definition with a _suppress_repo_utils_errors_for_local_path() helper, replacing both inline definitions with a single call. Signed-off-by: Lidang-Jiang --- vllm_omni/entrypoints/omni_stage.py | 58 +++++++++++------------------ 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/vllm_omni/entrypoints/omni_stage.py b/vllm_omni/entrypoints/omni_stage.py index 5d5469707de..60db196c161 100644 --- a/vllm_omni/entrypoints/omni_stage.py +++ b/vllm_omni/entrypoints/omni_stage.py @@ -9,6 +9,7 @@ import asyncio import fcntl import importlib +import logging import multiprocessing as mp import os import queue @@ -63,6 +64,25 @@ logger = init_logger(__name__) +class _RepoUtilsLocalPathFilter(logging.Filter): + """Filter out noisy HF Hub errors that always fire for local paths.""" + + _SUPPRESSED_FRAGMENTS = ( + "Error retrieving file list", + "Error retrieving safetensors", + ) + + def filter(self, record: logging.LogRecord) -> bool: + msg = record.getMessage() + return not any(f in msg for f in self._SUPPRESSED_FRAGMENTS) + + +def _suppress_repo_utils_errors_for_local_path(model: str) -> None: + """Install a filter to suppress harmless HF Hub errors for local model paths.""" + if os.path.exists(model): + logging.getLogger("vllm.transformers_utils.repo_utils").addFilter(_RepoUtilsLocalPathFilter()) + + @contextmanager def _sequential_init_lock(engine_args: dict[str, Any], stage_init_timeout: int = 300): """Acquire device locks for sequential init if NVML is unavailable. @@ -747,24 +767,7 @@ def _stage_worker( from vllm_omni.plugins import load_omni_general_plugins load_omni_general_plugins() - # Suppress only known-harmless repo_utils ERROR messages for local model - # paths; keep the logger level intact so real errors are still visible. - if _os.path.exists(model): - import logging as _logging - - class _RepoUtilsLocalPathFilter(_logging.Filter): - """Filter out noisy HF Hub errors that always fire for local paths.""" - - _SUPPRESSED_FRAGMENTS = ( - "Error retrieving file list", - "Error retrieving safetensors", - ) - - def filter(self, record: _logging.LogRecord) -> bool: - msg = record.getMessage() - return not any(f in msg for f in self._SUPPRESSED_FRAGMENTS) - - _logging.getLogger("vllm.transformers_utils.repo_utils").addFilter(_RepoUtilsLocalPathFilter()) + _suppress_repo_utils_errors_for_local_path(model) # IMPORTANT: Ensure vLLM's internal multiprocessing workers (e.g., GPUARWorker / # GPUARModelRunner) are spawned with a fork-safe method. # Mooncake / gRPC / RDMA and CUDA/NCCL can deadlock under fork-with-threads. @@ -1176,24 +1179,7 @@ async def _stage_worker_async( from vllm_omni.plugins import load_omni_general_plugins load_omni_general_plugins() - # Suppress only known-harmless repo_utils ERROR messages for local model - # paths; keep the logger level intact so real errors are still visible. - if _os.path.exists(model): - import logging as _logging - - class _RepoUtilsLocalPathFilter(_logging.Filter): - """Filter out noisy HF Hub errors that always fire for local paths.""" - - _SUPPRESSED_FRAGMENTS = ( - "Error retrieving file list", - "Error retrieving safetensors", - ) - - def filter(self, record: _logging.LogRecord) -> bool: - msg = record.getMessage() - return not any(f in msg for f in self._SUPPRESSED_FRAGMENTS) - - _logging.getLogger("vllm.transformers_utils.repo_utils").addFilter(_RepoUtilsLocalPathFilter()) + _suppress_repo_utils_errors_for_local_path(model) # IMPORTANT: Ensure vLLM's internal multiprocessing workers (e.g., GPUARWorker / # GPUARModelRunner) are spawned with a fork-safe method. if _os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn":