Skip to content

Commit

Permalink
feature(wgt): enable DI using torch-rpc to support GPU-p2p and RDMA-rpc
Browse files Browse the repository at this point in the history
1. Add torchrpc message queue.

2. Implement buffer based on CUDA-shared-tensor to optimize the data path of torchrpc.

3. Add 'bypass_eventloop' arg in Task() and Parallel().

4. Add thread lock in distributer.py to prevent sender and receiver competition.

5. Add message queue perf test for torchrpc, nccl, nng, shm

6. Add comm_perf_helper.py to make program timing more convenient.

7. Modified the subscribe() of class MQ, adding 'fn' parameter and 'is_once' parameter.

8. Add new DummyLock and ConditionLock type in lock_helper.py

9. Add message queues perf test.

10. Introduced a new self-hosted runner to execute cuda, multiprocess, torchrpc related tests.
  • Loading branch information
SolenoidWGT committed Jan 12, 2023
1 parent dfae2cc commit 5cfc2fb
Show file tree
Hide file tree
Showing 33 changed files with 2,350 additions and 123 deletions.
83 changes: 75 additions & 8 deletions .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ jobs:
if: "!contains(github.event.head_commit.message, 'ci skip')"
strategy:
matrix:
python-version: [3.7, 3.8, 3.9]

python-version: ["3.7", "3.8", "3.9"]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- name: do_unittest
Expand All @@ -41,12 +40,13 @@ jobs:
if: "!contains(github.event.head_commit.message, 'ci skip')"
strategy:
matrix:
python-version: [3.7, 3.8, 3.9]

python-version: ["3.7", "3.8", "3.9"]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v3
env:
AGENT_TOOLSDIRECTORY: /opt/hostedtoolcache
with:
python-version: ${{ matrix.python-version }}
- name: do_benchmark
Expand All @@ -55,3 +55,70 @@ jobs:
python -m pip install ".[test,k8s]"
./ding/scripts/install-k8s-tools.sh
make benchmark
test_multiprocess:
runs-on: self-hosted
if: "!contains(github.event.head_commit.message, 'ci skip')"
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- name: do_multiprocesstest
timeout-minutes: 40
run: |
python -m pip install box2d-py
python -m pip install .
python -m pip install ".[test,k8s]"
./ding/scripts/install-k8s-tools.sh
make multiprocesstest
test_cuda:
runs-on: self-hosted
if: "!contains(github.event.head_commit.message, 'ci skip')"
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
env:
AGENT_TOOLSDIRECTORY: /opt/hostedtoolcache
with:
python-version: ${{ matrix.python-version }}
- name: do_unittest
timeout-minutes: 40
run: |
python -m pip install torch==1.12.1+cu113 --extra-index-url https://download.pytorch.org/whl/cu113
python -m pip install box2d-py
python -m pip install .
python -m pip install ".[test,k8s]"
./ding/scripts/install-k8s-tools.sh
make cudatest
test_mq_benchmark:
runs-on: self-hosted
if: "!contains(github.event.head_commit.message, 'ci skip')"
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
env:
AGENT_TOOLSDIRECTORY: /opt/hostedtoolcache
with:
python-version: ${{ matrix.python-version }}
- name: do_mqbenchmark
run: |
python -m pip install torch==1.12.1+cu113 --extra-index-url https://download.pytorch.org/whl/cu113
python -m pip install .
python -m pip install ".[test,k8s]"
./ding/scripts/install-k8s-tools.sh
make mqbenchmark
16 changes: 15 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,25 @@ benchmark:
--durations=0 \
-sv -m benchmark

multiprocesstest:
pytest ${TEST_DIR} \
--cov-report=xml \
--cov-report term-missing \
--cov=${COV_DIR} \
${DURATIONS_COMMAND} \
${WORKERS_COMMAND} \
-sv -m multiprocesstest

mqbenchmark:
pytest ${TEST_DIR} \
--durations=0 \
-sv -m mqbenchmark

test: unittest # just for compatibility, can be changed later

cpu_test: unittest algotest benchmark

all_test: unittest algotest cudatest benchmark
all_test: unittest algotest cudatest benchmark multiprocesstest

format:
yapf --in-place --recursive -p --verbose --style .style.yapf ${FORMAT_DIR}
Expand Down
7 changes: 7 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ coverage:
target: auto
threshold: 0.5%
if_ci_failed: success #success, failure, error, ignore

