2 changes: 1 addition & 1 deletion DeepSpeedExamples
11 changes: 9 additions & 2 deletions deepspeed/pt/deepspeed_config.py
@@ -16,7 +16,8 @@
TENSOR_CORE_ALIGN_SIZE = 8
ADAM_OPTIMIZER = 'adam'
LAMB_OPTIMIZER = 'lamb'
DEEPSPEED_OPTIMIZERS = [ADAM_OPTIMIZER, LAMB_OPTIMIZER]
TORCH_ADAM_OPTIMIZER = 'torch_adam'
DEEPSPEED_OPTIMIZERS = [ADAM_OPTIMIZER, LAMB_OPTIMIZER, TORCH_ADAM_OPTIMIZER]


def get_amp_enabled(param_dict):
@@ -457,12 +458,18 @@ def _do_error_check(self):
if self.zero_enabled:
assert self.fp16_enabled, "DeepSpeedConfig: ZeRO is only supported if fp16 is enabled"
assert self.zero_optimization_stage <= MAX_STAGE_ZERO_OPTIMIZATION, "DeepSpeedConfig: Maximum supported ZeRO stage is {}".format(MAX_STAGE_ZERO_OPTIMIZATION)
if self.zero_config.cpu_offload is True:
assert self.zero_optimization_stage == ZERO_OPTIMIZATION_GRADIENTS, "DeepSpeedConfig: cpu_offload is only supported with ZeRO stage {}".format(ZERO_OPTIMIZATION_GRADIENTS)

assert self.train_micro_batch_size_per_gpu, "DeepSpeedConfig: {} is not defined".format(TRAIN_MICRO_BATCH_SIZE_PER_GPU)

assert self.gradient_accumulation_steps, 'DeepSpeedConfig: {} is not defined'.format(
assert self.gradient_accumulation_steps, "DeepSpeedConfig: {} is not defined".format(
GRADIENT_ACCUMULATION_STEPS)

if self.optimizer_name == TORCH_ADAM_OPTIMIZER:
assert self.zero_enabled, "DeepSpeedConfig: ZeRO must be enabled when using TORCH_ADAM_OPTIMIZER"
assert self.zero_config.cpu_offload, "DeepSpeedConfig: cpu_offload must be enabled when using TORCH_ADAM_OPTIMIZER"

def _do_warning_check(self):
fp16_enabled = self.fp16_enabled or self.zero_enabled

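For reference, a minimal configuration sketch (hypothetical values, key names taken from the test configs later in this diff) that satisfies the new checks: "torch_adam" requires ZeRO to be enabled with cpu_offload, cpu_offload requires ZeRO stage 2 (ZERO_OPTIMIZATION_GRADIENTS), and ZeRO requires fp16.

# Hypothetical example config dict; passes the _do_error_check() constraints added above.
config_dict = {
    "train_batch_size": 2,
    "optimizer": {
        "type": "torch_adam",        # the new TORCH_ADAM_OPTIMIZER value
        "params": {
            "lr": 0.00015
        }
    },
    "fp16": {
        "enabled": True              # ZeRO is only supported with fp16
    },
    "zero_optimization": {
        "stage": 2,                  # cpu_offload requires the gradient-partitioning stage
        "cpu_offload": True          # torch_adam requires cpu_offload
    }
}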
29 changes: 14 additions & 15 deletions deepspeed/pt/deepspeed_light.py
@@ -22,7 +22,7 @@
from deepspeed.pt.fp16_unfused_optimizer import FP16_UnfusedOptimizer
from deepspeed.pt.deepspeed_fused_lamb import FusedLamb
from deepspeed.pt.deepspeed_config import DeepSpeedConfig, \
ADAM_OPTIMIZER, LAMB_OPTIMIZER, DEEPSPEED_OPTIMIZERS
ADAM_OPTIMIZER, LAMB_OPTIMIZER, TORCH_ADAM_OPTIMIZER, DEEPSPEED_OPTIMIZERS

from deepspeed.pt.deepspeed_dataloader import DeepSpeedDataLoader
from deepspeed.pt.deepspeed_constants import \
@@ -169,9 +169,6 @@ def __init__(self,
self.optimizer = None
self.lr_scheduler = None
if model_parameters or optimizer:
optimizer_name = self.optimizer_name()
if optimizer_name is not None and "torch.optim" in optimizer_name:
self.zero_set_cpu_offload()
self._configure_optimizer(optimizer, model_parameters)
self._configure_lr_scheduler(lr_scheduler)
self._report_progress(0)
@@ -298,9 +295,6 @@ def zero_overlap_comm(self):
def zero_cpu_offload(self):
return self._config.zero_config.cpu_offload

def zero_set_cpu_offload(self):
self._config.zero_config.cpu_offload = True

def zero_optimization_stage(self):
return self._config.zero_optimization_stage

@@ -500,13 +494,8 @@ def _configure_distributed_model(self, model):

# Configure optimizer
def _configure_optimizer(self, client_optimizer, model_parameters):
#jie:
if client_optimizer is not None and self.zero_cpu_offload():
optimizer_parameters = self.optimizer_params()
basic_optimizer = torch.optim.Adam(client_optimizer.param_groups,
**optimizer_parameters)
logger.info('Using CPU Optimizer as basic optimizer')
elif client_optimizer is not None:

if client_optimizer is not None:
basic_optimizer = client_optimizer
logger.info('Using client Optimizer as basic optimizer')
else:
@@ -519,13 +508,21 @@ def _configure_optimizer(self, client_optimizer, model_parameters):

if self.zero_optimization():
assert not self.amp_enabled(), "Amp and ZeRO are not currently compatible, please use (legacy) fp16 mode which performs similar to amp opt_mode=O2"
if self.optimizer_name() != ADAM_OPTIMIZER:
if self.optimizer_name() not in [ADAM_OPTIMIZER, TORCH_ADAM_OPTIMIZER]:
assert self.zero_allow_untested_optimizer(), \
'You are using an untested ZeRO Optimizer. Please add <"zero_allow_untested_optimizer": true> in the configuration file to use it.'

logger.warning(
"**** You are using ZeRO with an untested optimizer, proceed with caution *****"
)
if self.zero_cpu_offload():
if self.optimizer_name() != TORCH_ADAM_OPTIMIZER:
assert self.zero_allow_untested_optimizer(), \
'You are using ZeRO-Offload with an untested Optimizer. Please add <"zero_allow_untested_optimizer": true> in the configuration file to use it.'

logger.warning(
"**** You are using ZeRO-Offload with an untested optimizer, proceed with caution *****"
)
self.optimizer = self._configure_zero_optimizer(basic_optimizer)
elif self.amp_enabled():
assert not self.fp16_enabled(), "Cannot enable both amp with (legacy) fp16 mode"
@@ -551,6 +548,8 @@ def _configure_basic_optimizer(self, model_parameters):
optimizer = FusedAdam(model_parameters, **optimizer_parameters)
elif self.optimizer_name() == LAMB_OPTIMIZER:
optimizer = FusedLamb(model_parameters, **optimizer_parameters)
elif self.optimizer_name() == TORCH_ADAM_OPTIMIZER:
optimizer = torch.optim.Adam(model_parameters, **optimizer_parameters)
else:
torch_optimizer = getattr(torch.optim, self.optimizer_name())
optimizer = torch_optimizer(model_parameters, **optimizer_parameters)
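As a condensed, hypothetical sketch of the selection logic after this change (build_basic_optimizer is an illustrative stand-in, not the real _configure_basic_optimizer): the new "torch_adam" type maps to the stock torch.optim.Adam, which can update optimizer state held in host memory, while "adam" and "lamb" keep mapping to the fused CUDA implementations.

import torch

TORCH_ADAM_OPTIMIZER = 'torch_adam'  # constant added in deepspeed_config.py above

def build_basic_optimizer(name, model_parameters, optimizer_parameters):
    # Simplified stand-in for _configure_basic_optimizer(); only the torch_adam
    # branch is spelled out here.
    if name == TORCH_ADAM_OPTIMIZER:
        # Stock PyTorch Adam: unlike the fused CUDA optimizers ('adam'/'lamb'),
        # it can step the fp32 state that ZeRO-Offload keeps in CPU memory.
        return torch.optim.Adam(model_parameters, **optimizer_parameters)
    # In the real code, 'adam' and 'lamb' resolve to FusedAdam / FusedLamb;
    # any other name is looked up on torch.optim via getattr.
    return getattr(torch.optim, name)(model_parameters, **optimizer_parameters)

# Hypothetical usage:
# opt = build_basic_optimizer('torch_adam', model.parameters(), {'lr': 0.00015})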
11 changes: 5 additions & 6 deletions deepspeed/pt/deepspeed_zero_optimizer.py
@@ -1244,16 +1244,15 @@ def step(self, closure=None):
self.optimizer.step()

if self.cpu_offload:
stream = torch.cuda.current_stream()
with torch.cuda.stream(self.migration_stream):
for averaged_gradients_cpu, fp32_partition in zip(self.averaged_gradients_on_cpu, self.single_partition_of_fp32_groups):
averaged_gradients_cpu = async_copy_to(fp32_partition,
torch.cuda.current_device(),
self.migration_stream)
for fp16_partitions, fp32_partition in zip(self.parallel_partitioned_fp16_groups, self.single_partition_of_fp32_groups):
fp16_partitions[partition_id] = async_copy_to(
fp32_partition,
torch.cuda.current_device(),
torch.cuda.main_stream())
stream)
#for averaged_gradients_cpu, fp32_partition in zip(self.averaged_gradients_on_cpu, self.single_partition_of_fp32_groups):
# averaged_gradients_cpu = [fp32_partition]
else:
for fp16_partitions, fp32_partition in zip(self.parallel_partitioned_fp16_groups, self.single_partition_of_fp32_groups):
fp16_partitions[partition_id].data.copy_(fp32_partition.data)
@@ -1415,7 +1414,7 @@ def backward(self, loss, retain_graph=False):
device=torch.cuda.current_device())
self.ipg_buffer.append(buf_1)
self.ipg_index = 0

torch.cuda.empty_cache()
self.loss_scaler.backward(loss.float(), retain_graph=retain_graph)

def check_overflow(self, partition_gradients=True):
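The step() change above also fixes the stream handle used for the host-to-device copy: torch.cuda.main_stream() is not a PyTorch API, so the current stream is captured up front and passed to async_copy_to. A minimal sketch of the same pattern using only public PyTorch calls (tensor names and shapes are illustrative):

import torch

def copy_fp32_partition_to_fp16(fp32_cpu, fp16_gpu, stream):
    # Issue the host-to-device copy on an explicit stream; non_blocking=True
    # only overlaps with compute when the CPU source tensor is pinned.
    with torch.cuda.stream(stream):
        fp16_gpu.copy_(fp32_cpu, non_blocking=True)

# Hypothetical usage:
if torch.cuda.is_available():
    fp32_cpu = torch.zeros(1024, dtype=torch.float32, pin_memory=True)
    fp16_gpu = torch.empty(1024, dtype=torch.float16, device='cuda')
    stream = torch.cuda.current_stream()   # captured before the copy, as in the diff
    copy_fp32_partition_to_fp16(fp32_cpu, fp16_gpu, stream)
    stream.synchronize()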
89 changes: 54 additions & 35 deletions tests/unit/test_checkpointing.py
@@ -235,22 +235,26 @@ def _test_checkpoint_fused_optimizer(args, model, hidden_dim, load_optimizer_sta
load_optimizer_states=False)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
'''
@pytest.mark.parametrize('zero_stage, use_cpu_offload, optimizer_type',
[
(1,
False),
False,
"adam"),
(2,
False),
False,
"adam"),
(2,
True),
True,
"torch_adam"),
])
#@pytest.mark.parametrize("zero_stage", [1, 2])
def test_checkpoint_zero_optimizer(tmpdir, zero_stage, use_cpu_offload):
def test_checkpoint_zero_optimizer(tmpdir, zero_stage, use_cpu_offload, optimizer_type):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
"optimizer": {
"type": "Adam",
"type": optimizer_type,
"params": {
"lr": 0.00015,
"betas": [0.8,
@@ -264,7 +268,7 @@ def test_checkpoint_zero_optimizer(tmpdir, zero_stage, use_cpu_offload):
},
"zero_optimization": {
"stage": zero_stage,
"zero_cpu_offload": use_cpu_offload
"cpu_offload": use_cpu_offload
}
}
args = args_from_dict(tmpdir, config_dict)
@@ -284,24 +288,31 @@ def _test_checkpoint_zero_optimizer(args, model, hidden_dim, load_optimizer_stat
model=model,
hidden_dim=hidden_dim,
load_optimizer_states=True)
'''


#@pytest.mark.parametrize("zero_stage", [1, 2])
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
@pytest.mark.parametrize('zero_stage, use_cpu_offload, optimizer_type',
[
(1,
False),
False,
"adam"),
(2,
False),
False,
"adam"),
(2,
True),
True,
"torch_adam"),
])
def test_checkpoint_zero_no_optimizer(tmpdir, zero_stage, use_cpu_offload):
def test_checkpoint_zero_no_optimizer(tmpdir,
zero_stage,
use_cpu_offload,
optimizer_type):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
"optimizer": {
"type": "Adam",
"type": optimizer_type,
"params": {
"lr": 0.00015,
"betas": [0.8,
@@ -315,7 +326,7 @@ def test_checkpoint_zero_no_optimizer(tmpdir, zero_stage, use_cpu_offload):
},
"zero_optimization": {
"stage": zero_stage,
"zero_cpu_offload": use_cpu_offload
"cpu_offload": use_cpu_offload
}
}
args = args_from_dict(tmpdir, config_dict)
@@ -341,23 +352,27 @@ def _test_checkpoint_zero_no_optimizer(args,


#@pytest.mark.parametrize("zero_stage", [0, 1, 2])
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
@pytest.mark.parametrize('zero_stage, use_cpu_offload, optimizer_type',
[
(0,
False),
False,
"adam"),
(1,
False),
False,
"adam"),
(2,
False),
False,
"adam"),
(2,
True),
True,
"torch_adam"),
])
def test_checkpoint_lr_scheduler(tmpdir, zero_stage, use_cpu_offload):
def test_checkpoint_lr_scheduler(tmpdir, zero_stage, use_cpu_offload, optimizer_type):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
"optimizer": {
"type": "Adam",
"type": optimizer_type,
"params": {
"lr": 0.00015,
"betas": [0.8,
@@ -371,7 +386,7 @@ def test_checkpoint_lr_scheduler(tmpdir, zero_stage, use_cpu_offload):
},
"zero_optimization": {
"stage": zero_stage,
"zero_cpu_offload": use_cpu_offload
"cpu_offload": use_cpu_offload
},
"scheduler": {
"type": "WarmupLR",
@@ -409,23 +424,27 @@ def _test_checkpoint_lr_scheduler(args,


#@pytest.mark.parametrize("zero_stage", [0, 1, 2])
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
@pytest.mark.parametrize('zero_stage, use_cpu_offload, optimizer_type',
[
(0,
False),
False,
"adam"),
(1,
False),
False,
"adam"),
(2,
False),
False,
"adam"),
(2,
True),
True,
"torch_adam"),
])
def test_checkpoint_no_lr_scheduler(tmpdir, zero_stage, use_cpu_offload):
def cpu_offload_lr_scheduler(tmpdir, zero_stage, use_cpu_offload, optimizer_type):
config_dict = {
"train_batch_size": 2,
"steps_per_print": 1,
"optimizer": {
"type": "Adam",
"type": optimizer_type,
"params": {
"lr": 1e-5
}
@@ -435,7 +454,7 @@ def test_checkpoint_no_lr_scheduler(tmpdir, zero_stage, use_cpu_offload):
},
"zero_optimization": {
"stage": zero_stage,
"zero_cpu_offload": use_cpu_offload
"cpu_offload": use_cpu_offload
},
"scheduler": {
"type": "WarmupLR",
@@ -452,11 +471,11 @@ def test_checkpoint_no_lr_scheduler(tmpdir, zero_stage, use_cpu_offload):
model = SimpleModel(hidden_dim, empty_grad=False)

@distributed_test(world_size=[2])
def _test_checkpoint_no_lr_scheduler(args,
model,
hidden_dim,
load_optimizer_states,
load_lr_scheduler_states):
def _cpu_offload_lr_scheduler(args,
model,
hidden_dim,
load_optimizer_states,
load_lr_scheduler_states):
checkpoint_correctness_verification(
args,
model,
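For context, the checkpoint round-trip these parametrized tests exercise reduces to roughly the following (hypothetical sketch; checkpoint_roundtrip, config_dict, and ckpt_dir are illustrative names, and the tests themselves go through args_from_dict / checkpoint_correctness_verification instead):

import deepspeed

def checkpoint_roundtrip(model, config_dict, ckpt_dir, tag='step0'):
    # Build a DeepSpeed engine from the config (e.g. torch_adam + ZeRO-2 + cpu_offload),
    # save a checkpoint, then reload it including optimizer and LR scheduler state.
    engine, _, _, _ = deepspeed.initialize(model=model,
                                           model_parameters=model.parameters(),
                                           config_params=config_dict)
    engine.save_checkpoint(ckpt_dir, tag)
    engine.load_checkpoint(ckpt_dir, tag,
                           load_optimizer_states=True,
                           load_lr_scheduler_states=True)
    return engine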