66 changes: 34 additions & 32 deletions deepspeed/runtime/engine.py
@@ -303,8 +303,9 @@ def __init__(
monitor_memory=False,
)

log_dist(f"DeepSpeed Flops Profiler Enabled: {self.flops_profiler_enabled()}",
ranks=[0])
if dist.get_rank() == 0:
logger.info(
f"DeepSpeed Flops Profiler Enabled: {self.flops_profiler_enabled()}")

if self.flops_profiler_enabled():
self.flops_profiler = FlopsProfiler(self.module, self)
@@ -690,9 +691,6 @@ def zero_prefetch_bucket_size(self):
def zero_param_persistence_threshold(self):
return self._config.zero_config.param_persistence_threshold

def zero_model_persistence_threshold(self):
return self._config.zero_config.model_persistence_threshold

def zero_gather_16bit_weights_on_model_save(self):
return self._config.zero_config.gather_16bit_weights_on_model_save

@@ -784,17 +782,18 @@ def _configure_lr_scheduler(self, client_lr_scheduler):
# First check for scheduler in json configuration
lr_scheduler = self._scheduler_from_config(self.optimizer)
if lr_scheduler:
log_dist(
f"DeepSpeed using configured LR scheduler = {self.scheduler_name()}",
ranks=[0])
if self.global_rank == 0:
logger.info(
f"DeepSpeed using configured LR scheduler = {self.scheduler_name()}")
self.lr_scheduler = lr_scheduler
else:
if isinstance(client_lr_scheduler, Callable):
log_dist('DeepSpeed using client callable to create LR scheduler',
ranks=[0])
if self.global_rank == 0:
logger.info('DeepSpeed using client callable to create LR scheduler')
self.lr_scheduler = client_lr_scheduler(self.basic_optimizer)
else:
log_dist('DeepSpeed using client LR scheduler', ranks=[0])
if self.global_rank == 0:
logger.info('DeepSpeed using client LR scheduler')
self.lr_scheduler = client_lr_scheduler

log_dist(f'DeepSpeed LR Scheduler = {self.lr_scheduler}', ranks=[0])
@@ -1097,26 +1096,31 @@ def _configure_optimizer(self, client_optimizer, model_parameters):
client_optimizer.param_groups[:] = [
pg for pg in client_optimizer.param_groups if len(pg["params"]) != 0
]
log_dist(
"Removing param_group that has no 'params' in the client Optimizer",
ranks=[0])
if self.global_rank == 0:
logger.info(
"Removing param_group that has no 'params' in the client Optimizer"
)

basic_optimizer = client_optimizer
log_dist('Using client Optimizer as basic optimizer', ranks=[0])
if self.global_rank == 0:
logger.info('Using client Optimizer as basic optimizer')
else:
basic_optimizer = client_optimizer(model_parameters)
log_dist('Using client callable to create basic optimizer', ranks=[0])
if self.global_rank == 0:
logger.info('Using client callable to create basic optimizer')
else:
basic_optimizer = self._configure_basic_optimizer(model_parameters)
log_dist(
f"Using DeepSpeed Optimizer param name {self.optimizer_name()} as basic optimizer",
ranks=[0])
if self.global_rank == 0:
logger.info(
"Using DeepSpeed Optimizer param name {} as basic optimizer".format(
self.optimizer_name()))

self._check_for_duplicates(basic_optimizer)

self.basic_optimizer = basic_optimizer
log_dist("DeepSpeed Basic Optimizer = {basic_optimizer.__class__.__name__}",
ranks=[0])
if self.global_rank == 0:
logger.info("DeepSpeed Basic Optimizer = {}".format(
basic_optimizer.__class__.__name__))

if self.zero_optimization():
assert (
@@ -1137,7 +1141,8 @@ def _configure_optimizer(self, client_optimizer, model_parameters):
elif self.amp_enabled():
assert not (self.fp16_enabled() or self.bfloat16_enabled()), "Cannot enable both amp with (legacy) fp16 or bfloat16 mode"
amp_params = self.amp_params()
log_dist(f"Initializing AMP with these params: {amp_params}", ranks=[0])
if self.global_rank == 0:
logger.info(f"Initializing AMP with these params: {amp_params}")
try:
logger.info("Initializing Apex amp from: {}".format(amp.__path__))
except NameError:
@@ -1338,8 +1343,8 @@ def _configure_bf16_optimizer(self, optimizer):
if optimizer is None:
optimizer = DummyOptim(list(self.module.parameters()))

log_dist('Creating BF16 optimizer', ranks=[0])

if self.global_rank == 0:
logger.info('Creating unfused BF16 optimizer')
timers = self.timers if self.wall_clock_breakdown() else None
optimizer = BF16_Optimizer(
optimizer,
@@ -1354,6 +1359,7 @@ def _configure_bf16_optimizer(self, optimizer):

def _configure_zero_optimizer(self, optimizer):
zero_stage = self.zero_optimization_stage()
log_dist('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage), ranks=[0])
assert self.communication_data_type in (torch.float16, torch.bfloat16), "ZeRO supports only 'communication_data_type': ['fp16', 'bfp16']"
timers = self.timers if self.wall_clock_breakdown() else None

@@ -1371,8 +1377,6 @@ def _configure_zero_optimizer(self, optimizer):
round_robin_gradients = self.zero_round_robin_gradients()
assert not isinstance(optimizer, DummyOptim), "zero stage 2 requires an optimizer"

log_dist('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage),
ranks=[0])
# Overlap and contiguous grads are meaningless in stage 1 and are ignored
if zero_stage == ZERO_OPTIMIZATION_OPTIMIZER_STATES:
overlap_comm = False
@@ -1418,8 +1422,10 @@ def _configure_zero_optimizer(self, optimizer):

elif zero_stage == ZERO_OPTIMIZATION_WEIGHTS:
assert not self.has_moe_layers, "MoE not supported with Stage 3"
logger.info("Initializing ZeRO Stage 3") if dist.get_rank() == 0 else None
from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3

if isinstance(optimizer, DummyOptim):
log_dist("Creating ZeRO Offload", ranks=[0])
optimizer = DeepSpeedZeRoOffload(
self.module,
timers=timers,
@@ -1429,13 +1435,10 @@ def _configure_zero_optimizer(self, optimizer):
max_reuse_distance=self.zero_max_reuse_distance(),
max_live_parameters=self.zero_max_live_parameters(),
param_persistence_threshold=self.zero_param_persistence_threshold(),
model_persistence_threshold=self.zero_model_persistence_threshold(),
offload_param_config=self.zero_offload_param(),
mpu=self.mpu)
else:
log_dist('Creating fp16 ZeRO stage {} optimizer'.format(zero_stage),
ranks=[0])
from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3

optimizer = DeepSpeedZeroOptimizer_Stage3(
self.module,
optimizer,
@@ -1451,7 +1454,6 @@ def _configure_zero_optimizer(self, optimizer):
max_reuse_distance=self.zero_max_reuse_distance(),
max_live_parameters=self.zero_max_live_parameters(),
param_persistence_threshold=self.zero_param_persistence_threshold(),
model_persistence_threshold=self.zero_model_persistence_threshold(),
dp_process_group=self.data_parallel_group,
reduce_scatter=self.zero_reduce_scatter(),
overlap_comm=self.zero_overlap_comm(),
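For readers skimming the engine.py hunks above: the recurring change swaps DeepSpeed's `log_dist(msg, ranks=[0])` helper for an explicit rank guard around `logger.info`. A minimal sketch of that guard pattern, assuming a `torch.distributed` process group is already initialized (the helper name `log_rank0` is illustrative and not part of this PR):

```python
import logging
import torch.distributed as dist

logger = logging.getLogger(__name__)

def log_rank0(message: str) -> None:
    # Emit the message only on rank 0, mirroring log_dist(message, ranks=[0]).
    # Guard on is_initialized() so single-process runs still log.
    if not dist.is_initialized() or dist.get_rank() == 0:
        logger.info(message)

# Inside the engine, self.global_rank plays the role of dist.get_rank():
# if self.global_rank == 0:
#     logger.info(f"DeepSpeed using configured LR scheduler = {scheduler_name}")
```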
22 changes: 6 additions & 16 deletions deepspeed/runtime/zero/parameter_offload.py
@@ -3,7 +3,6 @@
Licensed under the MIT license.
"""

import sys
import torch
from torch.cuda import Stream
from collections import OrderedDict
@@ -174,11 +173,10 @@ def __init__(self,
max_reuse_distance=1000000000,
max_live_parameters=1000000000,
param_persistence_threshold=100000,
model_persistence_threshold=sys.maxsize,
offload_param_config=None,
mpu=None):

see_memory_usage("DeepSpeedZeRoOffload initialize [begin]", force=True)
see_memory_usage("TensorOffload initialize beginning", force=True)

print_rank_0(f"initialized {__class__.__name__} with args: {locals()}",
force=False)
@@ -199,11 +197,8 @@ def __init__(self,

_inject_parameters(module, ZeROOrderedDict)

self.param_numel_persistence_threshold = int(param_persistence_threshold)
self.model_persistence_threshold = int(model_persistence_threshold)
self.persistent_parameters = self.mark_persistent_parameters(
self.param_numel_persistence_threshold,
self.model_persistence_threshold)
self.persistence_threshold = int(param_persistence_threshold)
self.persistent_parameters = self.mark_persistent_parameters()

self.param_coordinators = {}
self._prefetch_bucket_sz = int(prefetch_bucket_size)
@@ -219,8 +214,6 @@ def __init__(self,
f'Created module hooks: forward = {len(self.forward_hooks)}, backward = {len(self.backward_hooks)}',
force=False)

see_memory_usage("DeepSpeedZeRoOffload initialize [end]", force=True)

@instrument_w_nvtx
def partition_all_parameters(self):
"""Partitioning Parameters that were not partitioned usually if parameters
@@ -299,23 +292,20 @@ def _end_of_forward_hook(module, *args):
global FWD_MODULE_STACK
FWD_MODULE_STACK.append(self.module)

def mark_persistent_parameters(self, param_threshold, model_threshold):
def mark_persistent_parameters(self):
persistent_params = []
total_persistent_parameters = 0
params_count = 0
for _, param in self.module.named_parameters(recurse=True):
if param.ds_numel + total_persistent_parameters > model_threshold:
continue

if param.ds_numel < param_threshold:
if param.ds_numel < self.persistence_threshold:
params_count += 1
param.ds_persist = True
persistent_params.append(param)
total_persistent_parameters += param.ds_numel

print_rank_0(
f"Parameter Offload: Total persistent parameters: {total_persistent_parameters} in {params_count} params",
force=True)
force=False)

return persistent_params

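The parameter_offload.py hunks above drop the model-wide persistence cap, leaving only the per-parameter size threshold. A simplified sketch of the remaining rule, using a stand-in `Param` dataclass rather than DeepSpeed's partitioned parameter type:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Param:
    """Simplified stand-in for a ZeRO-partitioned parameter."""
    name: str
    ds_numel: int            # element count of the full (unpartitioned) tensor
    ds_persist: bool = False

def mark_persistent(params: List[Param], persistence_threshold: int) -> List[Param]:
    """Mark parameters smaller than the threshold as persistent (kept on GPU).

    Mirrors the post-revert behaviour: a per-parameter size check only,
    with no cap on the total persistent footprint across the model.
    """
    persistent, total = [], 0
    for p in params:
        if p.ds_numel < persistence_threshold:
            p.ds_persist = True
            persistent.append(p)
            total += p.ds_numel
    print(f"Parameter Offload: Total persistent parameters: "
          f"{total} in {len(persistent)} params")
    return persistent

# Example: a 100-element bias is persisted, a 1M-element weight is not.
mark_persistent([Param("bias", 100), Param("weight", 1_000_000)],
                persistence_threshold=100_000)
```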
4 changes: 4 additions & 0 deletions deepspeed/runtime/zero/partition_parameters.py
@@ -687,7 +687,11 @@ def get_model():
# It can be same as local_device or it could be CPU or NVMe.
self.remote_device = self.local_device if remote_device is None else remote_device
self.pin_memory = pin_memory if (self.remote_device
== OFFLOAD_CPU_DEVICE) else False

# Enable fp16 param swapping to NVMe
if self.remote_device == OFFLOAD_NVME_DEVICE:
10 changes: 0 additions & 10 deletions deepspeed/runtime/zero/partitioned_param_coordinator.py
@@ -398,16 +398,6 @@ def __all_gather_params(self, params: Set[Parameter]) -> None:
assert param.ds_status == ZeroParamStatus.INFLIGHT, param.ds_summary()
self.__inflight_param_registry[param] = handle

# Release swap buffers for persisted params on nvme since they will never be partitioned or evicted from GPU
swap_persisted_params = [
p for p in partitioned_params
if p.ds_persist and p.ds_tensor.final_location == OffloadDeviceEnum.nvme
]
if swap_persisted_params:
swap_persisted_params[
0].nvme_swapper.remove_partition_and_release_buffers(
swap_persisted_params)

@instrument_w_nvtx
def __release_param(self, param: Parameter) -> None:
if param.ds_status == ZeroParamStatus.AVAILABLE and not param.ds_active_sub_modules:
23 changes: 9 additions & 14 deletions deepspeed/runtime/zero/stage3.py
@@ -3,7 +3,6 @@
Licensed under the MIT license.
"""

import sys
import gc
import collections
from typing import Deque, Dict, Tuple
@@ -89,7 +88,6 @@ def __init__(self,
max_reuse_distance=1000000000,
max_live_parameters=1000000000,
param_persistence_threshold=100000,
model_persistence_threshold=sys.maxsize,
dp_process_group=None,
reduce_scatter=True,
overlap_comm=False,
@@ -148,18 +146,15 @@ def __init__(self,
self.params_in_nvme_and_cpu = False
self.max_params_in_cpu = 0

self.parameter_offload = DeepSpeedZeRoOffload(
module=module,
timers=timers,
ds_config=ds_config,
overlap_comm=overlap_comm,
prefetch_bucket_size=prefetch_bucket_size,
max_reuse_distance=max_reuse_distance,
max_live_parameters=max_live_parameters,
param_persistence_threshold=param_persistence_threshold,
model_persistence_threshold=model_persistence_threshold,
offload_param_config=offload_optimizer_config)

self.parameter_offload = DeepSpeedZeRoOffload(module,
timers,
ds_config,
overlap_comm,
prefetch_bucket_size,
max_reuse_distance,
max_live_parameters,
param_persistence_threshold,
offload_param_config)
self.persistent_parameters = self.parameter_offload.persistent_parameters
self._configure_offloading(offload_optimizer_config, offload_param_config)

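The stage3.py hunk above wires the offload helper with only the per-parameter persistence threshold. For orientation, a minimal ZeRO stage-3 config sketch that drives these knobs; the keys are standard DeepSpeed ZeRO settings, and the values simply echo the keyword defaults visible in the diff where available (the prefetch bucket size is an illustrative placeholder, not taken from this PR):

```python
# Illustrative ZeRO stage-3 settings corresponding to the constructor
# arguments above. Numeric values mirror the keyword defaults shown in the
# diff where visible; stage3_prefetch_bucket_size is a placeholder.
ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "zero_optimization": {
        "stage": 3,
        "stage3_prefetch_bucket_size": 50_000_000,    # placeholder value
        "stage3_max_live_parameters": 1_000_000_000,  # max_live_parameters default
        "stage3_max_reuse_distance": 1_000_000_000,   # max_reuse_distance default
        # Parameters with fewer elements than this stay resident on GPU
        # (see mark_persistent_parameters above); the separate model-wide
        # persistence cap is removed by this revert.
        "stage3_param_persistence_threshold": 100_000,
    },
}
```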