# fix me
# The unittests of the torchrpc module are tested by different runners and cannot be included
# in the test_unittest's coverage report. To keep CI happy, we don't count torchrpc related coverage.
ignore:
- /mnt/cache/wangguoteng/DI-engine/ding/framework/message_queue/torch_rpc.py
- /mnt/cache/wangguoteng/DI-engine/ding/framework/message_queue/perfs/*
4 changes: 4 additions & 0 deletions ding/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ def torch_ge_131():

def torch_ge_180():
return int("".join(list(filter(str.isdigit, torch.__version__)))) >= 180


def torch_ge_1121():
return int("".join(list(filter(str.isdigit, torch.__version__)))) >= 1121
134 changes: 130 additions & 4 deletions ding/data/shm_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import ctypes
import numpy as np
import torch
import torch.multiprocessing as mp
from functools import reduce
from ditk import logging
from abc import abstractmethod

_NTYPE_TO_CTYPE = {
np.bool_: ctypes.c_bool,
Expand All @@ -18,8 +22,37 @@
np.float64: ctypes.c_double,
}

# uint16, uint32, uint32
_NTYPE_TO_TTYPE = {
np.bool_: torch.bool,
np.uint8: torch.uint8,
# np.uint16: torch.int16,
# np.uint32: torch.int32,
# np.uint64: torch.int64,
np.int8: torch.uint8,
np.int16: torch.int16,
np.int32: torch.int32,
np.int64: torch.int64,
np.float32: torch.float32,
np.float64: torch.float64,
}

_NOT_SUPPORT_NTYPE = {np.uint16: torch.int16, np.uint32: torch.int32, np.uint64: torch.int64}
_CONVERSION_TYPE = {np.uint16: np.int16, np.uint32: np.int32, np.uint64: np.int64}


class ShmBufferBase:

@abstractmethod
def fill(self, src_arr: Union[np.ndarray, torch.Tensor]) -> None:
raise NotImplementedError

class ShmBuffer():
@abstractmethod
def get(self) -> Union[np.ndarray, torch.Tensor]:
raise NotImplementedError


class ShmBuffer(ShmBufferBase):
"""
Overview:
Shared memory buffer to store numpy array.
Expand Down Expand Up @@ -78,6 +111,94 @@ def get(self) -> np.ndarray:
return data


class ShmBufferCuda(ShmBufferBase):

def __init__(
self,
dtype: Union[torch.dtype, np.dtype],
shape: Tuple[int],
ctype: Optional[type] = None,
copy_on_get: bool = True,
device: Optional[torch.device] = torch.device('cuda:0')
) -> None:
"""
Overview:
Use torch.multiprocessing for shared tensor or ndaray between processes.
Arguments:
- dtype (Union[torch.dtype, np.dtype]): dtype of torch.tensor or numpy.ndarray.
- shape (Tuple[int]): Shape of torch.tensor or numpy.ndarray.
- ctype (type): Origin class type, e.g. np.ndarray, torch.Tensor.
- copy_on_get (bool, optional): Can be set to False only if the shared object
is a tenor, otherwise True.
- device (Optional[torch.device], optional): The GPU device where cuda-shared-tensor
is located, the default is cuda:0.
Raises:
RuntimeError: Unsupported share type by ShmBufferCuda.
"""
if isinstance(dtype, np.dtype): # it is type of gym.spaces.dtype
self.ctype = np.ndarray
dtype = dtype.type
if dtype in _NOT_SUPPORT_NTYPE.keys():
logging.warning(
"Torch tensor unsupport numpy type {}, attempt to do a type conversion, which may lose precision.".
format(dtype)
)
ttype = _NOT_SUPPORT_NTYPE[dtype]
self.dtype = _CONVERSION_TYPE[dtype]
else:
ttype = _NTYPE_TO_TTYPE[dtype]
self.dtype = dtype
elif isinstance(dtype, torch.dtype):
self.ctype = torch.Tensor
ttype = dtype
else:
raise RuntimeError("The dtype parameter only supports torch.dtype and np.dtype")

self.copy_on_get = copy_on_get
self.shape = shape
self.device = device
# We don't want the buffer to be involved in the computational graph
with torch.no_grad():
self.buffer = torch.zeros(reduce(lambda x, y: x * y, shape), dtype=ttype, device=self.device)

def fill(self, src_arr: Union[np.ndarray, torch.Tensor]) -> None:
if self.ctype is np.ndarray:
if src_arr.dtype.type != self.dtype:
logging.warning(
"Torch tensor unsupport numpy type {}, attempt to do a type conversion, which may lose precision.".
format(self.dtype)
)
src_arr = src_arr.astype(self.dtype)
tensor = torch.from_numpy(src_arr)
elif self.ctype is torch.Tensor:
tensor = src_arr
else:
raise RuntimeError("Unsopport CUDA-shared-tensor input type:\"{}\"".format(type(src_arr)))

# If the GPU-a and GPU-b are connected using nvlink, the copy is very fast.
with torch.no_grad():
self.buffer.copy_(tensor.view(tensor.numel()))

def get(self) -> Union[np.ndarray, torch.Tensor]:
with torch.no_grad():
if self.ctype is np.ndarray:
# Because ShmBufferCuda use CUDA memory exchanging data between processes.
# So copy_on_get is necessary for numpy arrays.
re = self.buffer.cpu()
re = re.detach().view(self.shape).numpy()
else:
if self.copy_on_get:
re = self.buffer.clone().detach().view(self.shape)
else:
re = self.buffer.view(self.shape)

return re

def __del__(self):
del self.buffer


class ShmBufferContainer(object):
"""
Overview:
Expand All @@ -88,7 +209,8 @@ def __init__(
self,
dtype: Union[Dict[Any, type], type, np.dtype],
shape: Union[Dict[Any, tuple], tuple],
copy_on_get: bool = True
copy_on_get: bool = True,
is_cuda_buffer: bool = False
) -> None:
"""
Overview:
Expand All @@ -98,11 +220,15 @@ def __init__(
- shape (:obj:`Union[Dict[Any, tuple], tuple]`): If `Dict[Any, tuple]`, use a dict to manage \
multiple buffers; If `tuple`, use single buffer.
- copy_on_get (:obj:`bool`): Whether to copy data when calling get method.
- is_cuda_buffer (:obj:`bool`): Whether to use pytorch CUDA shared tensor as the implementation of shm.
"""
if isinstance(shape, dict):
self._data = {k: ShmBufferContainer(dtype[k], v, copy_on_get) for k, v in shape.items()}
self._data = {k: ShmBufferContainer(dtype[k], v, copy_on_get, is_cuda_buffer) for k, v in shape.items()}
elif isinstance(shape, (tuple, list)):
self._data = ShmBuffer(dtype, shape, copy_on_get)
if not is_cuda_buffer:
self._data = ShmBuffer(dtype, shape, copy_on_get)
else:
self._data = ShmBufferCuda(dtype, shape, copy_on_get)
else:
raise RuntimeError("not support shape: {}".format(shape))
self._shape = shape
Expand Down
Loading

0 comments on commit 5cfc2fb

Please sign in to comment.