diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index ec215d8e525b..98c2d9540e15 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -157,10 +157,13 @@ def _init_executor(self) -> None:
         global_start_rank = (
             self.local_world_size * self.parallel_config.node_rank_within_dp
         )
-        # Keep track of socket file descriptors that are inherited by the
-        # worker when using fork, so that we can close them in subsequent
+        # When using fork, keep track of socket file descriptors that are
+        # inherited by the worker, so that we can close them in subsequent
         # workers
-        inherited_fds: list[int] = []
+        inherited_fds: list[int] | None = (
+            [] if context.get_start_method() == "fork" else None
+        )
+
         for local_rank in range(self.local_world_size):
             global_rank = global_start_rank + local_rank
             is_driver_worker = self._is_driver_worker(global_rank)
@@ -175,13 +178,9 @@ def _init_executor(self) -> None:
                 inherited_fds=inherited_fds,
             )
             unready_workers.append(unready_worker_handle)
-            if context.get_start_method() == "fork":
-                inherited_fds.extend(
-                    [
-                        unready_worker_handle.death_writer.fileno(),
-                        unready_worker_handle.ready_pipe.fileno(),
-                    ]
-                )
+            if inherited_fds is not None:
+                inherited_fds.append(unready_worker_handle.death_writer.fileno())
+                inherited_fds.append(unready_worker_handle.ready_pipe.fileno())
 
         # Workers must be created before wait_for_ready to avoid
         # deadlock, since worker.init_device() does a device sync.
@@ -453,12 +452,13 @@ def shutdown(self):
                     w.worker_response_mq.shutdown()
                 w.worker_response_mq = None
 
-        if self.rpc_broadcast_mq is not None:
-            self.rpc_broadcast_mq.shutdown()
+        if rpc_broadcast_mq := getattr(self, "rpc_broadcast_mq", None):
+            rpc_broadcast_mq.shutdown()
         self.rpc_broadcast_mq = None
-        for mq in self.response_mqs:
-            mq.shutdown()
-        self.response_mqs = []
+        if response_mqs := getattr(self, "response_mqs", None):
+            for mq in response_mqs:
+                mq.shutdown()
+            self.response_mqs = []
 
     def check_health(self) -> None:
         self.collective_rpc("check_health", timeout=10)
@@ -631,13 +631,16 @@ def make_worker_process(
         input_shm_handle,  # Receive SchedulerOutput
         shared_worker_lock: LockType,
         is_driver_worker: bool,
-        inherited_fds: list[int],
+        inherited_fds: list[int] | None = None,
     ) -> UnreadyWorkerProcHandle:
         context = get_mp_context()
         # Ready pipe to communicate readiness from child to parent
         ready_reader, ready_writer = context.Pipe(duplex=False)
         # Death pipe to let child detect parent process exit
         death_reader, death_writer = context.Pipe(duplex=False)
+        if inherited_fds is not None:
+            inherited_fds = inherited_fds.copy()
+            inherited_fds.extend((ready_reader.fileno(), death_writer.fileno()))
         process_kwargs = {
             "vllm_config": vllm_config,
             "local_rank": local_rank,
@@ -649,8 +652,7 @@ def make_worker_process(
             "shared_worker_lock": shared_worker_lock,
             "is_driver_worker": is_driver_worker,
             # Have the worker close parent end of this worker's pipes too
-            "inherited_fds": inherited_fds
-            + [ready_reader.fileno(), death_writer.fileno()],
+            "inherited_fds": inherited_fds if inherited_fds is not None else [],
         }
         # Run EngineCore busy loop in background process.
         proc = context.Process(
@@ -694,9 +696,8 @@ def wait_for_ready(
         unready_proc_handles: list[UnreadyWorkerProcHandle],
     ) -> list[WorkerProcHandle]:
         e = Exception(
-            "WorkerProc initialization failed due to "
-            "an exception in a background process. "
-            "See stack trace for root cause."
+            "WorkerProc initialization failed due to an exception in a "
+            "background process. See stack trace for root cause."
         )
 
         pipes = {handle.ready_pipe: handle for handle in unready_proc_handles}
@@ -799,7 +800,7 @@ def signal_handler(signum, frame):
             try:
                 os.close(fd)
             except Exception as e:
-                logger.warning("Exception closing inherited connection: %s", e)
+                logger.warning("Error closing inherited connection: %s: %s", type(e), e)
 
         try:
             # Initialize tracer