14 changes: 7 additions & 7 deletions deepspeed/pt/deepspeed_light.py
@@ -169,7 +169,8 @@ def __init__(self,
self.optimizer = None
self.lr_scheduler = None
if model_parameters or optimizer:
if "torch.optim" in self.optimizer_name():
optimizer_name = self.optimizer_name()
if optimizer_name is not None and "torch.optim" in optimizer_name:
self.zero_set_cpu_offload()
self._configure_optimizer(optimizer, model_parameters)
self._configure_lr_scheduler(lr_scheduler)
@@ -264,7 +265,6 @@ def sparse_gradients_enabled(self):
def train_batch_size(self):
return self._config.train_batch_size


def train_micro_batch_size_per_gpu(self):
return self._config.train_micro_batch_size_per_gpu

@@ -501,11 +501,11 @@ def _configure_distributed_model(self, model):
# Configure optimizer
def _configure_optimizer(self, client_optimizer, model_parameters):
#jie:
if self.zero_cpu_offload():
if client_optimizer is not None and self.zero_cpu_offload():
optimizer_parameters = self.optimizer_params()
basic_optimizer = torch.optim.Adam(client_optimizer.param_groups,
**optimizer_parameters)
logger.info('Using CPU Optimizer as basic optimizer'
**optimizer_parameters)
logger.info('Using CPU Optimizer as basic optimizer')
elif client_optimizer is not None:
basic_optimizer = client_optimizer
logger.info('Using client Optimizer as basic optimizer')
@@ -547,8 +547,8 @@ def _configure_basic_optimizer(self, model_parameters):
"'max_grad_norm' is not supported as an optimizer parameter, please switch to using the deepspeed parameter 'gradient_clipping' see: https://www.deepspeed.ai/docs/config-json/#gradient-clipping for more details"
)
if self.optimizer_name() == ADAM_OPTIMIZER:
from apex.optimizers.fused_adam import FusedAdam
optimizer = FusedAdam(model_parameters, **optimizer_parameters)
from apex.optimizers.fused_adam import FusedAdam
optimizer = FusedAdam(model_parameters, **optimizer_parameters)
elif self.optimizer_name() == LAMB_OPTIMIZER:
optimizer = FusedLamb(model_parameters, **optimizer_parameters)
else:
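The change above guards `optimizer_name()` against returning `None` before probing for the `torch.optim` prefix, and, on the CPU-offload path, rebuilds the basic optimizer as a plain `torch.optim.Adam` over the client optimizer's existing `param_groups`. A minimal sketch of that guard-and-rebuild pattern (the function and parameter names below are illustrative stand-ins, not the DeepSpeed engine API):

```python
import torch

def optimizer_uses_torch_optim(optimizer_name):
    # The configured optimizer name may be missing entirely, so guard the check.
    return optimizer_name is not None and "torch.optim" in optimizer_name

def build_basic_optimizer(client_optimizer, optimizer_params, cpu_offload):
    if client_optimizer is not None and cpu_offload:
        # Reuse the client's param_groups so lr / weight_decay carry over, but
        # run the update with a stock torch.optim.Adam for the CPU-offload path.
        return torch.optim.Adam(client_optimizer.param_groups, **optimizer_params)
    if client_optimizer is not None:
        return client_optimizer
    raise ValueError("no client optimizer provided")
```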
52 changes: 32 additions & 20 deletions deepspeed/pt/deepspeed_zero_optimizer.py
@@ -98,6 +98,7 @@ def move_to_cpu(tensor_list):
def print_rank_msg(msg):
print(f"rank {dist.get_rank()} - {msg}")


#jie:asyn move to target device
def async_migrate_to(obj, dev, main_stream=None):
if torch.is_tensor(obj):
@@ -128,6 +129,7 @@ def async_copy_to(obj, dev, main_stream=None):
elif isinstance(obj, collections.Sequence):
return [async_copy_to(o, dev, main_stream) for o in obj]


class FP16_DeepSpeedZeroOptimizer(object):
"""
DeepSpeedZeroOptimizer designed to reduce the memory footprint
@@ -289,11 +291,13 @@ def __init__(self,
# a partition of the fp32 master weights that will be updated by this process
if self.cpu_offload:
self.single_partition_of_fp32_groups.append(
async_copy_to(self.parallel_partitioned_fp16_groups[i]
[partition_id], 'cpu').float()
)
self.averaged_gradients_on_cpu[i] = [torch.empty_like(self.parallel_partitioned_fp16_groups[i][partition_id],
device='cpu')]
async_copy_to(self.parallel_partitioned_fp16_groups[i][partition_id],
'cpu').float())
self.averaged_gradients_on_cpu[i] = [
torch.empty_like(
self.parallel_partitioned_fp16_groups[i][partition_id],
device='cpu')
]
else:
self.single_partition_of_fp32_groups.append(
self.parallel_partitioned_fp16_groups[i]
@@ -499,12 +503,12 @@ def independent_gradient_partition_epilogue(self):
if self.cpu_offload is False:
for i, _ in enumerate(self.fp16_groups):
self.averaged_gradients[i] = self.get_flat_partition(
self.params_in_partition[i],
self.first_offset[i],
self.partition_size[i],
dtype=torch.half,
device=torch.cuda.current_device(),
return_tensor_list=True)
self.params_in_partition[i],
self.first_offset[i],
self.partition_size[i],
dtype=torch.half,
device=torch.cuda.current_device(),
return_tensor_list=True)

self._release_ipg_buffers()

@@ -641,7 +645,8 @@ def reduce_independent_p_g_buckets_and_remove_grads(self, param, i):
#jie:
if self.cpu_offload:
with stream(self.migration_stream):
averaged_gradients_on_cpu[i][param_id] = async_copy_to(new_grad_tensor.data.view_as(param.grad),
averaged_gradients_on_cpu[i][param_id] = async_copy_to(
new_grad_tensor.data.view_as(param.grad),
'cpu',
self.cpu_computation_stream)
else:
@@ -767,8 +772,8 @@ def copy_grads_in_partition(self, param):
else:
'''
self.grads_in_partition = torch.empty(int(total_size),
dtype=torch.half,
device=torch.cuda.current_device())
dtype=torch.half,
device=torch.cuda.current_device())
see_memory_usage(f"after copying {total_size} gradients into partition")

#The allreduce buffer will be rewritten. Copy the gradients in partition to a new buffer
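For context, the re-indented `torch.empty` call above allocates the flat half-precision buffer that gradients are copied into so the allreduce buffer can be reused. A minimal sketch of that copy-into-flat-buffer pattern under assumed, simplified names (not the actual `copy_grads_in_partition` signature):

```python
import torch

def copy_grads_into_flat_buffer(params, total_size):
    # One contiguous fp16 buffer; each gradient is copied into its own slice.
    buffer = torch.empty(int(total_size),
                         dtype=torch.half,
                         device=torch.cuda.current_device())
    offset = 0
    for p in params:
        numel = p.grad.numel()
        buffer.narrow(0, offset, numel).copy_(p.grad.view(-1))
        offset += numel
    return buffer
```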
@@ -1217,7 +1222,7 @@ def step(self, closure=None):
self.single_partition_of_fp32_groups[i].dtype)
else:
single_grad_partition = _flatten_dense_tensors(
self.averaged_gradients_on_cpu[i]).to(
self.averaged_gradients[i]).to(
self.single_partition_of_fp32_groups[i].dtype)
assert single_grad_partition.numel() == self.partition_size[i], \
"averaged gradients have different number of elements that partition size {} {} {} {}".format(single_grad_partition.numel(), self.partition_size[i], i, partition_id)
@@ -1226,7 +1231,8 @@
#release all the gradient since we have already created a necessary copy in dp_grad_partition
self.free_grad_in_param_list(self.params_in_partition[i])

self.averaged_gradients[i] = notneeded
if self.cpu_offload is False:
self.averaged_gradients[i] = None

single_partition_grad_groups.append(single_grad_partition)

@@ -1240,9 +1246,14 @@
if self.cpu_offload:
with torch.cuda.stream(self.migration_stream):
for averaged_gradients_cpu, fp32_partition in zip(self.averaged_gradients_on_cpu, self.single_partition_of_fp32_groups):
averaged_gradients_cpu[0] = async_copy_to(fp32_partition, torch.cuda.current_divice(),self.migration_stream)
averaged_gradients_cpu = async_copy_to(fp32_partition,
torch.cuda.current_device(),
self.migration_stream)
for fp16_partitions, fp32_partition in zip(self.parallel_partitioned_fp16_groups, self.single_partition_of_fp32_groups):
fp16_partitions[partition_id] = async_copy_to(fp32_partition, torch.cuda.current_divice(),torch.cuda.main_stream())
fp16_partitions[partition_id] = async_copy_to(
fp32_partition,
torch.cuda.current_device(),
torch.cuda.main_stream())
else:
for fp16_partitions, fp32_partition in zip(self.parallel_partitioned_fp16_groups, self.single_partition_of_fp32_groups):
fp16_partitions[partition_id].data.copy_(fp32_partition.data)
@@ -1503,9 +1514,10 @@ def state_dict(self):
self.single_partition_of_fp32_groups)
state_dict['single_partition_of_fp32_groups'] = fp32_groups_without_padding


if self.cpu_offload:
state_dict_tmp = async_copy_to(state_dict, 'cpu', torch.cuda.current_stream())
state_dict_tmp = async_copy_to(state_dict,
'cpu',
torch.cuda.current_stream())
state_dict = None
state_dict = state_dict_tmp

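The `step()` changes above route the fp32-to-fp16 copy-back through the migration stream when CPU offload is enabled, while the non-offload path keeps the plain `data.copy_`. A simplified sketch of that copy-back loop, assuming hypothetical names rather than the optimizer's real member layout:

```python
import torch

def copy_master_weights_back(fp16_groups, fp32_partitions, partition_id,
                             migration_stream=None, cpu_offload=False):
    # After the optimizer step updates the fp32 master partitions, push the
    # results back into this rank's fp16 partitions.
    if cpu_offload and migration_stream is not None:
        with torch.cuda.stream(migration_stream):
            for fp16_parts, fp32_part in zip(fp16_groups, fp32_partitions):
                # For a truly asynchronous host->device copy, the CPU tensor
                # should live in pinned memory.
                gpu_copy = fp32_part.to(fp16_parts[partition_id].device,
                                        non_blocking=True)
                fp16_parts[partition_id].data.copy_(gpu_copy)
    else:
        for fp16_parts, fp32_part in zip(fp16_groups, fp32_partitions):
            fp16_parts[partition_id].data.copy_(fp32_part.data)
```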
1 change: 1 addition & 0 deletions deepspeed/pt/deepspeed_zero_utils.py
@@ -2,6 +2,7 @@
from torch.autograd import Variable
import collections


def async_migrate_to(obj, dev, main_stream=None):
if torch.is_tensor(obj):
obj = Variable(obj)
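`deepspeed_zero_utils.py` only gains a blank line here, but the `async_migrate_to` helper it defines (like the `async_copy_to` used above) follows a common recursive pattern: tensors are moved with `non_blocking=True`, while dicts and sequences are walked element by element. A sketch of that pattern, with the details assumed rather than taken from the file:

```python
import collections.abc
import torch

def async_move(obj, device, non_blocking=True):
    # Recursively move tensors, and containers of tensors, to `device`.
    if torch.is_tensor(obj):
        return obj.to(device, non_blocking=non_blocking)
    if isinstance(obj, collections.abc.Mapping):
        return {k: async_move(v, device, non_blocking) for k, v in obj.items()}
    if isinstance(obj, collections.abc.Sequence) and not isinstance(obj, str):
        return [async_move(o, device, non_blocking) for o in obj]
    return obj

# Example: move an optimizer state dict to CPU.
# state_cpu = async_move(optimizer.state_dict(), 'cpu')
```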
64 changes: 14 additions & 50 deletions tests/unit/test_fp16.py
@@ -217,16 +217,8 @@ def _test_adamw_fp16_empty_grad(args, model, hidden_dim):
_test_adamw_fp16_empty_grad(args=args, model=model, hidden_dim=hidden_dim)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
[
(1,
False),
(2,
False),
(2,
True),
])
def test_adam_fp16_zero_onecycle_compatibility(tmpdir, zero_stage, use_cpu_offload):
@pytest.mark.parametrize("zero_stage", [0, 1, 2])
def test_adam_fp16_zero_onecycle_compatibility(tmpdir, zero_stage):
config_dict = {
"train_batch_size": 1,
"steps_per_print": 1,
@@ -254,8 +246,7 @@ def test_adam_fp16_zero_onecycle_compatibility(tmpdir, zero_stage, use_cpu_offload):
"enabled": True
},
"zero_optimization": {
"stage": zero_stage,
"cpu_offload": use_cpu_offload
"stage": zero_stage
}
}

@@ -283,17 +274,8 @@ def _test_adam_fp16_zero_onecycle_compatibility(args, model, hidden_dim):
hidden_dim=hidden_dim)


#@pytest.mark.parametrize("zero_stage", [1, 2])
@pytest.mark.parametrize('zero_stage, use_cpu_offload',
[
(1,
False),
(2,
False),
(2,
True),
])
def test_zero_static_scale(tmpdir, zero_stage, use_cpu_offload):
@pytest.mark.parametrize("zero_stage", [1, 2])
def test_zero_static_scale(tmpdir, zero_stage):
config_dict = {
"train_batch_size": 4,
"steps_per_print": 1,
@@ -308,8 +290,7 @@ def test_zero_static_scale(tmpdir, zero_stage, use_cpu_offload):
"loss_scale": 138.
},
"zero_optimization": {
"stage": zero_stage,
"zero_cpu_offload": use_cpu_offload
"stage": zero_stage
}
}
args = args_from_dict(tmpdir, config_dict)
@@ -352,7 +333,8 @@ def test_zero_static_scale_deprecated_format(tmpdir):
"fp16": {
"enabled": True,
"loss_scale": 138.
}
},
"zero_optimization": True
}
args = args_from_dict(tmpdir, config_dict)

@@ -381,25 +363,16 @@ def _test_zero_static_scale(args):
_test_zero_static_scale(args)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
[
(1,
False),
(2,
False),
(2,
True),
])
def test_zero_allow_untested_optimizer(tmpdir, zero_stage, use_cpu_offload):
@pytest.mark.parametrize("zero_stage", [1, 2])
def test_zero_allow_untested_optimizer(tmpdir, zero_stage):
config_dict = {
"train_batch_size": 4,
"steps_per_print": 1,
"fp16": {
"enabled": True,
},
"zero_optimization": {
"stage": zero_stage,
"cpu_offload": use_cpu_offload
"stage": zero_stage
},
"zero_allow_untested_optimizer": False
}
@@ -419,16 +392,8 @@ def _test_zero_allow_untested_optimizer(args):
_test_zero_allow_untested_optimizer(args)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
[
(1,
False),
(2,
False),
(2,
True),
])
def test_zero_empty_partition(tmpdir, zero_stage, use_cpu_offload):
@pytest.mark.parametrize("zero_stage", [1, 2])
def test_zero_empty_partition(tmpdir, zero_stage):
config_dict = {
"train_micro_batch_size_per_gpu": 1,
"gradient_accumulation_steps": 1,
@@ -443,8 +408,7 @@ def test_zero_empty_partition(tmpdir, zero_stage, use_cpu_offload):
}
},
"zero_optimization": {
"stage": zero_stage,
"cpu_offload": use_cpu_offload
"stage": zero_stage
}
}
args = args_from_dict(tmpdir, config_dict)
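The test changes collapse the `(zero_stage, use_cpu_offload)` parametrization back to a single `zero_stage` axis. If the two-axis form is reintroduced later, the pairs can be written on one line, which reads better than the wrapped tuples removed above; a hypothetical sketch (the test name and body are placeholders, not tests from this file):

```python
import pytest

@pytest.mark.parametrize("zero_stage, use_cpu_offload",
                         [(1, False), (2, False), (2, True)])
def test_zero_config_combinations(zero_stage, use_cpu_offload):
    # Placeholder body: build the ZeRO section of the config for each combination.
    config = {"zero_optimization": {"stage": zero_stage,
                                    "cpu_offload": use_cpu_offload}}
    assert config["zero_optimization"]["stage"] in (1, 2)
```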