Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add isend irecv batch_isend_irecv reduce_scatter #4970

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/api/api_label
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ paddle.distributed.all_reduce .. _api_paddle_distributed_all_reduce:
paddle.distributed.scatter .. _api_paddle_distributed_scatter:
paddle.distributed.alltoall .. _api_paddle_distributed_alltoall:
paddle.distributed.send .. _api_paddle_distributed_send:
paddle.distributed.isend .. _api_paddle_distributed_isend:
paddle.distributed.irecv .. _api_paddle_distributed_irecv:
paddle.distributed.reduce_scatter .. _api_paddle_distributed_reduce_scatter:
paddle.distributed.QueueDataset .. _api_paddle_distributed_QueueDataset:
paddle.distributed.barrier .. _api_paddle_distributed_barrier:
paddle.distributed.CountFilterEntry .. _api_paddle_distributed_CountFilterEntry:
Expand Down
27 changes: 27 additions & 0 deletions docs/api/paddle/distributed/irecv_cn.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
.. _cn_api_paddle_distributed_irecv:

irecv
-------------------------------


.. py:function:: paddle.distributed.irecv(tensor, src=None, group=None)
异步接受发送来的tensor。

参数
:::::::::
- tensor (Tensor) - 要接受的张量。其数据类型应为 float16、float32、float64、int32 或 int64。
- src (int) - 接受节点的全局rank号。
- group (Group,可选) - new_group返回的Group实例,或者设置为None表示默认的全局组。默认值:None。


返回
:::::::::
返回Task。

注意
:::::::::
当前只支持动态图

代码示例
:::::::::
COPY-FROM: paddle.distributed.irecv
28 changes: 28 additions & 0 deletions docs/api/paddle/distributed/isend_cn.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
.. _cn_api_paddle_distributed_isend:

isend
-------------------------------


.. py:function:: paddle.distributed.isend(tensor, dst, group=None)
异步的将 ``tensor`` 发送到指定的rank进程上。

参数
:::::::::
- tensor (Tensor) - 要发送的张量。其数据类型应为 float16、float32、float64、int32 或 int64。
- dst (int) - 目标节点的全局rank号。
- group (Group,可选) - new_group返回的Group实例,或者设置为None表示默认的全局组。默认值:None。


返回
:::::::::
返回Task。


注意
:::::::::
当前只支持动态图

代码示例
:::::::::
COPY-FROM: paddle.distributed.isend
29 changes: 29 additions & 0 deletions docs/api/paddle/reduce_scatter_cn.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
.. _cn_api_paddle_distributed_reduce_scatter:

reduce_scatter
-------------------------------


.. py:function:: paddle.distributed.reduce_scatter(tensor, tensor_list, op=ReduceOp.SUM, group=None, use_calc_stream=True)
规约,然后将张量列表分散到组中的所有进程上

参数
:::::::::
- tensor (Tensor) – 输出的张量。
- tensor_list (list(Tensor)) – 归约和切分的张量列表。
- op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.Min|ReduceOp.PROD) – 操作类型,默认ReduceOp.SUM。
- group: (Group, optional) – 通信组;如果是None,则使用默认通信组。
- use_calc_stream: (bool, optional) – 决定是在计算流还是通信流上做该通信操作;默认为True,表示在计算流。


返回
:::::::::
返回Task。

注意
:::::::::
当前只支持动态图

代码示例
:::::::::
COPY-FROM: paddle.distributed.reduce_scatter
79 changes: 44 additions & 35 deletions docs/guides/06_distributed_training/pipeline_parallel_cn.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,38 @@
import paddle.nn.functional as F
import paddle.distributed as dist
import random
from paddle.io import Dataset, BatchSampler, DataLoader


创建数据集

.. code-block:: python
BATCH_NUM = 20
BATCH_SIZE = 16
EPOCH_NUM = 4

IMAGE_SIZE = 784
CLASS_NUM = 10
MICRO_BATCH_SIZE = 2

class RandomDataset(Dataset):
def __init__(self, num_samples):
self.num_samples = num_samples

