diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
index 6cd875905864bd..e7108b3f4f3432 100755
--- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_optimizer.py
@@ -50,8 +50,11 @@ def __init__(self, clip, hcg):
     @imperative_base.no_grad
     def _dygraph_clip(self, params_grads):
         params_and_grads = []
-        sum_square_list_dist = []
-        sum_square_list_not_dist = []
+
+        sum_square_dist_fp16 = []
+        sum_square_dist_fp32 = []
+        sum_square_not_dist_fp16 = []
+        sum_square_not_dist_fp32 = []

         for p, g in params_grads:
             if g is None:
@@ -71,20 +74,51 @@ def _dygraph_clip(self, params_grads):

             if not_shared_enable:
                 if p.is_distributed:
-                    sum_square_list_dist.append(sum_square)
+                    if p.dtype == paddle.float16:
+                        sum_square_dist_fp16.append(sum_square)
+                    elif p.dtype == paddle.float32:
+                        sum_square_dist_fp32.append(sum_square)
                 else:
-                    sum_square_list_not_dist.append(sum_square)
-
-        global_norm_var_dist = layers.concat(sum_square_list_dist) if len(
-            sum_square_list_dist) != 0 else layers.concat(
-                [paddle.to_tensor([0.])])
-        global_norm_var_dist = layers.reduce_sum(global_norm_var_dist)
-
-        global_norm_var_not_dist = layers.concat(
-            sum_square_list_not_dist) if len(
-                sum_square_list_not_dist) != 0 else layers.concat(
-                    [paddle.to_tensor([0.])])
-        global_norm_var_not_dist = layers.reduce_sum(global_norm_var_not_dist)
+                    if p.dtype == paddle.float16:
+                        sum_square_not_dist_fp16.append(sum_square)
+                    elif p.dtype == paddle.float32:
+                        sum_square_not_dist_fp32.append(sum_square)
+
+        # global norm of distributed FP16 params_and_grads
+        if len(sum_square_dist_fp16) == 0:
+            global_norm_dist_fp16 = paddle.to_tensor([0.], dtype=paddle.float32)
+        else:
+            global_norm_dist_fp16 = layers.concat(sum_square_dist_fp16)
+            global_norm_dist_fp16 = layers.reduce_sum(global_norm_dist_fp16)
+            global_norm_dist_fp16 = paddle.cast(
+                global_norm_dist_fp16, dtype=paddle.float32)
+
+        # global norm of non-distributed FP16 params_and_grads
+        if len(sum_square_not_dist_fp16) == 0:
+            global_norm_not_dist_fp16 = paddle.to_tensor(
+                [0.], dtype=paddle.float32)
+        else:
+            global_norm_not_dist_fp16 = layers.concat(sum_square_not_dist_fp16)
+            global_norm_not_dist_fp16 = layers.reduce_sum(
+                global_norm_not_dist_fp16)
+            global_norm_not_dist_fp16 = paddle.cast(
+                global_norm_not_dist_fp16, dtype=paddle.float32)
+
+        # global norm of distributed FP32 params_and_grads
+        global_norm_dist_fp32 = layers.concat(sum_square_dist_fp32) if len(
+            sum_square_dist_fp32) != 0 else paddle.to_tensor(
+                [0.], dtype=paddle.float32)
+        global_norm_dist_fp32 = layers.reduce_sum(global_norm_dist_fp32)
+
+        # global norm of non-distributed FP32 params_and_grads
+        global_norm_not_dist_fp32 = layers.concat(
+            sum_square_not_dist_fp32) if len(
+                sum_square_not_dist_fp32) != 0 else paddle.to_tensor(
+                    [0.], dtype=paddle.float32)
+        global_norm_not_dist_fp32 = layers.reduce_sum(global_norm_not_dist_fp32)
+
+        global_norm_var_dist = global_norm_dist_fp16 + global_norm_dist_fp32
+        global_norm_var_not_dist = global_norm_not_dist_fp16 + global_norm_not_dist_fp32

         # add all reduce to get global norm of distributed params_and_grads
         if self._hcg.get_model_parallel_world_size() > 1:
@@ -105,22 +139,26 @@ def _dygraph_clip(self, params_grads):
                 global_norm_var_not_dist,
                 group=self._hcg.get_sharding_parallel_group())

-        global_norm_var = layers.sqrt(global_norm_var_dist +
-                                      global_norm_var_not_dist)
+        global_norm_var_fp32 = layers.sqrt(global_norm_var_dist +
+                                           global_norm_var_not_dist)

         max_global_norm = layers.fill_constant(
-            shape=[1], dtype=global_norm_var.dtype, value=self.clip_norm)
+            shape=[1], dtype=global_norm_var_fp32.dtype, value=self.clip_norm)
         clip_var = layers.elementwise_div(
             x=max_global_norm,
             y=layers.elementwise_max(
-                x=global_norm_var, y=max_global_norm))
+                x=global_norm_var_fp32, y=max_global_norm))
+        clip_var_fp16 = paddle.cast(clip_var, paddle.float16)
         for p, g in params_grads:
             if g is None:
                 continue
             if getattr(p, 'need_clip', True) is False:
                 params_and_grads.append((p, g))
                 continue
-            new_grad = layers.elementwise_mul(x=g, y=clip_var)
+            if p.dtype == paddle.float16:
+                new_grad = layers.elementwise_mul(x=g, y=clip_var_fp16)
+            else:
+                new_grad = layers.elementwise_mul(x=g, y=clip_var)
             params_and_grads.append((p, new_grad))

         return params_and_grads
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_fp16.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_fp16.py
new file mode 100644
index 00000000000000..3e5eedbec9aea3
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_mp_fp16.py
@@ -0,0 +1,59 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division
+from __future__ import print_function
+
+import paddle
+import numpy as np
+from hybrid_parallel_mp_model import TestDistMPTraning
+import paddle.distributed.fleet as fleet
+import unittest
+
+
+class TestMPFP16(TestDistMPTraning):
+    def build_optimizer(self, model):
+        grad_clip = paddle.nn.ClipGradByGlobalNorm(1.0)
+        scheduler = paddle.optimizer.lr.ExponentialDecay(
+            learning_rate=0.001, gamma=0.999, verbose=True)
+        optimizer = paddle.optimizer.SGD(scheduler,
+                                         grad_clip=grad_clip,
+                                         parameters=model.parameters())
+
+        model, optimizer = paddle.amp.decorate(
+            models=model,
+            optimizers=optimizer,
+            level='O2',
+            save_dtype='float32')
+
+        return optimizer
+
+    def train_batch(self, batch, model, optimizer, is_mp):
+        scaler = paddle.amp.GradScaler(init_loss_scaling=5160)
+        if is_mp:
+            scaler = fleet.distributed_scaler(scaler)
+        with paddle.amp.auto_cast(enable=True, level="O2"):
+            output = model(batch)
+            loss = output.mean()
+
+        scaled = scaler.scale(loss)
+        scaled.backward()
+        scaler.step(optimizer)
+        scaler.update()
+        optimizer.clear_grad()
+        return scaled
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_amp.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_amp.py
index 33a04a5e7e1838..84d11670027fef 100644
--- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_amp.py
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_amp.py
@@ -61,11 +61,14 @@ def test_pp_model(self):
         rank_id = dist.get_rank()
         set_random_seed(1024, dp_id, rank_id)

+        grad_clip = paddle.nn.ClipGradByGlobalNorm(1.0)
+
         #construct model a
         model_a = AlexNet(10)
         scheduler_a = paddle.optimizer.lr.PiecewiseDecay(
             boundaries=[2], values=[0.001, 0.002], verbose=True)
         optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a,
+                                           grad_clip=grad_clip,
                                            parameters=model_a.parameters())

         scaler_a = paddle.amp.GradScaler(init_loss_scaling=2**5)
@@ -80,6 +83,7 @@ def test_pp_model(self):
         scheduler_b = paddle.optimizer.lr.PiecewiseDecay(
             boundaries=[2], values=[0.001, 0.002], verbose=True)
         optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b,
+                                           grad_clip=grad_clip,
                                            parameters=model_b.parameters())
         model_b = fleet.distributed_model(model_b)
         optimizer_b = fleet.distributed_optimizer(optimizer_b)
diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_fp16.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_fp16.py
index 571459365addfc..9042cdba976753 100644
--- a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_fp16.py
+++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_fp16.py
@@ -61,11 +61,14 @@ def test_pp_model(self):
         rank_id = dist.get_rank()
         set_random_seed(1024, dp_id, rank_id)

+        grad_clip = paddle.nn.ClipGradByGlobalNorm(1.0)
+
         #construct model a
         model_a = AlexNet(10)
         scheduler_a = paddle.optimizer.lr.PiecewiseDecay(
             boundaries=[2], values=[0.001, 0.002], verbose=True)
         optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a,
+                                           grad_clip=grad_clip,
                                            parameters=model_a.parameters())

         scaler_a = paddle.amp.GradScaler(init_loss_scaling=2**5)
@@ -75,6 +78,7 @@ def test_pp_model(self):
         scheduler_b = paddle.optimizer.lr.PiecewiseDecay(
             boundaries=[2], values=[0.001, 0.002], verbose=True)
         optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b,
+                                           grad_clip=grad_clip,
                                            parameters=model_b.parameters())

         param_len = len(model_a.parameters())
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_tensor_parallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_tensor_parallel.py
index 4b9d6764bbb3b6..3705deb5ad856f 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_tensor_parallel.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_tensor_parallel.py
@@ -30,6 +30,9 @@ def test_hybrid_parallel_mp_model(self):
     def test_hybrid_parallel_mp_amp(self):
         self.run_mnist_2gpu('hybrid_parallel_mp_amp.py')

+    def test_hybrid_parallel_mp_fp16(self):
+        self.run_mnist_2gpu('hybrid_parallel_mp_fp16.py')
+
     def test_hybrid_parallel_mp_clip_grad(self):
         self.run_mnist_2gpu('hybrid_parallel_mp_clip_grad.py')
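
For reference, a minimal single-process sketch of the AMP O2 + ClipGradByGlobalNorm pattern that the new hybrid_parallel_mp_fp16.py test exercises. The toy Linear model, batch shape, and loss-scaling value are illustrative assumptions (not taken from this patch), and the sketch omits the fleet.distributed_model / fleet.distributed_optimizer / fleet.distributed_scaler wrapping that routes clipping through HybridParallelClipGrad in an actual hybrid-parallel job.

# Illustrative sketch only; model and hyperparameters are assumptions.
# Assumes a GPU build of Paddle; runs as a plain single-process script.
import paddle

model = paddle.nn.Linear(16, 16)
grad_clip = paddle.nn.ClipGradByGlobalNorm(1.0)
optimizer = paddle.optimizer.SGD(learning_rate=0.001,
                                 grad_clip=grad_clip,
                                 parameters=model.parameters())

# level='O2' casts the model's parameters to float16;
# save_dtype='float32' keeps checkpoints in float32.
model, optimizer = paddle.amp.decorate(
    models=model, optimizers=optimizer, level='O2', save_dtype='float32')

scaler = paddle.amp.GradScaler(init_loss_scaling=2**10)
x = paddle.randn([4, 16], dtype='float32')
with paddle.amp.auto_cast(enable=True, level='O2'):
    loss = model(x).mean()

scaler.scale(loss).backward()   # gradients of FP16 parameters are FP16 here
scaler.step(optimizer)          # unscale, clip by global norm, then update
scaler.update()
optimizer.clear_grad()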