Merged
40 commits
55657dc
synced gpus
stas00 Mar 16, 2021
05bddda
fix
stas00 Mar 16, 2021
a5adf30
fix
stas00 Mar 16, 2021
a241298
Merge remote-tracking branch 'origin/master' into ds-zero-3
stas00 Mar 16, 2021
08e43f2
need to use t5-small for quality tests
stas00 Mar 16, 2021
2cd947b
notes
stas00 Mar 17, 2021
a374e57
Merge remote-tracking branch 'origin/master' into ds-zero-3
stas00 Mar 17, 2021
d48ad17
complete merge
stas00 Mar 17, 2021
ad7605d
fix a disappearing std stream problem
stas00 Mar 18, 2021
a7fafbb
start zero3 tests
stas00 Mar 18, 2021
b725c96
Merge remote-tracking branch 'origin/master' into ds-zero-3
stas00 Mar 18, 2021
f936858
wip
stas00 Mar 19, 2021
0a5d872
tune params
stas00 Mar 19, 2021
80fbbe7
sorting out the pre-trained model loading
stas00 Mar 23, 2021
538a402
reworking generate loop wip
stas00 Mar 23, 2021
a3ce950
wip
stas00 Mar 30, 2021
050abd6
Merge remote-tracking branch 'origin/master' into ds-zero-3
stas00 Mar 30, 2021
0f58b8d
style
stas00 Mar 30, 2021
d2f1f70
fix tests
stas00 Mar 30, 2021
d585c8c
split the tests
stas00 Mar 30, 2021
399b981
refactor tests
stas00 Mar 30, 2021
dfe28b3
wip
stas00 Mar 30, 2021
2c3564e
parameterized
stas00 Mar 30, 2021
46e2368
fix
stas00 Mar 30, 2021
8f26f9a
workout the resume from non-ds checkpoint pass + test
stas00 Mar 31, 2021
47e3556
cleanup
stas00 Mar 31, 2021
1cfec3c
remove no longer needed code
stas00 Mar 31, 2021
1cde0d8
Merge remote-tracking branch 'origin/master' into ds-zero-3
stas00 Mar 31, 2021
17e0464
split getter/setter functions
stas00 Mar 31, 2021
ea44db6
complete the docs
stas00 Apr 1, 2021
aa63fc2
suggestions
stas00 Apr 1, 2021
1399f59
gpus and their compute capabilities link
stas00 Apr 1, 2021
cb0de9d
Apply suggestions from code review
stas00 Apr 1, 2021
f34a62b
style
stas00 Apr 2, 2021
376fe60
Merge branch 'ds-zero-3' of github.com:stas00/transformers into ds-ze…
stas00 Apr 2, 2021
0799eac
remove invalid paramgd
stas00 Apr 5, 2021
e365dfe
automatically configure zero3 params that rely on hidden size
stas00 Apr 6, 2021
b3137ae
make _get_resized_embeddings zero3-aware
stas00 Apr 6, 2021
1708a82
add test exercising resize_token_embeddings()
stas00 Apr 6, 2021
6ee8744
add docstring
stas00 Apr 6, 2021
644 changes: 584 additions & 60 deletions docs/source/main_classes/trainer.rst

Large diffs are not rendered by default.
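The documentation diff itself isn't rendered, but the integration it documents is driven entirely by the ``deepspeed`` entry of ``TrainingArguments``, which, per the integrations.py changes below, may be either a path to a DeepSpeed config file or an already-populated dict. A minimal sketch, with the output directory name purely illustrative:

# Minimal sketch (not taken from the docs): point the Trainer at the ZeRO-3
# config added in this PR; a dict with the same content works as well.
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="output_dir",  # illustrative value
    deepspeed="examples/tests/deepspeed/ds_config_zero3.json",
)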

@@ -3,7 +3,7 @@
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 32,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},
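For context on the one-line change above: DeepSpeed's fp16 dynamic loss scaler starts at 2**initial_scale_power, so the initial scale drops from 2**32 to 2**16 = 65,536, which spares the first training steps a long series of overflow-and-skip iterations while the scaler backs down.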
48 changes: 48 additions & 0 deletions examples/tests/deepspeed/ds_config_zero3.json
@@ -0,0 +1,48 @@
{
"fp16": {
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},

"zero_optimization": {
"stage": 3,
"cpu_offload": true,
"cpu_offload_params": true,
"cpu_offload_use_pin_memory" : true,
"overlap_comm": true,
"contiguous_gradients": true,
"sub_group_size": 1e14,
"reduce_bucket_size": 0,
"stage3_prefetch_bucket_size": 0,
"stage3_param_persistence_threshold": 0,
"stage3_max_live_parameters": 1e9,
"stage3_max_reuse_distance": 1e9,
"stage3_gather_fp16_weights_on_model_save": true
},

"optimizer": {
"type": "AdamW",
"params": {
"lr": 3e-5,
"betas": [0.8, 0.999],
"eps": 1e-8,
"weight_decay": 3e-7
}
},

"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 3e-5,
"warmup_num_steps": 500
}
},

"steps_per_print": 2000,
"wall_clock_breakdown": false
}
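Note that three of the ZeRO values above are deliberately left at 0: deepspeed_init() in integrations.py (further down in this diff) fills them in from the model's hidden size. A small worked sketch of that auto-configuration, with hidden_size=512 used purely as an illustrative value:

# Sketch of the auto-configuration performed in deepspeed_init(); in the real
# code hidden_size comes from model.config.hidden_size.
hidden_size = 512
zero = {
    "reduce_bucket_size": 0,
    "stage3_prefetch_bucket_size": 0,
    "stage3_param_persistence_threshold": 0,
}
if zero.get("reduce_bucket_size") == 0:
    zero["reduce_bucket_size"] = hidden_size * hidden_size  # 262144
if zero.get("stage3_prefetch_bucket_size") == 0:
    zero["stage3_prefetch_bucket_size"] = 0.9 * hidden_size * hidden_size  # 235929.6
if zero.get("stage3_param_persistence_threshold") == 0:
    zero["stage3_param_persistence_threshold"] = 10 * hidden_size  # 5120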
413 changes: 310 additions & 103 deletions examples/tests/deepspeed/test_deepspeed.py

Large diffs are not rendered by default.

182 changes: 149 additions & 33 deletions src/transformers/generation_utils.py

Large diffs are not rendered by default.
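The generate() diff isn't rendered, but the "synced gpus" commit points at the key constraint: under ZeRO-3 every forward pass gathers parameter shards from all ranks, so a rank whose sequences finished early must keep issuing forward passes until every rank is done, or the other ranks deadlock. A conceptual sketch of that check, not the actual generation code:

# Conceptual sketch only; assumes torch.distributed has been initialized.
import torch
import torch.distributed as dist

def everyone_finished(this_peer_finished: bool) -> bool:
    # 1.0 = this rank is done, 0.0 = still generating; only stop the loop once
    # the sum matches the world size, i.e. all ranks are done
    flag = torch.tensor(1.0 if this_peer_finished else 0.0, device="cuda")
    dist.all_reduce(flag, op=dist.ReduceOp.SUM)
    return flag.item() == dist.get_world_size()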

149 changes: 116 additions & 33 deletions src/transformers/integrations.py
@@ -19,6 +19,7 @@
import json
import numbers
import os
import sys
import tempfile
from copy import deepcopy
from pathlib import Path
@@ -268,7 +269,77 @@ def rewrite_logs(d):
return new_d


def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):
_is_deepspeed_zero3_enabled = None


def is_deepspeed_zero3_enabled():
"""
This function answers the question of whether DeepSpeed is going to be used and run with ZeRO Stage 3.

It includes an auto-discovery method, see comments in the code for details.

Returns: ``True`` if it was either explicitly enabled via ``deepspeed_zero3_enable(True)`` or the auto-detector was
able to derive that the ``Trainer`` will be running via DeepSpeed ZeRO Stage 3.
"""
global _is_deepspeed_zero3_enabled
if _is_deepspeed_zero3_enabled is None:
_is_deepspeed_zero3_enabled = False
# Try to auto-discover if we are about to use DeepSpeed with ZeRO-3 enabled. This will only
# work for scripts that use the CLI to pass --deepspeed ds_config.json. If cmd args aren't used,
# then to get the model efficiently loaded across multiple GPUs one has to explicitly call
# deepspeed_zero3_enable(True) **before** instantiating a model object
if "--deepspeed" in sys.argv:
idx = sys.argv.index("--deepspeed")
ds_config = sys.argv[idx + 1]
if not os.path.exists(ds_config):
raise ValueError("--deepspeed requires a valid path to a config file")
config = deepspeed_parse_config(ds_config)
if (
"zero_optimization" in config
and "stage" in config["zero_optimization"]
and config["zero_optimization"]["stage"] == 3
):
_is_deepspeed_zero3_enabled = True

return _is_deepspeed_zero3_enabled


def deepspeed_zero3_enable(enable=True):
"""
``is_deepspeed_zero3_enabled()`` tries to derive automatically whether DeepSpeed ZeRO-3 is going to be used by looking
at ``sys.argv``, which may or may not contain information about where to find the DeepSpeed config, if any.

This function allows for explicit enabling/disabling of this global flag.

Args:
enable: if set to ``True`` will make ``is_deepspeed_zero3_enabled()`` return ``True``
"""
global _is_deepspeed_zero3_enabled
_is_deepspeed_zero3_enabled = enable


def deepspeed_parse_config(ds_config):
"""
If ``ds_config`` isn't already a dict, read it from the config file.

If it's already a dict, return a copy of it, so that we can freely modify it.
"""
require_version("deepspeed>0.3.13")

if isinstance(ds_config, dict):
# Don't modify the user's data should they want to reuse it (e.g. in tests), because once we
# modify it, it will not be accepted here again, since some config params must not be set by users
config = deepcopy(ds_config)
elif isinstance(ds_config, str):
with io.open(ds_config, "r", encoding="utf-8") as f:
config = json.load(f)
else:
raise ValueError("expecting either a path to a config file or a pre-populated dict")

return config


def deepspeed_init(trainer, num_training_steps, resume_from_checkpoint=None):
"""
Init DeepSpeed, after updating the DeepSpeed configuration with any relevant Trainer args.

@@ -284,21 +355,10 @@ def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):
"""
import deepspeed

require_version("deepspeed>0.3.12")

args = trainer.args
ds_config_file = args.deepspeed
model = trainer.model

if isinstance(args.deepspeed, dict):
# Don't modify user's data should they want to reuse it (e.g. in tests), because once we
# modified it, it will not be accepted here again, since some config params must be not set by users
config = deepcopy(args.deepspeed)
elif isinstance(args.deepspeed, str):
with io.open(ds_config_file, "r", encoding="utf-8") as f:
config = json.load(f)
else:
raise ValueError("expecting either a path to a config file or a pre-populated dict")
config = deepspeed_parse_config(args.deepspeed)

# The following code translates relevant trainer's cl args into the DS config

@@ -324,9 +384,7 @@ def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):
config["gradient_accumulation_steps"] = args.gradient_accumulation_steps

if "gradient_clipping" in config:
logger.info(
f"Keeping the `gradient_clipping` config from {ds_config_file} intact, ignoring any gradient clipping-specific cl args"
)
logger.info("Keeping the `gradient_clipping` config intact, ignoring any gradient clipping-specific cl args")
else: # override only if the ds config doesn't already have this section
config["gradient_clipping"] = args.max_grad_norm

@@ -336,6 +394,7 @@ def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):
# 2. HF scheduler + HF optimizer: Yes
# 3. DS scheduler + HF optimizer: Yes
# 4. HF scheduler + DS optimizer: No
#
# Unless Offload is enabled in which case it's:
# 1. DS scheduler + DS optimizer: Yes
# 2. HF scheduler + HF optimizer: No
@@ -344,7 +403,7 @@ def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):

optimizer = None
if "optimizer" in config:
logger.info(f"Updating the `scheduler` config from {ds_config_file} with other command line arguments")
logger.info("Updating the `scheduler` config with other command line arguments")

# to avoid inconsistent values of lr and warm up steps the command line args override config
params = dict(
@@ -384,7 +443,7 @@ def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):
# WarmupDecayLR| linear | get_linear_schedule_with_warmup |
lr_scheduler = None
if "scheduler" in config:
logger.info(f"Updating the `scheduler` config from {ds_config_file} with other command line arguments")
logger.info("Updating the `scheduler` config with other command line arguments")
# the user won't easily know the correct num_training_steps should they use WarmupDecayLR,
# so let's set it to the correct value
if config["scheduler"]["type"] == "WarmupDecayLR":
@@ -417,29 +476,41 @@ def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):
# - `amp`: which delegates amp work to apex (which needs to be available), but it cannot be used with any ZeRO features, so probably best to be avoided.
if trainer.fp16_backend == "apex":
if "amp" in config:
logger.info(
f"Keeping the `amp` config from {ds_config_file} intact, ignoring any amp-specific cl args"
)
logger.info("Keeping the `amp` config intact, ignoring any amp-specific cl args")
else:
config["amp"] = {
"enabled": True,
"opt_level": args.fp16_opt_level,
}
elif trainer.fp16_backend == "amp":
if "fp16" in config:
logger.info(
f"Keeping the `fp16` config from {ds_config_file} intact, ignoring any fp16-specific cl args"
)
logger.info("Keeping the `fp16` config intact, ignoring any fp16-specific cl args")
else:
config["fp16"] = {
"enabled": True,
}

# zero
if "zero_optimization" in config:
zero = config["zero_optimization"]

# now we know for sure if zero3 is enabled
deepspeed_zero3_enable(zero.get("stage") == 3)

# automatically assign the optimal config values based on model config
hidden_size = model.config.hidden_size
if zero.get("reduce_bucket_size") == 0:
zero["reduce_bucket_size"] = hidden_size * hidden_size
if zero.get("stage3_prefetch_bucket_size") == 0:
zero["stage3_prefetch_bucket_size"] = 0.9 * hidden_size * hidden_size
if zero.get("stage3_param_persistence_threshold") == 0:
zero["stage3_param_persistence_threshold"] = 10 * hidden_size

# keep for quick debug:
# from pprint import pprint; pprint(config)

# init that takes part of the config via `args`, and the bulk of it via `config_params`
model_parameters = filter(lambda p: p.requires_grad, model.parameters())

model, optimizer, _, lr_scheduler = deepspeed.initialize(
model=model,
model_parameters=model_parameters,
@@ -448,14 +519,26 @@ def init_deepspeed(trainer, num_training_steps, resume_from_checkpoint=None):
lr_scheduler=lr_scheduler,
)

if resume_from_checkpoint is not None: # and os.path.isdir(resume_from_checkpoint):
logger.info(f"Attempting to resume from {resume_from_checkpoint}")
# this magically updates self.optimizer and self.lr_scheduler
load_path, _ = model.load_checkpoint(
resume_from_checkpoint, load_optimizer_states=True, load_lr_scheduler_states=True
)
if load_path is None:
raise ValueError(f"[deepspeed] failed to resume from checkpoint {resume_from_checkpoint}")
if resume_from_checkpoint is not None:

# it's possible that the user is trying to resume from model_path, which doesn't necessarily
# contain a deepspeed checkpoint. e.g. the examples just check whether the dir exists and assume
# it's a resume from a checkpoint and not just local pretrained weights. So we check here whether
# the path contains what looks like a deepspeed checkpoint
import glob

deepspeed_checkpoint_dirs = sorted(glob.glob(f"{resume_from_checkpoint}/global_step*"))

if len(deepspeed_checkpoint_dirs) > 0:
logger.info(f"Attempting to resume from {resume_from_checkpoint}")
# this magically updates self.optimizer and self.lr_scheduler
load_path, _ = model.load_checkpoint(
resume_from_checkpoint, load_optimizer_states=True, load_lr_scheduler_states=True
)
if load_path is None:
raise ValueError(f"[deepspeed] failed to resume from checkpoint {resume_from_checkpoint}")
else:
logger.info(f"{resume_from_checkpoint} doesn't have deepspeed checkpoints, doing nothing")

return model, optimizer, lr_scheduler

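When --deepspeed isn't present on the command line (a notebook, a test), the auto-detector above has nothing to inspect, so the flag has to be flipped explicitly before the model is built, as the comments in is_deepspeed_zero3_enabled() note. A minimal sketch, with t5-small used purely as an example checkpoint:

# Minimal sketch: enable the ZeRO-3 flag first so that from_pretrained()
# (see the modeling_utils.py changes below) constructs the model under zero.Init().
from transformers.integrations import deepspeed_zero3_enable, is_deepspeed_zero3_enabled
from transformers import AutoModelForSeq2SeqLM

deepspeed_zero3_enable(True)
assert is_deepspeed_zero3_enabled()
model = AutoModelForSeq2SeqLM.from_pretrained("t5-small")  # weights are partitioned as they load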
56 changes: 43 additions & 13 deletions src/transformers/modeling_utils.py
@@ -41,6 +41,7 @@
replace_return_docstrings,
)
from .generation_utils import GenerationMixin
from .integrations import is_deepspeed_zero3_enabled
from .utils import logging


@@ -660,7 +661,14 @@ def _get_resized_embeddings(
if new_num_tokens is None:
return old_embeddings

old_num_tokens, old_embedding_dim = old_embeddings.weight.size()
if is_deepspeed_zero3_enabled():
import deepspeed

with deepspeed.zero.GatheredParameters(old_embeddings.weight, modifier_rank=None):
old_num_tokens, old_embedding_dim = old_embeddings.weight.size()
else:
old_num_tokens, old_embedding_dim = old_embeddings.weight.size()

if old_num_tokens == new_num_tokens:
return old_embeddings

@@ -677,8 +685,17 @@ def _get_resized_embeddings(
self._init_weights(new_embeddings)

# Copy token embeddings from the previous weights
num_tokens_to_copy = min(old_num_tokens, new_num_tokens)
new_embeddings.weight.data[:num_tokens_to_copy, :] = old_embeddings.weight.data[:num_tokens_to_copy, :]

# number of tokens to copy
n = min(old_num_tokens, new_num_tokens)
if is_deepspeed_zero3_enabled():
import deepspeed

with deepspeed.zero.GatheredParameters(old_embeddings.weight, modifier_rank=0):
if torch.distributed.get_rank() == 0:
new_embeddings.weight.data[:n, :] = old_embeddings.weight.data[:n, :]
else:
new_embeddings.weight.data[:n, :] = old_embeddings.weight.data[:n, :]

return new_embeddings

@@ -1056,7 +1073,16 @@ def from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.P
config.name_or_path = pretrained_model_name_or_path

# Instantiate model.
model = cls(config, *model_args, **model_kwargs)

if is_deepspeed_zero3_enabled():
import deepspeed

logger.info("Detected DeepSpeed ZeRO-3: activating zero.init() for this model")
# this immediately partitions the model, avoiding the time and memory overhead of first copying it to CPU or to each GPU
with deepspeed.zero.Init():
model = cls(config, *model_args, **model_kwargs)
else:
model = cls(config, *model_args, **model_kwargs)

if state_dict is None and not from_tf:
try:
@@ -1114,15 +1140,19 @@ def from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.P
# so we need to apply the function recursively.
def load(module: nn.Module, prefix=""):
local_metadata = {} if metadata is None else metadata.get(prefix[:-1], {})
module._load_from_state_dict(
state_dict,
prefix,
local_metadata,
True,
missing_keys,
unexpected_keys,
error_msgs,
)
args = (state_dict, prefix, local_metadata, True, missing_keys, unexpected_keys, error_msgs)
if is_deepspeed_zero3_enabled():
import deepspeed

# because zero3 puts placeholders in model params, this context
# manager gathers (unpartitions) the params of the current layer, then loads them from
# the state dict and then re-partitions them
with deepspeed.zero.GatheredParameters(list(module.parameters(recurse=False)), modifier_rank=0):
if torch.distributed.get_rank() == 0:
module._load_from_state_dict(*args)
else:
module._load_from_state_dict(*args)

for name, child in module._modules.items():
if child is not None:
load(child, prefix + name + ".")
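All three modeling_utils.py changes lean on the same deepspeed.zero.GatheredParameters pattern: a partitioned parameter is only a placeholder until it is gathered, modifier_rank=None gathers it read-only on every rank, and modifier_rank=0 broadcasts rank 0's in-context modification back to the shards on exit. A standalone sketch of the read-only case, meaningful only inside a live ZeRO-3 run; the shape in the comment is illustrative:

# Sketch of the read-only gather used by _get_resized_embeddings() above.
import deepspeed

def full_weight_shape(embedding):
    # outside the context the weight is a partitioned placeholder; inside it is
    # temporarily reassembled, read-only because modifier_rank=None
    with deepspeed.zero.GatheredParameters(embedding.weight, modifier_rank=None):
        return tuple(embedding.weight.size())  # e.g. (32128, 512)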