Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions python/sglang/srt/managers/mm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,19 @@ def wrap_shm_features(obj):
return obj


def has_shm_features(recv_reqs):
    """Return True if any request in *recv_reqs* carries ShmPointerMMData.

    Requests exposing a ``batch`` attribute are treated as containers and
    searched recursively; otherwise each request's
    ``mm_inputs["mm_items"]`` entries (if any) are inspected for a
    ``feature`` of type ``ShmPointerMMData``.
    """

    def _request_has_shm(req):
        # Batched request: delegate to a recursive scan of its children.
        if hasattr(req, "batch"):
            return has_shm_features(req.batch)
        # Plain request: look through its multimodal items, if present.
        if hasattr(req, "mm_inputs") and req.mm_inputs:
            return any(
                isinstance(item.feature, ShmPointerMMData)
                for item in req.mm_inputs.get("mm_items", [])
            )
        return False

    return any(_request_has_shm(req) for req in recv_reqs)


def unwrap_shm_features(obj):
"""
Restore ShmPointerMMData wrappers back into standard torch.Tensors.
Expand Down
27 changes: 26 additions & 1 deletion python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@
UpdateWeightsFromIPCReqInput,
UpdateWeightsFromTensorReqInput,
)
from sglang.srt.managers.mm_utils import init_mm_embedding_cache, unwrap_shm_features
from sglang.srt.managers.mm_utils import (
has_shm_features,
init_mm_embedding_cache,
unwrap_shm_features,
)
from sglang.srt.managers.multimodal_processor import get_mm_processor, import_processors
from sglang.srt.managers.overlap_utils import FutureMap
from sglang.srt.managers.prefill_delayer import (
Expand Down Expand Up @@ -1501,6 +1505,27 @@ def recv_requests(
# so that ShmPointerMMData metadata (not full tensor data) is what
# gets serialized during broadcast_pyobj.
if recv_reqs:
# Barrier for the non-DP-attention path only: there is a single
# broadcast_pyobj on tp_cpu_group where the source rank returns
# the original objects immediately while other ranks are still in
# pickle.loads (-> __setstate__ -> shm_open). Without a barrier
# the source can call materialize() / shm_unlink before others
# open the segment. recv_reqs is consistent across all ranks
# here (same broadcast), so the guard is deadlock-free.
#
# Under DP-attention no barrier is needed: the control_reqs
# broadcast on tp_cpu_group (step 3) is a collective that forces
# every rank to complete the earlier attn_tp / attn_cp work_reqs
# deserializations (steps 1-2, which call shm_open) before any
# rank returns from step 3. POSIX guarantees shm_unlink only
# removes the name; already-open handles stay valid.
if (
not self.server_args.enable_dp_attention
and self.tp_size > 1
and self.model_config.is_multimodal
and has_shm_features(recv_reqs)
):
barrier(group=self.tp_cpu_group)
Comment on lines +1508 to +1528
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To prevent regressions of this race condition, it would be beneficial to add a test case that specifically covers the failing scenario: a VLM model with tp_size > 1 and enable_dp_attention=False. This could be based on TestDPAttentionDP2TP2VLM with adjusted server launch arguments.

for req in recv_reqs:
unwrap_shm_features(req)

Expand Down
Loading