def __getitem__(self, idx):
image = np.random.random([1, 28, 28]).astype('float32')
label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64')
return image, label

def __len__(self):
return self.num_samples

dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)
train_reader = DataLoader(dataset,
batch_size=BATCH_SIZE,
shuffle=True,
drop_last=True,
num_workers=2)


构建一个可以运行流水线的模型,模型的layer需要被LayerDesc或者继承了LayerDesc的SharedLayerDesc包裹,这里因为不需要共享参数,所以就使用LayerDesc
Expand All @@ -77,8 +109,9 @@
def forward(self, x):
return x.reshape(shape=self.shape)


class AlexNetPipeDesc(PipelineLayer):
def __init__(self, num_classes=10, **kwargs):
def __init__(self, num_classes=CLASS_NUM, **kwargs):
self.num_classes = num_classes
decs = [
LayerDesc(
Expand Down Expand Up @@ -108,14 +141,11 @@
]
super(AlexNetPipeDesc, self).__init__(
layers=decs, loss_fn=nn.CrossEntropyLoss(), **kwargs)

然后初始化分布式环境,这一步主要是构建流水线通信组的拓扑

.. code-block:: python

batch_size = 4
micro_batch_size = 2

strategy = fleet.DistributedStrategy()
model_parallel_size = 1
data_parallel_size = 1
Expand All @@ -126,12 +156,11 @@
"pp_degree": pipeline_parallel_size
}
strategy.pipeline_configs = {
"accumulate_steps": batch_size // micro_batch_size,
"micro_batch_size": micro_batch_size
"accumulate_steps": BATCH_SIZE // MICRO_BATCH_SIZE,
"micro_batch_size": MICRO_BATCH_SIZE
}


fleet.init(is_collective=True, strategy=strategy)

fleet.init(is_collective=True, strategy=strategy)

为了保证流水线并行参数初始化和普通模型初始化一致,需要在不同卡间设置不同的seed。

Expand Down Expand Up @@ -162,7 +191,6 @@ fleet.distributed_optimizer(...):这一步则是为优化器添加分布式属

.. code-block:: python


class ReshapeHelp(Layer):
def __init__(self, shape):
super(ReshapeHelp, self).__init__()
Expand Down Expand Up @@ -214,35 +242,16 @@ fleet.distributed_optimizer(...):这一步则是为优化器添加分布式属
optimizer = fleet.distributed_optimizer(optimizer)


创建mnist数据集

.. code-block:: python

train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=batch_size, drop_last=True
)

开始训练

model.train_batch(...):这一步主要就是执行1F1B的流水线并行方式

.. code-block:: python

for step_id, data in enumerate(train_reader()):
x_data = np.array([x[0] for x in data]).astype("float32").reshape(
batch_size, 1, 28, 28
)
y_data = np.array([x[1] for x in data]).astype("int64").reshape(
batch_size, 1
)
img = paddle.to_tensor(x_data)
label = paddle.to_tensor(y_data)
img.stop_gradient = True
label.stop_gradient = True
if step_id >= 5:
break

loss = model.train_batch([img, label], optimizer, scheduler)
for i, (image, label) in enumerate(train_reader()):
if i >= 5:
break
loss = model.train_batch([image, label], optimizer, scheduler)
print("pp_loss: ", loss.numpy())

运行方式(需要保证当前机器有两张GPU):
Expand All @@ -252,7 +261,7 @@ model.train_batch(...):这一步主要就是执行1F1B的流水线并行方式
export CUDA_VISIBLE_DEVICES=0,1
python -m paddle.distributed.launch alexnet_dygraph_pipeline.py # alexnet_dygraph_pipeline.py是用户运行动态图流水线的python文件

基于AlexNet的流水线并行动态图代码:`alex <https://github.com/PaddlePaddle/FleetX/tree/develop/examples/pipeline>`_。
基于AlexNet的完整的流水线并行动态图代码:`alex <https://github.com/PaddlePaddle/FleetX/tree/develop/examples/pipeline>`_。

控制台输出信息如下:

Expand Down