diff --git a/python/sglang/srt/managers/tp_worker.py b/python/sglang/srt/managers/tp_worker.py
index b4f18d84dad2..dcdfd611cdae 100644
--- a/python/sglang/srt/managers/tp_worker.py
+++ b/python/sglang/srt/managers/tp_worker.py
@@ -15,6 +15,7 @@ from __future__ import annotations
 
 import logging
+import os
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING, Optional
 
@@ -56,6 +57,45 @@
 logger = logging.getLogger(__name__)
 
 
+def safe_load():
+    """Best-effort multiprocessing hardening for Docker / restricted hosts.
+
+    Uses file-system tensor sharing (avoids blocked fd passing such as
+    pidfd_getfd), forces the "fork" start method, and marks this process
+    ptrace-able by anyone (PR_SET_PTRACER_ANY).  All steps are optional;
+    failures are silently ignored.
+    """
+    import torch.multiprocessing as mp
+
+    # 1) Share storages via the file system instead of passing fds.
+    try:
+        mp.set_sharing_strategy("file_system")
+    except Exception:
+        pass
+
+    # 2) Use fork so children inherit parent state directly.
+    try:
+        mp.set_start_method("fork", force=True)
+    except RuntimeError:
+        pass
+
+    # 3) Allow any process to ptrace us (required by pidfd_getfd on
+    #    kernels with restricted ptrace scope).
+    import ctypes
+
+    try:
+        libc = ctypes.CDLL("libc.so.6", use_errno=True)
+        PR_SET_PTRACER = 0x59616D61
+        PR_SET_PTRACER_ANY = -1
+        libc.prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0)
+    except Exception:
+        pass
+
+
+if os.environ.get("SAFE_LOAD", "0") == "1":
+    safe_load()
+
+
 class BaseTpWorker(ABC):
     @abstractmethod
     def forward_batch_generation(self, forward_batch: ForwardBatch):
diff --git a/python/sglang/srt/utils/patch_torch.py b/python/sglang/srt/utils/patch_torch.py
index 9b4e21154e57..c472687e1eb3 100644
--- a/python/sglang/srt/utils/patch_torch.py
+++ b/python/sglang/srt/utils/patch_torch.py
@@ -16,6 +16,7 @@
 import torch
 from packaging import version
 from torch.multiprocessing import reductions
+from torch.storage import UntypedStorage
 
 from sglang.srt.utils.common import is_npu
 
@@ -32,11 +33,16 @@ def monkey_patch_torch_reductions():
     if hasattr(reductions, "_reduce_tensor_original"):
         return
 
+    # Keep the pristine implementations so the safe wrappers can delegate.
+    UntypedStorage._new_shared_cuda_original = UntypedStorage._new_shared_cuda
+
     reductions._reduce_tensor_original = reductions.reduce_tensor
     reductions._rebuild_cuda_tensor_original = reductions.rebuild_cuda_tensor
 
+    UntypedStorage._new_shared_cuda = _new_shared_cuda_safe
+
     reductions.reduce_tensor = _reduce_tensor_modified
-    reductions.rebuild_cuda_tensor = _rebuild_cuda_tensor_modified
+    reductions.rebuild_cuda_tensor = _rebuild_cuda_tensor_safe
 
     reductions.init_reductions()
@@ -59,6 +65,45 @@
     return reductions._rebuild_cuda_tensor_original(*args)
 
 
+# NOTE (yiakwy): fallback allocator for restricted environments.
+def _new_shared_cuda_safe(*args, **kwargs):
+    """Replacement for torch.UntypedStorage._new_shared_cuda.
+
+    Delegates to the original CUDA shared-storage allocator; if that fails
+    (e.g. pidfd_getfd is blocked inside Docker), falls back to a CPU shared
+    storage that callers can move to the GPU themselves.
+    """
+    try:
+        # Fast path: the stock CUDA IPC allocation.
+        return UntypedStorage._new_shared_cuda_original(*args, **kwargs)
+    except Exception:
+        # Argument layout follows pytorch torch/csrc/StorageSharing.cpp:
+        # args[2] is the byte size of the storage.
+        size_bytes = args[2]
+        if not isinstance(size_bytes, int):
+            raise RuntimeError("Cannot determine size for safe CUDA storage fallback")
+
+        return torch.UntypedStorage._new_shared(size_bytes)
+
+
+# NOTE (yiakwy): rebuild that tolerates failed CUDA IPC handles.
+def _rebuild_cuda_tensor_safe(*args):
+    try:
+        return _rebuild_cuda_tensor_modified(*args)
+    except RuntimeError:
+        # Map a UUID-encoded device back to a local index, then rebuild
+        # from the (possibly CPU-allocated) storage, moving it to the GPU
+        # when the fallback allocator above handed out CPU shared memory.
+        args = _modify_tuple(
+            args, _REDUCE_TENSOR_ARG_DEVICE_INDEX, _device_from_maybe_uuid
+        )
+
+        storage, metadata = args[1], args[2]
+
+        if storage.device.type != "cuda":
+            storage = storage.cuda()
+
+        return torch._utils._rebuild_tensor_v2(storage, *metadata)
+
+
 def _device_to_uuid(device: int) -> str:
     return str(torch.cuda.get_device_properties(device).uuid)