From 6a399bf18e28a49e26cee2b49f0dd1dde0f337b8 Mon Sep 17 00:00:00 2001 From: rahilbathwal5 Date: Wed, 3 Aug 2022 11:42:15 -0700 Subject: [PATCH] Revert "Release swap buffers for persisted params (#2089)" This reverts commit 2210ebe70f68135b6b43e91323a7d96a403a2299. --- deepspeed/runtime/engine.py | 66 ++++++++++--------- deepspeed/runtime/zero/parameter_offload.py | 22 ++----- .../runtime/zero/partition_parameters.py | 4 ++ .../zero/partitioned_param_coordinator.py | 10 --- deepspeed/runtime/zero/stage3.py | 23 +++---- 5 files changed, 53 insertions(+), 72 deletions(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index bae8c9157dbc..df3107bed159 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -303,8 +303,9 @@ def __init__( monitor_memory=False, ) - log_dist(f"DeepSpeed Flops Profiler Enabled: {self.flops_profiler_enabled()}", - ranks=[0]) + if dist.get_rank() == 0: + logger.info( + f"DeepSpeed Flops Profiler Enabled: {self.flops_profiler_enabled()}") if self.flops_profiler_enabled(): self.flops_profiler = FlopsProfiler(self.module, self) @@ -690,9 +691,6 @@ def zero_prefetch_bucket_size(self): def zero_param_persistence_threshold(self): return self._config.zero_config.param_persistence_threshold - def zero_model_persistence_threshold(self): - return self._config.zero_config.model_persistence_threshold - def zero_gather_16bit_weights_on_model_save(self): return self._config.zero_config.gather_16bit_weights_on_model_save @@ -784,17 +782,18 @@ def _configure_lr_scheduler(self, client_lr_scheduler): # First check for scheduler in json configuration lr_scheduler = self._scheduler_from_config(self.optimizer) if lr_scheduler: - log_dist( - f"DeepSpeed using configured LR scheduler = {self.scheduler_name()}", - ranks=[0]) + if self.global_rank == 0: + logger.info( + f"DeepSpeed using configured LR scheduler = {self.scheduler_name()}") self.lr_scheduler = lr_scheduler else: if isinstance(client_lr_scheduler, Callable): - log_dist('DeepSpeed using client callable to create LR scheduler', - ranks=[0]) + if self.global_rank == 0: + logger.info('DeepSpeed using client callable to create LR scheduler') self.lr_scheduler = client_lr_scheduler(self.basic_optimizer) else: - log_dist('DeepSpeed using client LR scheduler', ranks=[0]) + if self.global_rank == 0: + logger.info('DeepSpeed using client LR scheduler') self.lr_scheduler = client_lr_scheduler log_dist(f'DeepSpeed LR Scheduler = {self.lr_scheduler}', ranks=[0]) @@ -1097,26 +1096,31 @@ def _configure_optimizer(self, client_optimizer, model_parameters): client_optimizer.param_groups[:] = [ pg for pg in client_optimizer.param_groups if len(pg["params"]) != 0 ] - log_dist( - "Removing param_group that has no 'params' in the client Optimizer", - ranks=[0]) + if self.global_rank == 0: + logger.info( + "Removing param_group that has no 'params' in the client Optimizer" + ) basic_optimizer = client_optimizer - log_dist('Using client Optimizer as basic optimizer', ranks=[0]) + if self.global_rank == 0: + logger.info('Using client Optimizer as basic optimizer') else: basic_optimizer = client_optimizer(model_parameters) - log_dist('Using client callable to create basic optimizer', ranks=[0]) + if self.global_rank == 0: + logger.info('Using client callable to create basic optimizer') else: basic_optimizer = self._configure_basic_optimizer(model_parameters) - log_dist( - f"Using DeepSpeed Optimizer param name {self.optimizer_name()} as basic optimizer", - ranks=[0]) + if self.global_rank == 0: + 
logger.info( + "Using DeepSpeed Optimizer param name {} as basic optimizer".format( + self.optimizer_name())) self._check_for_duplicates(basic_optimizer) self.basic_optimizer = basic_optimizer - log_dist("DeepSpeed Basic Optimizer = {basic_optimizer.__class__.__name__}", - ranks=[0]) + if self.global_rank == 0: + logger.info("DeepSpeed Basic Optimizer = {}".format( + basic_optimizer.__class__.__name__)) if self.zero_optimization(): assert ( @@ -1137,7 +1141,8 @@ def _configure_optimizer(self, client_optimizer, model_parameters): elif self.amp_enabled(): assert not (self.fp16_enabled() or self.bfloat16_enabled()), "Cannot enable both amp with (legacy) fp16 or bfloat16 mode" amp_params = self.amp_params() - log_dist(f"Initializing AMP with these params: {amp_params}", ranks=[0]) + if self.global_rank == 0: + logger.info(f"Initializing AMP with these params: {amp_params}") try: logger.info("Initializing Apex amp from: {}".format(amp.__path__)) except NameError: @@ -1338,8 +1343,8 @@ def _configure_bf16_optimizer(self, optimizer): if optimizer is None: optimizer = DummyOptim(list(self.module.parameters())) - log_dist('Creating BF16 optimizer', ranks=[0]) - + if self.global_rank == 0: + logger.info('Creating unfused BF16 optimizer') timers = self.timers if self.wall_clock_breakdown() else None optimizer = BF16_Optimizer( optimizer, @@ -1354,6 +1359,7 @@ def _configure_bf16_optimizer(self, optimizer): def _configure_zero_optimizer(self, optimizer): zero_stage = self.zero_optimization_stage() + log_dist('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage), ranks=[0]) assert self.communication_data_type in (torch.float16, torch.bfloat16), "ZeRO supports only 'communication_data_type': ['fp16', 'bfp16']" timers = self.timers if self.wall_clock_breakdown() else None @@ -1371,8 +1377,6 @@ def _configure_zero_optimizer(self, optimizer): round_robin_gradients = self.zero_round_robin_gradients() assert not isinstance(optimizer, DummyOptim), "zero stage 2 requires an optimizer" - log_dist('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage), - ranks=[0]) # Overlap and contiguous grads are meaningless in stage 1 and are ignored if zero_stage == ZERO_OPTIMIZATION_OPTIMIZER_STATES: overlap_comm = False @@ -1418,8 +1422,10 @@ def _configure_zero_optimizer(self, optimizer): elif zero_stage == ZERO_OPTIMIZATION_WEIGHTS: assert not self.has_moe_layers, "MoE not supported with Stage 3" + logger.info("Initializing ZeRO Stage 3") if dist.get_rank() == 0 else None + from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3 + if isinstance(optimizer, DummyOptim): - log_dist("Creating ZeRO Offload", ranks=[0]) optimizer = DeepSpeedZeRoOffload( self.module, timers=timers, @@ -1429,13 +1435,10 @@ def _configure_zero_optimizer(self, optimizer): max_reuse_distance=self.zero_max_reuse_distance(), max_live_parameters=self.zero_max_live_parameters(), param_persistence_threshold=self.zero_param_persistence_threshold(), - model_persistence_threshold=self.zero_model_persistence_threshold(), offload_param_config=self.zero_offload_param(), mpu=self.mpu) else: - log_dist('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage), - ranks=[0]) - from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3 + optimizer = DeepSpeedZeroOptimizer_Stage3( self.module, optimizer, @@ -1451,7 +1454,6 @@ def _configure_zero_optimizer(self, optimizer): max_reuse_distance=self.zero_max_reuse_distance(), max_live_parameters=self.zero_max_live_parameters(), 
param_persistence_threshold=self.zero_param_persistence_threshold(), - model_persistence_threshold=self.zero_model_persistence_threshold(), dp_process_group=self.data_parallel_group, reduce_scatter=self.zero_reduce_scatter(), overlap_comm=self.zero_overlap_comm(), diff --git a/deepspeed/runtime/zero/parameter_offload.py b/deepspeed/runtime/zero/parameter_offload.py index adb60c71dd98..688b81900e36 100644 --- a/deepspeed/runtime/zero/parameter_offload.py +++ b/deepspeed/runtime/zero/parameter_offload.py @@ -3,7 +3,6 @@ Licensed under the MIT license. """ -import sys import torch from torch.cuda import Stream from collections import OrderedDict @@ -174,11 +173,10 @@ def __init__(self, max_reuse_distance=1000000000, max_live_parameters=1000000000, param_persistence_threshold=100000, - model_persistence_threshold=sys.maxsize, offload_param_config=None, mpu=None): - see_memory_usage("DeepSpeedZeRoOffload initialize [begin]", force=True) + see_memory_usage("TensorOffload initialize beginning", force=True) print_rank_0(f"initialized {__class__.__name__} with args: {locals()}", force=False) @@ -199,11 +197,8 @@ def __init__(self, _inject_parameters(module, ZeROOrderedDict) - self.param_numel_persistence_threshold = int(param_persistence_threshold) - self.model_persistence_threshold = int(model_persistence_threshold) - self.persistent_parameters = self.mark_persistent_parameters( - self.param_numel_persistence_threshold, - self.model_persistence_threshold) + self.persistence_threshold = int(param_persistence_threshold) + self.persistent_parameters = self.mark_persistent_parameters() self.param_coordinators = {} self._prefetch_bucket_sz = int(prefetch_bucket_size) @@ -219,8 +214,6 @@ def __init__(self, f'Created module hooks: forward = {len(self.forward_hooks)}, backward = {len(self.backward_hooks)}', force=False) - see_memory_usage("DeepSpeedZeRoOffload initialize [end]", force=True) - @instrument_w_nvtx def partition_all_parameters(self): """Partitioning Parameters that were not partitioned usually if parameters @@ -299,15 +292,12 @@ def _end_of_forward_hook(module, *args): global FWD_MODULE_STACK FWD_MODULE_STACK.append(self.module) - def mark_persistent_parameters(self, param_threshold, model_threshold): + def mark_persistent_parameters(self): persistent_params = [] total_persistent_parameters = 0 params_count = 0 for _, param in self.module.named_parameters(recurse=True): - if param.ds_numel + total_persistent_parameters > model_threshold: - continue - - if param.ds_numel < param_threshold: + if param.ds_numel < self.persistence_threshold: params_count += 1 param.ds_persist = True persistent_params.append(param) @@ -315,7 +305,7 @@ def mark_persistent_parameters(self, param_threshold, model_threshold): print_rank_0( f"Parameter Offload: Total persistent parameters: {total_persistent_parameters} in {params_count} params", - force=True) + force=False) return persistent_params diff --git a/deepspeed/runtime/zero/partition_parameters.py b/deepspeed/runtime/zero/partition_parameters.py index e28ad63b5ba1..717c72b33344 100755 --- a/deepspeed/runtime/zero/partition_parameters.py +++ b/deepspeed/runtime/zero/partition_parameters.py @@ -687,7 +687,11 @@ def get_model(): # It can be same as local_device or it could be CPU or NVMe. 
self.remote_device = self.local_device if remote_device is None else remote_device
         self.pin_memory = pin_memory if (self.remote_device
-                                         == OFFLOAD_CPU_DEVICE) else False
+                                         == OffloadDeviceEnum.cpu) else False
 
         # Enable fp16 param swapping to NVMe
         if self.remote_device == OFFLOAD_NVME_DEVICE:
diff --git a/deepspeed/runtime/zero/partitioned_param_coordinator.py b/deepspeed/runtime/zero/partitioned_param_coordinator.py
index 351cb4ad4f1b..323b0f8e2e69 100644
--- a/deepspeed/runtime/zero/partitioned_param_coordinator.py
+++ b/deepspeed/runtime/zero/partitioned_param_coordinator.py
@@ -398,16 +398,6 @@ def __all_gather_params(self, params: Set[Parameter]) -> None:
                 assert param.ds_status == ZeroParamStatus.INFLIGHT, param.ds_summary()
                 self.__inflight_param_registry[param] = handle
 
-        # Release swap buffers for persisted params on nvme since they will never be partitioned or evicted from GPU
-        swap_persisted_params = [
-            p for p in partitioned_params
-            if p.ds_persist and p.ds_tensor.final_location == OffloadDeviceEnum.nvme
-        ]
-        if swap_persisted_params:
-            swap_persisted_params[
-                0].nvme_swapper.remove_partition_and_release_buffers(
-                    swap_persisted_params)
-
     @instrument_w_nvtx
     def __release_param(self, param: Parameter) -> None:
         if param.ds_status == ZeroParamStatus.AVAILABLE and not param.ds_active_sub_modules:
diff --git a/deepspeed/runtime/zero/stage3.py b/deepspeed/runtime/zero/stage3.py
index f5b6ffa73161..b3daefa3768d 100755
--- a/deepspeed/runtime/zero/stage3.py
+++ b/deepspeed/runtime/zero/stage3.py
@@ -3,7 +3,6 @@
 Licensed under the MIT license.
 """
 
-import sys
 import gc
 import collections
 from typing import Deque, Dict, Tuple
@@ -89,7 +88,6 @@ def __init__(self,
                  max_reuse_distance=1000000000,
                  max_live_parameters=1000000000,
                  param_persistence_threshold=100000,
-                 model_persistence_threshold=sys.maxsize,
                  dp_process_group=None,
                  reduce_scatter=True,
                  overlap_comm=False,
@@ -148,18 +146,15 @@ def __init__(self,
         self.params_in_nvme_and_cpu = False
         self.max_params_in_cpu = 0
 
-        self.parameter_offload = DeepSpeedZeRoOffload(
-            module=module,
-            timers=timers,
-            ds_config=ds_config,
-            overlap_comm=overlap_comm,
-            prefetch_bucket_size=prefetch_bucket_size,
-            max_reuse_distance=max_reuse_distance,
-            max_live_parameters=max_live_parameters,
-            param_persistence_threshold=param_persistence_threshold,
-            model_persistence_threshold=model_persistence_threshold,
-            offload_param_config=offload_optimizer_config)
-
+        self.parameter_offload = DeepSpeedZeRoOffload(module,
+                                                      timers,
+                                                      ds_config,
+                                                      overlap_comm,
+                                                      prefetch_bucket_size,
+                                                      max_reuse_distance,
+                                                      max_live_parameters,
+                                                      param_persistence_threshold,
+                                                      offload_param_config)
         self.persistent_parameters = self.parameter_offload.persistent_parameters
 
         self._configure_offloading(offload_optimizer_config, offload_param_config)
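
Note on the behavior this revert restores: persistence in ZeRO-3 goes back to a purely per-parameter rule (any parameter smaller than param_persistence_threshold elements stays resident), with no model-wide cap, since model_persistence_threshold is removed above. The sketch below is a minimal illustration of that rule only; toy_mark_persistent is a hypothetical helper for this note, not DeepSpeed code or its API.

import torch

def toy_mark_persistent(params, param_persistence_threshold=100000):
    """Return the parameters that would be kept persistent (never partitioned)."""
    persistent = []
    total_persistent_numel = 0
    for p in params:
        # Per-parameter rule only: small tensors persist, regardless of the
        # running total across the model.
        if p.numel() < param_persistence_threshold:
            persistent.append(p)
            total_persistent_numel += p.numel()
    print(f"persistent params: {len(persistent)}, elements: {total_persistent_numel}")
    return persistent

# Example: a bias-sized tensor persists, a large weight matrix does not.
toy_mark_persistent([torch.empty(4096), torch.empty(4096, 4096)])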