diff --git a/python/sglang/srt/managers/mm_utils.py b/python/sglang/srt/managers/mm_utils.py
index 514dfba47c06..f743aaf9c86c 100644
--- a/python/sglang/srt/managers/mm_utils.py
+++ b/python/sglang/srt/managers/mm_utils.py
@@ -1725,6 +1725,19 @@ def wrap_shm_features(obj):
     return obj
 
 
+def has_shm_features(recv_reqs):
+    """Return True if any request in the list contains ShmPointerMMData."""
+    for req in recv_reqs:
+        if hasattr(req, "batch"):
+            if has_shm_features(req.batch):
+                return True
+        elif hasattr(req, "mm_inputs") and req.mm_inputs:
+            for item in req.mm_inputs.get("mm_items", []):
+                if isinstance(item.feature, ShmPointerMMData):
+                    return True
+    return False
+
+
 def unwrap_shm_features(obj):
     """
     Restore ShmPointerMMData wrappers back into standard torch.Tensors.
diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py
index 7092023457c3..f35d5a7ca829 100644
--- a/python/sglang/srt/managers/scheduler.py
+++ b/python/sglang/srt/managers/scheduler.py
@@ -142,7 +142,11 @@
     UpdateWeightsFromIPCReqInput,
     UpdateWeightsFromTensorReqInput,
 )
-from sglang.srt.managers.mm_utils import init_mm_embedding_cache, unwrap_shm_features
+from sglang.srt.managers.mm_utils import (
+    has_shm_features,
+    init_mm_embedding_cache,
+    unwrap_shm_features,
+)
 from sglang.srt.managers.multimodal_processor import get_mm_processor, import_processors
 from sglang.srt.managers.overlap_utils import FutureMap
 from sglang.srt.managers.prefill_delayer import (
@@ -1501,6 +1505,27 @@ def recv_requests(
         # so that ShmPointerMMData metadata (not full tensor data) is what
         # gets serialized during broadcast_pyobj.
         if recv_reqs:
+            # Barrier for the non-DP-attention path only: there is a single
+            # broadcast_pyobj on tp_cpu_group where the source rank returns
+            # the original objects immediately while other ranks are still in
+            # pickle.loads (-> __setstate__ -> shm_open). Without a barrier
+            # the source can call materialize() / shm_unlink before others
+            # open the segment. recv_reqs is consistent across all ranks
+            # here (same broadcast), so the guard is deadlock-free.
+            #
+            # Under DP-attention no barrier is needed: the control_reqs
+            # broadcast on tp_cpu_group (step 3) is a collective that forces
+            # every rank to complete the earlier attn_tp / attn_cp work_reqs
+            # deserializations (steps 1-2, which call shm_open) before any
+            # rank returns from step 3. POSIX guarantees shm_unlink only
+            # removes the name; already-open handles stay valid.
+            if (
+                not self.server_args.enable_dp_attention
+                and self.tp_size > 1
+                and self.model_config.is_multimodal
+                and has_shm_features(recv_reqs)
+            ):
+                barrier(group=self.tp_cpu_group)
             for req in recv_reqs:
                 unwrap_shm_features(req)
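
The comment block in the scheduler hunk compresses a subtle ordering argument. Below is a standalone sketch, not part of the patch, that reproduces the open-before-unlink discipline with the standard library's `multiprocessing.shared_memory` standing in for `ShmPointerMMData`. The process names, segment name, and barrier placement are illustrative assumptions; the POSIX behavior it demonstrates (unlink removes only the name, already-open handles stay valid) is the one the patch relies on.

```python
# Hypothetical repro, not sglang code: "source" plays the broadcast source
# rank, "receiver" plays a rank still inside
# pickle.loads -> __setstate__ -> shm_open.
from multiprocessing import Barrier, Process, shared_memory


def source(name, created, opened):
    shm = shared_memory.SharedMemory(name=name, create=True, size=16)
    shm.buf[:4] = b"data"
    created.wait()  # segment exists; receivers may now open it
    opened.wait()   # the fix: do not unlink until every receiver has opened
    shm.close()
    shm.unlink()    # POSIX: removes only the name; open handles stay valid


def receiver(name, created, opened):
    created.wait()
    shm = shared_memory.SharedMemory(name=name)  # analogue of shm_open
    opened.wait()   # analogue of barrier(group=self.tp_cpu_group)
    assert bytes(shm.buf[:4]) == b"data"  # readable even if unlink already ran
    shm.close()


if __name__ == "__main__":
    created, opened = Barrier(2), Barrier(2)
    procs = [
        Process(target=source, args=("demo_seg", created, opened)),
        Process(target=receiver, args=("demo_seg", created, opened)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

Removing `opened.wait()` from `source` recreates the bug the patch fixes: the source may unlink the name before the receiver attaches, and the receiver's open then fails with `FileNotFoundError`.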
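The "deadlock-free" claim in the comment is worth spelling out: a collective such as `barrier` must be entered by every rank of the group, so it may only be guarded by a predicate that evaluates identically on all ranks. That holds here because every rank derives `recv_reqs` from the same `broadcast_pyobj`. A minimal two-rank sketch of the rule, assuming a gloo group; the port and worker function are made up for the demo.

```python
# Hypothetical two-rank demo, not sglang code: guarding a collective on
# broadcast-consistent data is safe; guarding on rank-local data can hang.
import os

import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank, world_size):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29571"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # Stand-in for recv_reqs after broadcast_pyobj: identical on every rank.
    recv_reqs = ["mm_request"]
    if recv_reqs:  # same truth value on all ranks -> all enter, none hang
        dist.barrier()

    # By contrast, `if rank == 0: dist.barrier()` would hang rank 0 forever,
    # because rank 1 never joins the collective.
    dist.destroy_process_group()


if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)
```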