diff --git a/deepspeed/pt/deepspeed_light.py b/deepspeed/pt/deepspeed_light.py
index b2b30ddf0ff6..30340056b046 100755
--- a/deepspeed/pt/deepspeed_light.py
+++ b/deepspeed/pt/deepspeed_light.py
@@ -169,7 +169,8 @@ def __init__(self,
         self.optimizer = None
         self.lr_scheduler = None
         if model_parameters or optimizer:
-            if "torch.optim" in self.optimizer_name():
+            optimizer_name = self.optimizer_name()
+            if optimizer_name is not None and "torch.optim" in optimizer_name:
                 self.zero_set_cpu_offload()
             self._configure_optimizer(optimizer, model_parameters)
             self._configure_lr_scheduler(lr_scheduler)
@@ -264,7 +265,6 @@ def sparse_gradients_enabled(self):
     def train_batch_size(self):
         return self._config.train_batch_size
 
-
     def train_micro_batch_size_per_gpu(self):
         return self._config.train_micro_batch_size_per_gpu
 
@@ -501,11 +501,11 @@ def _configure_distributed_model(self, model):
     # Configure optimizer
     def _configure_optimizer(self, client_optimizer, model_parameters):
         #jie:
-        if self.zero_cpu_offload():
+        if client_optimizer is not None and self.zero_cpu_offload():
             optimizer_parameters = self.optimizer_params()
             basic_optimizer = torch.optim.Adam(client_optimizer.param_groups,
-                                              **optimizer_parameters)
-            logger.info('Using CPU Optimizer as basic optimizer'
+                                               **optimizer_parameters)
+            logger.info('Using CPU Optimizer as basic optimizer')
         elif client_optimizer is not None:
             basic_optimizer = client_optimizer
             logger.info('Using client Optimizer as basic optimizer')
@@ -547,8 +547,8 @@ def _configure_basic_optimizer(self, model_parameters):
                 "'max_grad_norm' is not supported as an optimizer parameter, please switch to using the deepspeed parameter 'gradient_clipping' see: https://www.deepspeed.ai/docs/config-json/#gradient-clipping for more details"
             )
         if self.optimizer_name() == ADAM_OPTIMIZER:
-                from apex.optimizers.fused_adam import FusedAdam
-                optimizer = FusedAdam(model_parameters, **optimizer_parameters)
+            from apex.optimizers.fused_adam import FusedAdam
+            optimizer = FusedAdam(model_parameters, **optimizer_parameters)
         elif self.optimizer_name() == LAMB_OPTIMIZER:
             optimizer = FusedLamb(model_parameters, **optimizer_parameters)
         else:
diff --git a/deepspeed/pt/deepspeed_zero_optimizer.py b/deepspeed/pt/deepspeed_zero_optimizer.py
index 7cdc61936568..a58aedb145ac 100755
--- a/deepspeed/pt/deepspeed_zero_optimizer.py
+++ b/deepspeed/pt/deepspeed_zero_optimizer.py
@@ -98,6 +98,7 @@ def move_to_cpu(tensor_list):
 def print_rank_msg(msg):
     print(f"rank {dist.get_rank()} - {msg}")
 
+
 #jie:asyn move to target device
 def async_migrate_to(obj, dev, main_stream=None):
     if torch.is_tensor(obj):
@@ -128,6 +129,7 @@ def async_copy_to(obj, dev, main_stream=None):
     elif isinstance(obj, collections.Sequence):
         return [async_copy_to(o, dev, main_stream) for o in obj]
 
+
 class FP16_DeepSpeedZeroOptimizer(object):
     """
     DeepSpeedZeroOptimizer designed to reduce the memory footprint
@@ -289,11 +291,13 @@ def __init__(self,
             # a partition of the fp32 master weights that will be updated by this process
             if self.cpu_offload:
                 self.single_partition_of_fp32_groups.append(
-                    async_copy_to(self.parallel_partitioned_fp16_groups[i]
-                                  [partition_id], 'cpu').float()
-                )
-                self.averaged_gradients_on_cpu[i] = [torch.empty_like(self.parallel_partitioned_fp16_groups[i][partition_id],
-                                                                      device='cpu')]
+                    async_copy_to(self.parallel_partitioned_fp16_groups[i][partition_id],
+                                  'cpu').float())
+                self.averaged_gradients_on_cpu[i] = [
+                    torch.empty_like(
+                        self.parallel_partitioned_fp16_groups[i][partition_id],
+                        device='cpu')
+                ]
             else:
                 self.single_partition_of_fp32_groups.append(
                     self.parallel_partitioned_fp16_groups[i]
@@ -499,12 +503,12 @@ def independent_gradient_partition_epilogue(self):
         if self.cpu_offload is False:
             for i, _ in enumerate(self.fp16_groups):
                 self.averaged_gradients[i] = self.get_flat_partition(
-                        self.params_in_partition[i],
-                        self.first_offset[i],
-                        self.partition_size[i],
-                        dtype=torch.half,
-                        device=torch.cuda.current_device(),
-                        return_tensor_list=True)
+                    self.params_in_partition[i],
+                    self.first_offset[i],
+                    self.partition_size[i],
+                    dtype=torch.half,
+                    device=torch.cuda.current_device(),
+                    return_tensor_list=True)
 
         self._release_ipg_buffers()
 
@@ -641,7 +645,8 @@ def reduce_independent_p_g_buckets_and_remove_grads(self, param, i):
         #jie:
         if self.cpu_offload:
             with stream(self.migration_stream):
-                averaged_gradients_on_cpu[i][param_id] = async_copy_to(new_grad_tensor.data.view_as(param.grad),
+                averaged_gradients_on_cpu[i][param_id] = async_copy_to(
+                    new_grad_tensor.data.view_as(param.grad),
                     'cpu',
                     self.cpu_computation_stream)
         else:
@@ -767,8 +772,8 @@ def copy_grads_in_partition(self, param):
            else:
            '''
            self.grads_in_partition = torch.empty(int(total_size),
-                                                dtype=torch.half,
-                                                device=torch.cuda.current_device())
+                                                 dtype=torch.half,
+                                                 device=torch.cuda.current_device())
 
            see_memory_usage(f"after copying {total_size} gradients into partition")
            #The allreduce buffer will be rewritted. Copy the gradients in partition to a new buffer
@@ -1217,7 +1222,7 @@ def step(self, closure=None):
                         self.single_partition_of_fp32_groups[i].dtype)
                 else:
                     single_grad_partition = _flatten_dense_tensors(
-                        self.averaged_gradients_on_cpu[i]).to(
+                        self.averaged_gradients[i]).to(
                             self.single_partition_of_fp32_groups[i].dtype)
                 assert single_grad_partition.numel() == self.partition_size[i], \
                     "averaged gradients have different number of elements that partition size {} {} {} {}".format(single_grad_partition.numel(), self.partition_size[i], i, partition_id)
@@ -1226,7 +1231,8 @@ def step(self, closure=None):
 
                 #release all the gradient since we have already created a necessary copy in dp_grad_partition
                 self.free_grad_in_param_list(self.params_in_partition[i])
-                self.averaged_gradients[i] = notneeded
+                if self.cpu_offload is False:
+                    self.averaged_gradients[i] = None
 
                 single_partition_grad_groups.append(single_grad_partition)
 
@@ -1240,9 +1246,14 @@ def step(self, closure=None):
         if self.cpu_offload:
             with torch.cuda.stream(self.migration_stream):
                 for averaged_gradients_cpu, fp32_partition in zip(self.averaged_gradients_on_cpu, self.single_partition_of_fp32_groups):
-                    averaged_gradients_cpu[0] = async_copy_to(fp32_partition, torch.cuda.current_divice(),self.migration_stream)
+                    averaged_gradients_cpu = async_copy_to(fp32_partition,
+                                                           torch.cuda.current_device(),
+                                                           self.migration_stream)
                 for fp16_partitions, fp32_partition in zip(self.parallel_partitioned_fp16_groups, self.single_partition_of_fp32_groups):
-                    fp16_partitions[partition_id] = async_copy_to(fp32_partition, torch.cuda.current_divice(),torch.cuda.main_stream())
+                    fp16_partitions[partition_id] = async_copy_to(
+                        fp32_partition,
+                        torch.cuda.current_device(),
+                        torch.cuda.main_stream())
         else:
             for fp16_partitions, fp32_partition in zip(self.parallel_partitioned_fp16_groups, self.single_partition_of_fp32_groups):
                 fp16_partitions[partition_id].data.copy_(fp32_partition.data)
@@ -1503,9 +1514,10 @@ def state_dict(self):
                                                    self.single_partition_of_fp32_groups)
         state_dict['single_partition_of_fp32_groups'] = fp32_groups_without_padding
 
-        if self.cpu_offload:
-            state_dict_tmp = async_copy_to(state_dict, 'cpu', torch.cuda.current_stream())
+        state_dict_tmp = async_copy_to(state_dict,
+                                       'cpu',
+                                       torch.cuda.current_stream())
         state_dict = None
         state_dict = state_dict_tmp
 
diff --git a/deepspeed/pt/deepspeed_zero_utils.py b/deepspeed/pt/deepspeed_zero_utils.py
index fd43605d3de1..034a9f4ccb22 100755
--- a/deepspeed/pt/deepspeed_zero_utils.py
+++ b/deepspeed/pt/deepspeed_zero_utils.py
@@ -2,6 +2,7 @@
 from torch.autograd import Variable
 import collections
 
+
 def async_migrate_to(obj, dev, main_stream=None):
     if torch.is_tensor(obj):
         obj = Variable(obj)
diff --git a/tests/unit/test_fp16.py b/tests/unit/test_fp16.py
index 29db822262dc..320d026bdd83 100755
--- a/tests/unit/test_fp16.py
+++ b/tests/unit/test_fp16.py
@@ -217,16 +217,8 @@ def _test_adamw_fp16_empty_grad(args, model, hidden_dim):
     _test_adamw_fp16_empty_grad(args=args, model=model, hidden_dim=hidden_dim)
 
 
-@pytest.mark.parametrize('zero_stage, use_cpu_offload',
-                         [
-                             (1,
-                              False),
-                             (2,
-                              False),
-                             (2,
-                              True),
-                         ])
-def test_adam_fp16_zero_onecycle_compatibility(tmpdir, zero_stage, use_cpu_offload):
+@pytest.mark.parametrize("zero_stage", [0, 1, 2])
+def test_adam_fp16_zero_onecycle_compatibility(tmpdir, zero_stage):
     config_dict = {
         "train_batch_size": 1,
         "steps_per_print": 1,
@@ -254,8 +246,7 @@ def test_adam_fp16_zero_onecycle_compatibility(tmpdir, zero_stage, use_cpu_offlo
             "enabled": True
         },
         "zero_optimization": {
-            "stage": zero_stage,
-            "cpu_offload": use_cpu_offload
+            "stage": zero_stage
         }
     }
 
@@ -283,17 +274,8 @@ def _test_adam_fp16_zero_onecycle_compatibility(args, model, hidden_dim):
                                                  hidden_dim=hidden_dim)
 
 
-#@pytest.mark.parametrize("zero_stage", [1, 2])
-@pytest.mark.parametrize('zero_stage, use_cpu_offload',
-                         [
-                             (1,
-                              False),
-                             (2,
-                              False),
-                             (2,
-                              True),
-                         ])
-def test_zero_static_scale(tmpdir, zero_stage, use_cpu_offload):
+@pytest.mark.parametrize("zero_stage", [1, 2])
+def test_zero_static_scale(tmpdir, zero_stage):
     config_dict = {
         "train_batch_size": 4,
         "steps_per_print": 1,
@@ -308,8 +290,7 @@ def test_zero_static_scale(tmpdir, zero_stage, use_cpu_offload):
             "loss_scale": 138.
         },
         "zero_optimization": {
-            "stage": zero_stage,
-            "zero_cpu_offload": use_cpu_offload
+            "stage": zero_stage
        }
     }
     args = args_from_dict(tmpdir, config_dict)
@@ -352,7 +333,8 @@ def test_zero_static_scale_deprecated_format(tmpdir):
         "fp16": {
             "enabled": True,
             "loss_scale": 138.
-        }
+        },
+        "zero_optimization": True
     }
     args = args_from_dict(tmpdir, config_dict)
 
@@ -381,16 +363,8 @@ def _test_zero_static_scale(args):
     _test_zero_static_scale(args)
 
 
-@pytest.mark.parametrize('zero_stage, use_cpu_offload',
-                         [
-                             (1,
-                              False),
-                             (2,
-                              False),
-                             (2,
-                              True),
-                         ])
-def test_zero_allow_untested_optimizer(tmpdir, zero_stage, use_cpu_offload):
+@pytest.mark.parametrize("zero_stage", [1, 2])
+def test_zero_allow_untested_optimizer(tmpdir, zero_stage):
     config_dict = {
         "train_batch_size": 4,
         "steps_per_print": 1,
@@ -398,8 +372,7 @@ def test_zero_allow_untested_optimizer(tmpdir, zero_stage, use_cpu_offload):
         "fp16": {
             "enabled": True,
         },
         "zero_optimization": {
-            "stage": zero_stage,
-            "cpu_offload": use_cpu_offload
+            "stage": zero_stage
         },
         "zero_allow_untested_optimizer": False
     }
@@ -419,16 +392,8 @@ def _test_zero_allow_untested_optimizer(args):
     _test_zero_allow_untested_optimizer(args)
 
 
-@pytest.mark.parametrize('zero_stage, use_cpu_offload',
-                         [
-                             (1,
-                              False),
-                             (2,
-                              False),
-                             (2,
-                              True),
-                         ])
-def test_zero_empty_partition(tmpdir, zero_stage, use_cpu_offload):
+@pytest.mark.parametrize("zero_stage", [1, 2])
+def test_zero_empty_partition(tmpdir, zero_stage):
     config_dict = {
         "train_micro_batch_size_per_gpu": 1,
         "gradient_accumulation_steps": 1,
@@ -443,8 +408,7 @@ def test_zero_empty_partition(tmpdir, zero_stage, use_cpu_offload):
         "fp16": {
             "enabled": True,
             "initial_scale_power": 8
         },
         "zero_optimization": {
-            "stage": zero_stage,
-            "cpu_offload": use_cpu_offload
+            "stage": zero_stage
         }
     }
     args = args_from_dict(tmpdir, config_dict)
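Note on the recurring pattern in the offload path above: tensors are staged between GPU and CPU on a dedicated side stream so the transfer can overlap with compute on the default stream, and later work is made to wait on that copy. The sketch below is illustrative only and is not part of the diff; the helper name copy_on_side_stream is hypothetical, and it only approximates what the PR's async_migrate_to / async_copy_to helpers do (it assumes a CUDA device is available and that CPU destinations should be pinned).

import torch

def copy_on_side_stream(src, device, side_stream):
    # Illustrative sketch (hypothetical helper): copy `src` to `device` on
    # `side_stream` without blocking work already queued on the default stream.
    if device == 'cpu':
        # Pinned host memory is required for the device-to-host copy to be truly asynchronous.
        dst = torch.empty(src.size(), dtype=src.dtype, device='cpu', pin_memory=True)
    else:
        dst = torch.empty(src.size(), dtype=src.dtype, device=device)
    with torch.cuda.stream(side_stream):
        dst.copy_(src, non_blocking=True)
    # Anything launched later on the current stream must wait for the copy to finish.
    torch.cuda.current_stream().wait_stream(side_stream)
    return dst

# Example usage (assumptions: `param` is a CUDA parameter with a gradient):
#   side = torch.cuda.Stream()
#   grad_cpu = copy_on_side_stream(param.grad, 'cpu', side)
#   side.synchronize()  # required before the CPU reads grad_cpu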