Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions python/sglang/srt/managers/tp_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import annotations

import logging
import os
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional

Expand Down Expand Up @@ -56,6 +57,38 @@
logger = logging.getLogger(__name__)


def safe_load():
    """Best-effort process hardening for model loading in restricted
    environments (e.g. containers without ``SYS_PTRACE``).

    Applies three independent tweaks; each is tolerated to fail so the
    function never raises:

    1. Use the ``file_system`` tensor-sharing strategy (avoids exhausting
       file descriptors when many tensors are shared across processes).
    2. Force the ``fork`` start method so worker processes inherit state.
    3. Relax the Yama ptrace scope via ``prctl(PR_SET_PTRACER, ANY)`` so
       other processes may grab our file descriptors (needed by
       ``pidfd_getfd``-based CUDA IPC in some sandboxes).
    """
    import ctypes

    import torch.multiprocessing as mp

    # 1) Tensor-sharing strategy. Kept in its own try-block so a failure
    # here is not conflated with start-method failures.
    try:
        mp.set_sharing_strategy("file_system")
    except Exception:
        # Strategy may be unsupported on this platform; keep the default.
        pass

    # 2) Start method: RuntimeError is raised when the multiprocessing
    # context is already fixed and cannot be overridden.
    try:
        mp.set_start_method("fork", force=True)
    except RuntimeError:
        pass

    # 3) PR_SET_PTRACER with PR_SET_PTRACER_ANY allows any process to
    # ptrace (and thus pidfd_getfd from) this one under the Yama LSM.
    try:
        libc = ctypes.CDLL("libc.so.6", use_errno=True)
        PR_SET_PTRACER = 0x59616D61
        PR_SET_PTRACER_ANY = -1
        libc.prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0)
    except Exception:
        # Non-glibc platform (e.g. macOS/Windows) or prctl unavailable.
        pass


# Opt-in via the SAFE_LOAD environment variable: apply the workarounds at
# import time so they take effect before any worker process is spawned.
if os.environ.get("SAFE_LOAD", "0") == "1":
    safe_load()


class BaseTpWorker(ABC):
@abstractmethod
def forward_batch_generation(self, forward_batch: ForwardBatch):
Expand Down
45 changes: 44 additions & 1 deletion python/sglang/srt/utils/patch_torch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import torch
from packaging import version
from torch.multiprocessing import reductions
from torch.storage import UntypedStorage

from sglang.srt.utils.common import is_npu

Expand All @@ -32,11 +33,15 @@ def monkey_patch_torch_reductions():
if hasattr(reductions, "_reduce_tensor_original"):
return

UntypedStorage._new_shared_cuda_original = UntypedStorage._new_shared_cuda

reductions._reduce_tensor_original = reductions.reduce_tensor
reductions._rebuild_cuda_tensor_original = reductions.rebuild_cuda_tensor

UntypedStorage._new_shared_cuda = _new_shared_cuda_safe

reductions.reduce_tensor = _reduce_tensor_modified
reductions.rebuild_cuda_tensor = _rebuild_cuda_tensor_modified
reductions._rebuild_cuda_tensor = _rebuild_cuda_tensor_safe

reductions.init_reductions()

Expand All @@ -59,6 +64,44 @@ def _rebuild_cuda_tensor_modified(*args):
return reductions._rebuild_cuda_tensor_original(*args)


# NOTE (yiakwy)
def _new_shared_cuda_safe(*args, **kwargs):
    """
    Monkey-patch for torch.UntypedStorage._new_shared_cuda to avoid pidfd_getfd
    failures in Docker / restricted environments.

    Instead of using pidfd, fallback to CPU tensor and move to GPU manually later.

    Raises:
        RuntimeError: if the fallback size argument cannot be determined.
    """
    try:
        # Happy path: the original CUDA shared-storage constructor saved by
        # monkey_patch_torch_reductions().
        return UntypedStorage._new_shared_cuda_original(*args, **kwargs)
    except Exception:
        # Fallback: allocate a shared CPU storage of the same size; the
        # rebuild path moves it to the GPU afterwards.
        # original impl of https://github.com/pytorch/pytorch/blob/f47dd0ddef1359e5b43e4b962412f67b30ecde56/torch/csrc/StorageSharing.cpp#L423
        # assumes args[2] carries the byte size -- TODO confirm against the
        # positional layout of _new_shared_cuda on this torch version.
        size_bytes = args[2] if len(args) > 2 else None
        if not isinstance(size_bytes, int):
            raise RuntimeError("Cannot determine size for safe CUDA storage fallback")

        return torch.UntypedStorage._new_shared(size_bytes)


# NOTE (yiakwy)
def _rebuild_cuda_tensor_safe(*args):
    # First try the normal (UUID-aware) rebuild path; it raises RuntimeError
    # in environments where CUDA IPC fd transfer fails (e.g. pidfd_getfd
    # blocked inside containers).
    try:
        return _rebuild_cuda_tensor_modified(*args)
    except RuntimeError:
        # Map the device-UUID argument back to a local device index so the
        # storage can be materialized on this process's GPU.
        args = _modify_tuple(
            args, _REDUCE_TENSOR_ARG_DEVICE_INDEX, _device_from_maybe_uuid
        )

        # assumes args[1] is the (possibly CPU-fallback) storage and args[2]
        # is the remaining rebuild metadata tuple -- TODO confirm against the
        # matching reduce_tensor serialization order.
        storage, metadata = args[1], args[2]

        # A CPU storage here means _new_shared_cuda_safe took its fallback
        # path; move it onto the GPU before rebuilding the tensor around it.
        if storage.device.type != "cuda":
            storage = storage.cuda()

        # NOTE(review): torch._utils._rebuild_tensor_v2 is a private API;
        # presumably *metadata matches its positional signature -- verify on
        # torch version upgrades.
        tensor = torch._utils._rebuild_tensor_v2(storage, *metadata)
        return tensor


def _device_to_uuid(device: int) -> str:
    """Return the hardware UUID string of the CUDA device at index *device*."""
    properties = torch.cuda.get_device_properties(device)
    return str(properties.uuid)

Expand Down
Loading