diff --git a/deepspeed/inference/config.py b/deepspeed/inference/config.py index a2facbf2b4a5..c3c4110559af 100644 --- a/deepspeed/inference/config.py +++ b/deepspeed/inference/config.py @@ -1,5 +1,5 @@ import torch -from deepspeed.runtime.config_utils import DeepSpeedConfigModel +from deepspeed.runtime.config_utils import DeepSpeedConfigModel, DtypeEnum from deepspeed.runtime.zero.config import DeepSpeedZeroConfig from pydantic import Field from pydantic import validator @@ -7,33 +7,6 @@ from enum import Enum -class DtypeEnum(Enum): - # The torch dtype must always be the first value (so we return torch.dtype) - fp16 = torch.float16, "torch.float16", "fp16", "float16", "half" - bf16 = torch.bfloat16, "torch.bfloat16", "bf16", "bfloat16" - fp32 = torch.float32, "torch.float32", "fp32", "float32", "float" - int8 = torch.int8, "torch.int8", "int8" - - # Copied from https://stackoverflow.com/a/43210118 - # Allows us to use multiple values for each Enum index and returns first - # listed value when Enum is called - def __new__(cls, *values): - obj = object.__new__(cls) - # first value is canonical value - obj._value_ = values[0] - for other_value in values[1:]: - cls._value2member_map_[other_value] = obj - obj._all_values = values - return obj - - def __repr__(self): - return "<%s.%s: %s>" % ( - self.__class__.__name__, - self._name_, - ", ".join([repr(v) for v in self._all_values]), - ) - - class MoETypeEnum(str, Enum): residual = "residual" standard = "standard" diff --git a/deepspeed/runtime/config.py b/deepspeed/runtime/config.py old mode 100755 new mode 100644 index 98548c45873d..0e8a96ba270f --- a/deepspeed/runtime/config.py +++ b/deepspeed/runtime/config.py @@ -1,1059 +1,488 @@ -""" -Copyright (c) Microsoft Corporation -Licensed under the MIT license. 
-""" -import os -from typing import Union - -import torch -import json -import copy -import base64 - -from .constants import * -from .fp16.loss_scaler import ( - INITIAL_LOSS_SCALE, - SCALE_WINDOW, - DELAYED_SHIFT, - MIN_LOSS_SCALE, -) -from .config_utils import ( - get_scalar_param, - dict_raise_error_on_duplicate_keys, - ScientificNotationEncoder, -) -from .zero.config import get_zero_config, ZeroStageEnum -from .activation_checkpointing.config import DeepSpeedActivationCheckpointingConfig -from ..comm.config import DeepSpeedCommsConfig -from ..monitor.config import DeepSpeedMonitorConfig - +from pydantic import Field, validator, root_validator from deepspeed import comm as dist +from deepspeed.config_utils import DeepSpeedConfigModel, DtypeEnum +from deepspeed.runtime.zero.config import DeepSpeedZeroConfig -from ..git_version_info import version as __version__ -from ..utils import logger - -from ..elasticity import ( - elasticity_enabled, - compute_elastic_config, - ensure_immutable_elastic_config, -) -from ..elasticity.config import ElasticityConfigError -from ..elasticity.constants import ( - ELASTICITY, - IGNORE_NON_ELASTIC_BATCH_INFO, - IGNORE_NON_ELASTIC_BATCH_INFO_DEFAULT, - MODEL_PARLLEL_SIZE, - MODEL_PARLLEL_SIZE_DEFAULT, - NUM_GPUS_PER_NODE, - NUM_GPUS_PER_NODE_DEFAULT, -) - -from ..profiling.config import DeepSpeedFlopsProfilerConfig -from ..autotuning.config import DeepSpeedAutotuningConfig -from ..nebula.config import DeepSpeedNebulaConfig - -from ..compression.config import get_compression_config, get_quantize_enabled -from ..compression.constants import * -from .swap_tensor.aio_config import get_aio_config - -from .data_pipeline.config import get_data_efficiency_enabled, get_data_efficiency_config, get_curriculum_enabled_legacy, get_curriculum_params_legacy -from .data_pipeline.constants import * - -TENSOR_CORE_ALIGN_SIZE = 8 - -ADAGRAD_OPTIMIZER = 'adagrad' -ADAM_OPTIMIZER = 'adam' -ADAMW_OPTIMIZER = 'adamw' -LAMB_OPTIMIZER = 'lamb' -ONEBIT_ADAM_OPTIMIZER = 'onebitadam' -ZERO_ONE_ADAM_OPTIMIZER = 'zerooneadam' -ONEBIT_LAMB_OPTIMIZER = 'onebitlamb' -DEEPSPEED_OPTIMIZERS = [ - ADAGRAD_OPTIMIZER, - ADAM_OPTIMIZER, - ADAMW_OPTIMIZER, - LAMB_OPTIMIZER, - ONEBIT_ADAM_OPTIMIZER, - ONEBIT_LAMB_OPTIMIZER, - ZERO_ONE_ADAM_OPTIMIZER -] - -# extra optimizer parameters for adam/adamw -TORCH_ADAM_PARAM = "torch_adam" - -# default to adamw logic for adam/adamw optimizers unless user explicitly opts out -ADAM_W_MODE = "adam_w_mode" -ADAM_W_MODE_DEFAULT = True - - -class DeepSpeedConfigError(Exception): - pass - - -def get_pld_enabled(param_dict): - if PROGRESSIVE_LAYER_DROP in param_dict.keys(): - return get_scalar_param(param_dict[PROGRESSIVE_LAYER_DROP], - PLD_ENABLED, - PLD_ENABLED_DEFAULT) - else: - return False - - -def get_pld_params(param_dict): - if PROGRESSIVE_LAYER_DROP in param_dict.keys(): - pld_params = copy.copy(param_dict[PROGRESSIVE_LAYER_DROP]) - pld_params.pop(PLD_ENABLED) - return pld_params - else: - return False + +class DeepSpeedFP16Config(DeepSpeedConfigModel): + enabled: bool = False + master_weights_and_grads: bool = Field(False, alias="fp16_master_weights_and_grads") + auto_cast: bool = False + loss_scale: float = Field(0, ge=0) + initial_scale_power: float = Field(16, ge=0) + loss_scale_window: float = Field(1000, ge=0) + hysteresis: float = Field(2, ge=0) + min_loss_scale: float = Field(1, gt=0) -def get_amp_enabled(param_dict): - if AMP in param_dict.keys(): - return get_scalar_param(param_dict[AMP], AMP_ENABLED, AMP_ENABLED_DEFAULT) - else: - return False 
+class DeepSpeedBF16Config(DeepSpeedConfigModel): + enabled: bool = False -def get_amp_params(param_dict): - if AMP in param_dict.keys(): - amp_params = copy.copy(param_dict[AMP]) - amp_params.pop(AMP_ENABLED) +class DeepSpeedAMPConfig(DeepSpeedConfigModel): + enabled: bool = False + + @property + def params(self): + amp_params = self.dict() + amp_params.pop("enabled") return amp_params - else: - return False - - -def get_fp16_enabled(param_dict): - if FP16 in param_dict.keys(): - return get_scalar_param(param_dict[FP16], FP16_ENABLED, FP16_ENABLED_DEFAULT) - else: - return False - - -def get_bfloat16_enabled(param_dict): - for key in [BFLOAT16, BFLOAT16_OLD]: - if key in param_dict.keys(): - return get_scalar_param(param_dict[key], - BFLOAT16_ENABLED, - BFLOAT16_ENABLED_DEFAULT) - return False - - -def get_fp16_master_weights_and_grads_enabled(param_dict): - if get_fp16_enabled(param_dict): - return get_scalar_param(param_dict[FP16], - FP16_MASTER_WEIGHTS_AND_GRADS, - FP16_MASTER_WEIGHTS_AND_GRADS_DEFAULT) - else: - return False - - -def get_fp16_auto_cast(param_dict): - if get_fp16_enabled(param_dict): - return get_scalar_param(param_dict[FP16], FP16_AUTO_CAST, FP16_AUTO_CAST_DEFAULT) - - -def get_loss_scale(param_dict): - if get_fp16_enabled(param_dict): - return get_scalar_param(param_dict[FP16], - FP16_LOSS_SCALE, - FP16_LOSS_SCALE_DEFAULT) - elif get_bfloat16_enabled(param_dict): - return 1.0 - else: - return FP16_LOSS_SCALE_DEFAULT - -def get_initial_dynamic_scale(param_dict): - if get_fp16_enabled(param_dict): - initial_scale_power = get_scalar_param(param_dict[FP16], - FP16_INITIAL_SCALE_POWER, - FP16_INITIAL_SCALE_POWER_DEFAULT) - elif get_bfloat16_enabled(param_dict): - initial_scale_power = 0 - else: - initial_scale_power = FP16_INITIAL_SCALE_POWER_DEFAULT + class Config: + extra = "allow" # This config can be populated by any kwargs that AMP supports - return 2**initial_scale_power - - -def get_dynamic_loss_scale_args(param_dict): - loss_scale_args = None - if get_fp16_enabled(param_dict): - fp16_dict = param_dict[FP16] - dynamic_loss_args = [ - FP16_INITIAL_SCALE_POWER, - FP16_LOSS_SCALE_WINDOW, - FP16_MIN_LOSS_SCALE, - FP16_HYSTERESIS, - ] - if any(arg in list(fp16_dict.keys()) for arg in dynamic_loss_args): - init_scale = get_scalar_param(fp16_dict, - FP16_INITIAL_SCALE_POWER, - FP16_INITIAL_SCALE_POWER_DEFAULT) - scale_window = get_scalar_param(fp16_dict, - FP16_LOSS_SCALE_WINDOW, - FP16_LOSS_SCALE_WINDOW_DEFAULT) - delayed_shift = get_scalar_param(fp16_dict, - FP16_HYSTERESIS, - FP16_HYSTERESIS_DEFAULT) - min_loss_scale = get_scalar_param(fp16_dict, - FP16_MIN_LOSS_SCALE, - FP16_MIN_LOSS_SCALE_DEFAULT) - loss_scale_args = { - INITIAL_LOSS_SCALE: 2**init_scale, - SCALE_WINDOW: scale_window, - DELAYED_SHIFT: delayed_shift, - MIN_LOSS_SCALE: min_loss_scale, - } - - return loss_scale_args - - -def get_gradient_accumulation_steps(param_dict): - return get_scalar_param(param_dict, - GRADIENT_ACCUMULATION_STEPS, - GRADIENT_ACCUMULATION_STEPS_DEFAULT) - - -def get_sparse_gradients_enabled(param_dict): - return get_scalar_param(param_dict, SPARSE_GRADIENTS, SPARSE_GRADIENTS_DEFAULT) - - -def get_communication_data_type(param_dict): - val = get_scalar_param(param_dict, - COMMUNICATION_DATA_TYPE, - COMMUNICATION_DATA_TYPE_DEFAULT) - val = val.lower() if val is not None else val - if val is None: - return val # we must determine it by other parameters - elif val == "fp32": - return torch.float32 - elif val == "fp16": - return torch.float16 - elif val == "bfp16": - return 
torch.bfloat16 - - raise ValueError( - f"Invalid communication_data_type. Supported data types: ['fp16', 'bfp16', 'fp32']. Got: {val}" - ) - - -def get_prescale_gradients(param_dict): - return get_scalar_param(param_dict, PRESCALE_GRADIENTS, PRESCALE_GRADIENTS_DEFAULT) - - -def get_gradient_predivide_factor(param_dict): - return get_scalar_param(param_dict, - GRADIENT_PREDIVIDE_FACTOR, - GRADIENT_PREDIVIDE_FACTOR_DEFAULT) - - -def get_steps_per_print(param_dict): - return get_scalar_param(param_dict, STEPS_PER_PRINT, STEPS_PER_PRINT_DEFAULT) - - -def get_disable_allgather(param_dict): - return get_scalar_param(param_dict, DISABLE_ALLGATHER, DISABLE_ALLGATHER_DEFAULT) - - -def get_dump_state(param_dict): - return get_scalar_param(param_dict, DUMP_STATE, DUMP_STATE_DEFAULT) - - -def get_gradient_clipping(param_dict): - return get_scalar_param(param_dict, GRADIENT_CLIPPING, GRADIENT_CLIPPING_DEFAULT) - - -def get_sparse_attention(param_dict): - if SPARSE_ATTENTION in param_dict.keys(): - sparsity = param_dict[SPARSE_ATTENTION] - mode = get_sparse_attention_mode(sparsity) - - if mode == SPARSE_DENSE_MODE: - return get_sparse_dense_config(sparsity) - elif mode == SPARSE_FIXED_MODE: - return get_sparse_fixed_config(sparsity) - elif mode == SPARSE_VARIABLE_MODE: - return get_sparse_variable_config(sparsity) - elif mode == SPARSE_BIGBIRD_MODE: - return get_sparse_bigbird_config(sparsity) - elif mode == SPARSE_BSLONGFORMER_MODE: - return get_sparse_bslongformer_config(sparsity) - else: - raise NotImplementedError( - f"Given sparsity mode, {mode}, has not been implemented yet!") - - else: - return None - - -def get_sparse_dense_config(sparsity): - block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) - return {SPARSE_MODE: SPARSE_DENSE_MODE, SPARSE_BLOCK: block} - - -def get_sparse_fixed_config(sparsity): - block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) - different_layout_per_head = get_scalar_param( - sparsity, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, - ) - num_local_blocks = get_scalar_param(sparsity, - SPARSE_NUM_LOCAL_BLOCKS, - SPARSE_NUM_LOCAL_BLOCKS_DEFAULT) - num_global_blocks = get_scalar_param(sparsity, - SPARSE_NUM_GLOBAL_BLOCKS, - SPARSE_NUM_GLOBAL_BLOCKS_DEFAULT) - attention = get_scalar_param(sparsity, - SPARSE_ATTENTION_TYPE, - SPARSE_ATTENTION_TYPE_DEFAULT) - horizontal_global_attention = get_scalar_param( - sparsity, - SPARSE_HORIZONTAL_GLOBAL_ATTENTION, - SPARSE_HORIZONTAL_GLOBAL_ATTENTION_DEFAULT, - ) - num_different_global_patterns = get_scalar_param( - sparsity, - SPARSE_NUM_DIFFERENT_GLOBAL_PATTERNS, - SPARSE_NUM_DIFFERENT_GLOBAL_PATTERNS_DEFAULT, - ) - - return { - SPARSE_MODE: SPARSE_FIXED_MODE, - SPARSE_BLOCK: block, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, - SPARSE_NUM_LOCAL_BLOCKS: num_local_blocks, - SPARSE_NUM_GLOBAL_BLOCKS: num_global_blocks, - SPARSE_ATTENTION_TYPE: attention, - SPARSE_HORIZONTAL_GLOBAL_ATTENTION: horizontal_global_attention, - SPARSE_NUM_DIFFERENT_GLOBAL_PATTERNS: num_different_global_patterns, - } - - -def get_sparse_variable_config(sparsity): - block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) - different_layout_per_head = get_scalar_param( - sparsity, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, - ) - num_random_blocks = get_scalar_param(sparsity, - SPARSE_NUM_RANDOM_BLOCKS, - SPARSE_NUM_RANDOM_BLOCKS_DEFAULT) - local_window_blocks = get_scalar_param(sparsity, - SPARSE_LOCAL_WINDOW_BLOCKS, - 
SPARSE_LOCAL_WINDOW_BLOCKS_DEFAULT) - global_block_indices = get_scalar_param(sparsity, - SPARSE_GLOBAL_BLOCK_INDICES, - SPARSE_GLOBAL_BLOCK_INDICES_DEFAULT) - global_block_end_indices = get_scalar_param( - sparsity, - SPARSE_GLOBAL_BLOCK_END_INDICES, - SPARSE_GLOBAL_BLOCK_END_INDICES_DEFAULT, - ) - attention = get_scalar_param(sparsity, - SPARSE_ATTENTION_TYPE, - SPARSE_ATTENTION_TYPE_DEFAULT) - horizontal_global_attention = get_scalar_param( - sparsity, - SPARSE_HORIZONTAL_GLOBAL_ATTENTION, - SPARSE_HORIZONTAL_GLOBAL_ATTENTION_DEFAULT, - ) - - return { - SPARSE_MODE: SPARSE_VARIABLE_MODE, - SPARSE_BLOCK: block, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, - SPARSE_NUM_RANDOM_BLOCKS: num_random_blocks, - SPARSE_LOCAL_WINDOW_BLOCKS: local_window_blocks, - SPARSE_GLOBAL_BLOCK_INDICES: global_block_indices, - SPARSE_GLOBAL_BLOCK_END_INDICES: global_block_end_indices, - SPARSE_ATTENTION_TYPE: attention, - SPARSE_HORIZONTAL_GLOBAL_ATTENTION: horizontal_global_attention, - } - - -def get_sparse_bigbird_config(sparsity): - block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) - different_layout_per_head = get_scalar_param( - sparsity, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, - ) - num_random_blocks = get_scalar_param(sparsity, - SPARSE_NUM_RANDOM_BLOCKS, - SPARSE_NUM_RANDOM_BLOCKS_DEFAULT) - num_sliding_window_blocks = get_scalar_param( - sparsity, - SPARSE_NUM_SLIDING_WINDOW_BLOCKS, - SPARSE_NUM_SLIDING_WINDOW_BLOCKS_DEFAULT, - ) - num_global_blocks = get_scalar_param(sparsity, - SPARSE_NUM_GLOBAL_BLOCKS, - SPARSE_NUM_GLOBAL_BLOCKS_DEFAULT) - - return { - SPARSE_MODE: SPARSE_BIGBIRD_MODE, - SPARSE_BLOCK: block, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, - SPARSE_NUM_RANDOM_BLOCKS: num_random_blocks, - SPARSE_NUM_SLIDING_WINDOW_BLOCKS: num_sliding_window_blocks, - SPARSE_NUM_GLOBAL_BLOCKS: num_global_blocks, - } - - -def get_sparse_bslongformer_config(sparsity): - block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) - different_layout_per_head = get_scalar_param( - sparsity, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, - ) - num_sliding_window_blocks = get_scalar_param( - sparsity, - SPARSE_NUM_SLIDING_WINDOW_BLOCKS, - SPARSE_NUM_SLIDING_WINDOW_BLOCKS_DEFAULT, - ) - global_block_indices = get_scalar_param(sparsity, - SPARSE_GLOBAL_BLOCK_INDICES, - SPARSE_GLOBAL_BLOCK_INDICES_DEFAULT) - global_block_end_indices = get_scalar_param( - sparsity, - SPARSE_GLOBAL_BLOCK_END_INDICES, - SPARSE_GLOBAL_BLOCK_END_INDICES_DEFAULT, - ) - - return { - SPARSE_MODE: SPARSE_BSLONGFORMER_MODE, - SPARSE_BLOCK: block, - SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, - SPARSE_NUM_SLIDING_WINDOW_BLOCKS: num_sliding_window_blocks, - SPARSE_GLOBAL_BLOCK_INDICES: global_block_indices, - SPARSE_GLOBAL_BLOCK_END_INDICES: global_block_end_indices, - } - - -def get_sparse_attention_mode(param_dict): - if SPARSE_MODE in param_dict.keys(): - return param_dict[SPARSE_MODE] - else: - return SPARSE_MODE_DEFAULT - - -def get_sparse_attention_type(param_dict): - if SPARSE_ATTENTION_TYPE in param_dict.keys(): - return param_dict[SPARSE_ATTENTION_TYPE] - else: - return SPARSE_ATTENTION_TYPE_DEFAULT - - -def get_pipeline_config(param_dict): - """Parses pipeline engine configuration. 
""" - default_pipeline = { - "stages": "auto", - "partition": "best", - "seed_layers": False, - "activation_checkpoint_interval": 0, - } - config = default_pipeline - for key, val in param_dict.get("pipeline", {}).items(): - config[key] = val - return config - - -def get_optimizer_name(param_dict): - if OPTIMIZER in param_dict.keys() and TYPE in param_dict[OPTIMIZER].keys(): - return param_dict[OPTIMIZER][TYPE] - else: - return OPTIMIZER_TYPE_DEFAULT - - -def get_optimizer_params(param_dict): - if (get_optimizer_name(param_dict) is not None - and OPTIMIZER_PARAMS in param_dict[OPTIMIZER].keys()): - return param_dict[OPTIMIZER][OPTIMIZER_PARAMS] - else: - return None - - -def get_optimizer_gradient_clipping(param_dict): - optimizer_params = get_optimizer_params(param_dict) - if optimizer_params is not None and MAX_GRAD_NORM in optimizer_params.keys(): - return optimizer_params[MAX_GRAD_NORM] - else: - return None - - -def get_optimizer_legacy_fusion(param_dict): - if OPTIMIZER in param_dict.keys() and LEGACY_FUSION in param_dict[OPTIMIZER].keys(): - return param_dict[OPTIMIZER][LEGACY_FUSION] - else: - return LEGACY_FUSION_DEFAULT - - -def get_zero_allow_untested_optimizer(param_dict): - return get_scalar_param(param_dict, - ZERO_ALLOW_UNTESTED_OPTIMIZER, - ZERO_ALLOW_UNTESTED_OPTIMIZER_DEFAULT) - - -def get_scheduler_name(param_dict): - if SCHEDULER in param_dict.keys() and TYPE in param_dict[SCHEDULER].keys(): - return param_dict[SCHEDULER][TYPE] - else: - return SCHEDULER_TYPE_DEFAULT - - -def get_scheduler_params(param_dict): - if (get_scheduler_name(param_dict) is not None - and SCHEDULER_PARAMS in param_dict[SCHEDULER].keys()): - return param_dict[SCHEDULER][SCHEDULER_PARAMS] - else: - return None - - -def get_train_batch_size(param_dict): - return get_scalar_param(param_dict, TRAIN_BATCH_SIZE, TRAIN_BATCH_SIZE_DEFAULT) - - -def get_train_micro_batch_size_per_gpu(param_dict): - return get_scalar_param( - param_dict, - TRAIN_MICRO_BATCH_SIZE_PER_GPU, - TRAIN_MICRO_BATCH_SIZE_PER_GPU_DEFAULT, - ) - -def get_wall_clock_breakdown(param_dict): - return get_scalar_param(param_dict, - WALL_CLOCK_BREAKDOWN, - WALL_CLOCK_BREAKDOWN_DEFAULT) +class DeepSpeedOptimizerConfig(DeepSpeedConfigModel): + type: str = None + params: Dict[str, Any] = None + legacy_fusion: bool = False -def get_memory_breakdown(param_dict): - return get_scalar_param(param_dict, MEMORY_BREAKDOWN, MEMORY_BREAKDOWN_DEFAULT) +class DeepSpeedSchedulerConfig(DeepSpeedConfigModel): + type: str = None + params: Dict[str, Any] = None -def get_eigenvalue_config(param_dict): - if get_quantize_enabled(param_dict): - param_dict = param_dict[QUANTIZE_TRAINING] - assert not get_eigenvalue_enabled(param_dict), "Eigenvalue based MoQ is temporarily disabled" - return ( - get_eigenvalue_enabled(param_dict), - get_eigenvalue_verbose(param_dict), - get_eigenvalue_max_iter(param_dict), - get_eigenvalue_tol(param_dict), - get_eigenvalue_stability(param_dict), - get_eigenvalue_gas_boundary_resolution(param_dict), - get_eigenvalue_layer_name(param_dict), - get_eigenvalue_layer_num(param_dict), - ) - else: - return ( - EIGENVALUE_ENABLED_DEFAULT, - EIGENVALUE_VERBOSE_DEFAULT, - EIGENVALUE_MAX_ITER_DEFAULT, - EIGENVALUE_TOL_DEFAULT, - EIGENVALUE_STABILITY_DEFAULT, - EIGENVALUE_GAS_BOUNDARY_RESOLUTION_DEFAULT, - EIGENVALUE_LAYER_NAME_DEFAULT, - EIGENVALUE_LAYER_NUM_DEFAULT, - ) +class DeepSpeedEigenvalueConfig(DeepSpeedConfigModel): + enabled: bool = False + verbose: bool = False + max_iter: int = Field(100, ge=0) + tol: float = Field(1e-2, ge=0) + 
stability: float = Field(1e-6, ge=0)
+    gas_boundary_resolution: int = Field(1, ge=0)
+    layer_name: str = "bert.encoder.layer"
+    layer_num: int = Field(0, ge=0)
 
 
-def get_eigenvalue_enabled(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(param_dict[EIGENVALUE],
-                                EIGENVALUE_ENABLED,
-                                EIGENVALUE_ENABLED_DEFAULT)
-    else:
-        return EIGENVALUE_ENABLED_DEFAULT
-
-
-def get_eigenvalue_verbose(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(param_dict[EIGENVALUE],
-                                EIGENVALUE_VERBOSE,
-                                EIGENVALUE_VERBOSE_DEFAULT)
-    else:
-        return EIGENVALUE_VERBOSE_DEFAULT
-
-
-def get_eigenvalue_max_iter(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(param_dict[EIGENVALUE],
-                                EIGENVALUE_MAX_ITER,
-                                EIGENVALUE_MAX_ITER_DEFAULT)
-    else:
-        return EIGENVALUE_MAX_ITER_DEFAULT
-
-
-def get_eigenvalue_tol(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(param_dict[EIGENVALUE],
-                                EIGENVALUE_TOL,
-                                EIGENVALUE_TOL_DEFAULT)
-    else:
-        return EIGENVALUE_TOL_DEFAULT
-
-
-def get_eigenvalue_stability(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(param_dict[EIGENVALUE],
-                                EIGENVALUE_STABILITY,
-                                EIGENVALUE_STABILITY_DEFAULT)
-    else:
-        return EIGENVALUE_STABILITY_DEFAULT
-
-
-def get_eigenvalue_gas_boundary_resolution(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(
-            param_dict[EIGENVALUE],
-            EIGENVALUE_GAS_BOUNDARY_RESOLUTION,
-            EIGENVALUE_GAS_BOUNDARY_RESOLUTION_DEFAULT,
-        )
-    else:
-        return EIGENVALUE_GAS_BOUNDARY_RESOLUTION_DEFAULT
 
+class DeepSpeedPipelineConfig(DeepSpeedConfigModel):
+    stages: str = "auto"  # TODO: convert to Enum class
+    partition: str = "best"  # TODO: convert to Enum class
+    seed_layers: bool = False
+    activation_checkpoint_interval: int = Field(0, ge=0)
 
-def get_eigenvalue_layer_name(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(param_dict[EIGENVALUE],
-                                EIGENVALUE_LAYER_NAME,
-                                EIGENVALUE_LAYER_NAME_DEFAULT)
-    else:
-        return EIGENVALUE_LAYER_NAME_DEFAULT
 
+class DeepSpeedPLDConfig(DeepSpeedConfigModel):
+    enabled: bool = False
+    theta: float = Field(1.0, ge=0)
+    gamma: float = Field(0.001, ge=0)
+
+    @property
+    def params(self):
+        if not self.enabled:  # TODO: Check if this can be removed
+            return False
+        pld_params = self.dict()
+        pld_params.pop("enabled")
+        return pld_params
 
-def get_eigenvalue_layer_num(param_dict):
-    if EIGENVALUE in param_dict.keys():
-        return get_scalar_param(param_dict[EIGENVALUE],
-                                EIGENVALUE_LAYER_NUM,
-                                EIGENVALUE_LAYER_NUM_DEFAULT)
-    else:
-        return EIGENVALUE_LAYER_NUM_DEFAULT
 
+class CheckpointValidationEnum(str, Enum):
+    IGNORE = "IGNORE"
+    WARN = "WARN"
+    FAIL = "FAIL"
+
+
+class ParallelWriteConfig(DeepSpeedConfigModel):
+    pipeline_stage: bool = False
+
+
+class DeepSpeedCheckpointConfig(DeepSpeedConfigModel):
+    tag_validation: CheckpointValidationEnum = "WARN"
+    load_universal: bool = False
+    use_node_local_storage: bool = False
+    parallel_write: ParallelWriteConfig = {}
+
+    @validator("tag_validation", pre=True)
+    def upper_case_str(cls, field_value, values):
+        return field_value.upper()
+
+
+class DeepSpeedDataTypesConfig(DeepSpeedConfigModel):
+    grad_accum_dtype: DtypeEnum = None
+
+
+class DeepSpeedConfig(DeepSpeedConfigModel):
+    mpu: object = None
+    global_rank: int = Field(0, ge=0)
+    world_size: int = Field(1, ge=1)
+
+    train_batch_size: int = Field(None, ge=1)
+    train_micro_batch_size_per_gpu: int = Field(None, ge=1)
+    gradient_accumulation_steps: int = Field(None, ge=1)
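+    # NOTE: any of the three batch fields above may be omitted (None); the
+    # _set_batch_related_parameters root validator below derives the missing
+    # values from the ones provided plus world_size.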
+    steps_per_print: int = Field(10, ge=1)
+    dump_state: bool = False
+
+    disable_allgather: bool = False
+    communication_data_type: DtypeEnum = None
+    prescale_gradients: bool = False
+    gradient_predivide_factor: float = Field(1.0, ge=0)
+    sparse_gradients: bool = False
+
+    zero_optimization: DeepSpeedZeroConfig = {}
+    activation_checkpointing: DeepSpeedActivationCheckpointingConfig = {}
+    comms_logger: DeepSpeedCommsConfig = {}
+    monitor_config: DeepSpeedMonitorConfig = Field(
+        {}
+    )  # TODO: csv_monitor, wandb, tensorboard are values that need to be placed into this config
+    compression_training: DeepSpeedCompressionConfig = {}
+    flops_profiler: DeepSpeedFlopsProfilerConfig = {}
+    nebula: DeepSpeedNebulaConfig = {}
+    fp16: DeepSpeedFP16Config = {}
+    bf16: DeepSpeedBF16Config = Field(
+        {},
+        alias="bfloat16")  # Alias for backward compatibility
+    optimizer: DeepSpeedOptimizerConfig = {}
+    scheduler: DeepSpeedSchedulerConfig = {}
+    autotuning: DeepSpeedAutotuningConfig = {}
+    amp: DeepSpeedAMPConfig = {}
+    eigenvalue: DeepSpeedEigenvalueConfig = {}
+    pipeline: DeepSpeedPipelineConfig = {}
+    progressive_layer_drop: DeepSpeedPLDConfig = {}
+    curriculum_learning: DeepSpeedCurriculumLearningConfig = {}
+    data_efficiency: DeepSpeedDataEfficiencyConfig = {}
+    checkpoint: DeepSpeedCheckpointConfig = {}
+    data_types: DeepSpeedDataTypesConfig = {}
+    aio: DeepSpeedAIOConfig = {}
+    elasticity: DeepSpeedElasticityConfig = {}
+
+    gradient_clipping: float = Field(0.0, ge=0)
+    zero_allow_untested_optimizer: bool = False
+    memory_breakdown: bool = False
+    sparse_attention: bool = False
+    wall_clock_breakdown: bool = False
+    dataloader_drop_last: bool = False
+    vocabulary_size: int = 1024  # TODO: verify if this value is even used
+
+    # These are here for backward compatibility with any downstream
+    # applications that use the ds_config directly, but should be removed
+    # before/at v1.0 release
+    @property
+    def zero_config(self):
+        return self.zero_optimization
+
+    @property
+    def activation_checkpointing_config(self):
+        return self.activation_checkpointing.dict()
+
+    @property
+    def comms_config(self):
+        return self.comms_logger.dict()
+
+    @property
+    def compression_config(self):
+        return self.compression_training.dict()
+
+    @property
+    def flops_profiler_config(self):
+        return self.flops_profiler.dict()
+
+    @property
+    def nebula_config(self):
+        return self.nebula.dict()
+
+    @property
+    def autotuning_config(self):
+        return self.autotuning.dict()
+
+    @property
+    def aio_config(self):
+        return self.aio.dict()
+
+    @property
+    def zero_enabled(self):
+        return bool(self.zero_optimization.stage > 0)
+
+    @property
+    def fp16_enabled(self):
+        return self.fp16.enabled
+
+    @property
+    def fp16_auto_cast(self):
+        return self.fp16.auto_cast
+
+    @property
+    def fp16_master_weights_and_gradients(self):
+        return self.fp16.master_weights_and_grads
+
+    @property
+    def loss_scale(self):
+        return self.fp16.loss_scale
+
+    @property
+    def initial_dynamic_scale(self):
+        if self.bfloat16_enabled:
+            return 2**0
+        return 2**self.fp16.initial_scale_power
+
+    @property
+    def dynamic_loss_scale_args(self):
+        if not self.fp16_enabled:
+            return None
+        loss_scale_args = {
+            INITIAL_LOSS_SCALE: 2**self.fp16.initial_scale_power,
+            SCALE_WINDOW: self.fp16.loss_scale_window,
+            DELAYED_SHIFT: self.fp16.hysteresis,
+            MIN_LOSS_SCALE: self.fp16.min_loss_scale,
+        }
+        return loss_scale_args
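+    # NOTE: a sketch of the default case: with the DeepSpeedFP16Config
+    # defaults above, dynamic_loss_scale_args evaluates to
+    #   {INITIAL_LOSS_SCALE: 2**16, SCALE_WINDOW: 1000,
+    #    DELAYED_SHIFT: 2, MIN_LOSS_SCALE: 1}
+    # which is the argument dict consumed by the fp16 loss scaler.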
+    @property
+    def optimizer_name(self):
+        opt_type = self.optimizer.type
+        if opt_type is None:
+            return opt_type
+        elif opt_type.lower() in DEEPSPEED_OPTIMIZERS:
+            return opt_type.lower()
+        else:
+            return opt_type
 
-def get_checkpoint_params(param_dict):
-    return param_dict.get(CHECKPOINT, {})
+    @property
+    def optimizer_params(self):
+        return self.optimizer.params
 
+    @property
+    def optimizer_legacy_fusion(self):
+        return self.optimizer.legacy_fusion
 
-def get_data_types_params(param_dict):
-    return param_dict.get(DATA_TYPES, {})
+    @property
+    def scheduler_name(self):
+        return self.scheduler.type
 
+    @property
+    def scheduler_params(self):
+        return self.scheduler.params
 
-def get_checkpoint_tag_validation_mode(checkpoint_params):
-    tag_validation_mode = checkpoint_params.get(CHECKPOINT_TAG_VALIDATION,
-                                                CHECKPOINT_TAG_VALIDATION_DEFAULT)
-    tag_validation_mode = tag_validation_mode.upper()
-    if tag_validation_mode in CHECKPOINT_TAG_VALIDATION_MODES:
-        return tag_validation_mode
-    else:
-        raise DeepSpeedConfigError(
-            "Checkpoint config contains invalid tag_validation "
-            f"value of {tag_validation_mode}, expecting one of {CHECKPOINT_TAG_VALIDATION_MODES}"
-        )
+    # The flops profiler force-enables wall-clock timing. This is expressed as
+    # a field validator because a property cannot share the field's name.
+    @validator("wall_clock_breakdown")
+    def _flops_profiler_enables_wall_clock(cls, field_value, values):
+        return field_value or values.get("flops_profiler").enabled
 
+    @property
+    def bfloat16_enabled(self):
+        return self.bf16.enabled
 
-def get_checkpoint_parallel_write_pipeline(checkpoint_params):
-    par_write_params = checkpoint_params.get(CHECKPOINT_PARALLEL_WRITE, {})
-    par_write_pipeline = par_write_params.get(
-        CHECKPOINT_PARALLEL_WRITE_PIPELINE_STAGE,
-        CHECKPOINT_PARALLEL_WRITE_PIPELINE_STAGE_DEFAULT)
-    if par_write_pipeline in [True, False]:
-        return par_write_pipeline
-    else:
-        raise DeepSpeedConfigError(
-            "checkpoint::parallel_write::pipeline_stage "
-            f"value of '{par_write_pipeline}' is invalid, expecting: true or false")
+    @property
+    def amp_enabled(self):
+        return self.amp.enabled
 
+    @property
+    def amp_params(self):
+        return self.amp.params
 
-def get_dataloader_drop_last(param_dict):
-    return get_scalar_param(param_dict,
-                            DATALOADER_DROP_LAST,
-                            DATALOADER_DROP_LAST_DEFAULT)
+    @property
+    def eigenvalue_enabled(self):
+        return self.eigenvalue.enabled
 
+    @property
+    def eigenvalue_verbose(self):
+        return self.eigenvalue.verbose
 
-'''Write deepspeed config files by modifying basic templates.
-Can be used for quickly changing parameters via command line parameters.''' + @property + def eigenvalue_max_iter(self): + return self.eigenvalue.max_iter + @property + def eigenvalue_tol(self): + return self.eigenvalue.tol -class DeepSpeedConfigWriter: - def __init__(self, data=None): - self.data = data if data is not None else {} + @property + def eigenvalue_stability(self): + return self.eigenvalue.stability - def add_config(self, key, value): - self.data[key] = value + @property + def eigenvalue_gas_boundary_resolution(self): + return self.eigenvalue.gas_boundary_resolution - def load_config(self, filename): - self.data = json.load(open(filename, - "r"), - object_pairs_hook=dict_raise_error_on_duplicate_keys) + @property + def eigenvalue_layer_name(self): + return self.eigenvalue.layer_name - def write_config(self, filename): - with open(filename, "w") as outfile: - json.dump(self.data, outfile) + @property + def eigenvalue_layer_num(self): + return self.eigenvalue.layer_num + @property + def pld_enabled(self): + return self.progressive_layer_drop.enabled -class DeepSpeedConfig(object): - def __init__(self, config: Union[str, dict], mpu=None): - super(DeepSpeedConfig, self).__init__() - if isinstance(config, dict): - self._param_dict = config - elif os.path.exists(config): - self._param_dict = json.load( - open(config, - "r"), - object_pairs_hook=dict_raise_error_on_duplicate_keys) - else: - try: - config_decoded = base64.urlsafe_b64decode(config).decode('utf-8') - self._param_dict = json.loads(config_decoded) - except (UnicodeDecodeError, AttributeError): - raise ValueError( - f"Expected a string path to an existing deepspeed config, or a dictionary or a valid base64. Received: {config}" - ) - try: - self.global_rank = dist.get_rank() - if mpu is None: - self.world_size = dist.get_world_size() - else: - self.world_size = mpu.get_data_parallel_world_size() - except: - self.global_rank = 0 - self.world_size = 1 - - # If elastic-mode enabled, update compute + update _param_dict - self.elasticity_enabled = elasticity_enabled(self._param_dict) - if self.elasticity_enabled: - logger.info("DeepSpeed elasticity support enabled") - final_batch_size, valid_gpus, micro_batch_size = compute_elastic_config( - ds_config=self._param_dict, - target_deepspeed_version=__version__, - world_size=self.world_size, - ) + @property + def pld_params(self): + return self.progressive_layer_drop.params - elastic_dict = self._param_dict[ELASTICITY] - - # Ensure the resource scheduler saw the same elastic config we are using at runtime - ensure_immutable_elastic_config(runtime_elastic_config_dict=elastic_dict) - - self.elastic_model_parallel_size = elastic_dict.get( - MODEL_PARLLEL_SIZE, - MODEL_PARLLEL_SIZE_DEFAULT) - if self.elastic_model_parallel_size < 1: - raise ElasticityConfigError( - "Model-Parallel size cannot be less than 1, " - f"given model-parallel size: {self.elastic_model_parallel_size}") - - self.num_gpus_per_node = elastic_dict.get(NUM_GPUS_PER_NODE, - NUM_GPUS_PER_NODE_DEFAULT) - if self.num_gpus_per_node < 1: - raise ElasticityConfigError( - "NUmber of GPUs per node cannot be less than 1, " - f"given number of GPUs per node: {self.num_gpus_per_node}") - - ignore_non_elastic_batch_info = elastic_dict.get( - IGNORE_NON_ELASTIC_BATCH_INFO, - IGNORE_NON_ELASTIC_BATCH_INFO_DEFAULT) - - if not ignore_non_elastic_batch_info: - batch_params = [ - TRAIN_BATCH_SIZE, - TRAIN_MICRO_BATCH_SIZE_PER_GPU, - GRADIENT_ACCUMULATION_STEPS, - ] - if any(map(lambda t: t in self._param_dict, batch_params)): - raise 
ElasticityConfigError("One or more batch related parameters were found in your " \ - f"ds_config ({TRAIN_BATCH_SIZE}, {TRAIN_MICRO_BATCH_SIZE_PER_GPU}, and/or " \ - f"{GRADIENT_ACCUMULATION_STEPS}). These parameters *will not be used* since " \ - "elastic training is enabled, which takes control of these parameters. " \ - "If you want to suppress this error (the parameters will be silently ignored) " \ - f"please set {IGNORE_NON_ELASTIC_BATCH_INFO}':true in your elasticity config.") - - # micro_bsz * world_size * gas = total_batch_size - # gas = total_batch_size // (micro_bsz * world_size) - gradient_accu_steps = final_batch_size // (micro_batch_size * - self.world_size) - - if TRAIN_BATCH_SIZE in self._param_dict: - logger.warning( - "[Elasticity] overriding training_batch_size: " - f"{self._param_dict[TRAIN_BATCH_SIZE]} -> {final_batch_size}") - if TRAIN_MICRO_BATCH_SIZE_PER_GPU in self._param_dict: - logger.warning( - "[Elasticity] overriding train_micro_batch_size_per_gpu: " - f"{self._param_dict[TRAIN_MICRO_BATCH_SIZE_PER_GPU]} -> {micro_batch_size}" - ) - if GRADIENT_ACCUMULATION_STEPS in self._param_dict: - logger.warning( - "[Elasticity] overriding gradient_accumulation_steps: " - f"{self._param_dict[GRADIENT_ACCUMULATION_STEPS]} -> {gradient_accu_steps}" - ) - - logger.info(f"[Elasticity] valid GPU counts: {valid_gpus}") - - self._param_dict[TRAIN_BATCH_SIZE] = final_batch_size - self._param_dict[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = micro_batch_size - self._param_dict[GRADIENT_ACCUMULATION_STEPS] = gradient_accu_steps - - # Pass a copy so that user json is unmodified, e.g. for logging - self._initialize_params(copy.copy(self._param_dict)) - self._configure_train_batch_size() - self._do_sanity_check() - - def _initialize_params(self, param_dict): - self.train_batch_size = get_train_batch_size(param_dict) - #print(f"beginning get_train_batch_size = {get_train_batch_size}") - self.train_micro_batch_size_per_gpu = get_train_micro_batch_size_per_gpu( - param_dict) - self.gradient_accumulation_steps = get_gradient_accumulation_steps(param_dict) - self.steps_per_print = get_steps_per_print(param_dict) - self.dump_state = get_dump_state(param_dict) - - self.disable_allgather = get_disable_allgather(param_dict) - self.communication_data_type = get_communication_data_type(param_dict) - self.prescale_gradients = get_prescale_gradients(param_dict) - self.gradient_predivide_factor = get_gradient_predivide_factor(param_dict) - self.sparse_gradients_enabled = get_sparse_gradients_enabled(param_dict) - - self.zero_config = get_zero_config(param_dict) - self.zero_optimization_stage = self.zero_config.stage - self.zero_enabled = self.zero_optimization_stage > 0 - - self.activation_checkpointing_config = DeepSpeedActivationCheckpointingConfig( - param_dict) - - self.comms_config = DeepSpeedCommsConfig(param_dict) - self.monitor_config = DeepSpeedMonitorConfig(param_dict) - - self.gradient_clipping = get_gradient_clipping(param_dict) - self.fp16_enabled = get_fp16_enabled(param_dict) - self.fp16_auto_cast = get_fp16_auto_cast(param_dict) - self.bfloat16_enabled = get_bfloat16_enabled(param_dict) - assert not (self.fp16_enabled and self.bfloat16_enabled), 'bfloat16 and fp16 modes cannot be simultaneously enabled' - self.fp16_master_weights_and_gradients = get_fp16_master_weights_and_grads_enabled( - param_dict) - self.amp_enabled = get_amp_enabled(param_dict) - self.amp_params = get_amp_params(param_dict) - self.loss_scale = get_loss_scale(param_dict) - self.initial_dynamic_scale = 
get_initial_dynamic_scale(param_dict) - self.dynamic_loss_scale_args = get_dynamic_loss_scale_args(param_dict) - - self.compression_config = get_compression_config(param_dict) - - self.optimizer_name = get_optimizer_name(param_dict) - if (self.optimizer_name is not None - and self.optimizer_name.lower() in DEEPSPEED_OPTIMIZERS): - self.optimizer_name = self.optimizer_name.lower() - - self.optimizer_params = get_optimizer_params(param_dict) - self.optimizer_legacy_fusion = get_optimizer_legacy_fusion(param_dict) - - self.zero_allow_untested_optimizer = get_zero_allow_untested_optimizer( - param_dict) - - self.scheduler_name = get_scheduler_name(param_dict) - self.scheduler_params = get_scheduler_params(param_dict) - - self.flops_profiler_config = DeepSpeedFlopsProfilerConfig(param_dict) - self.wall_clock_breakdown = (get_wall_clock_breakdown(param_dict) - | self.flops_profiler_config.enabled) - self.memory_breakdown = get_memory_breakdown(param_dict) - self.autotuning_config = DeepSpeedAutotuningConfig(param_dict) - - ( - self.eigenvalue_enabled, - self.eigenvalue_verbose, - self.eigenvalue_max_iter, - self.eigenvalue_tol, - self.eigenvalue_stability, - self.eigenvalue_gas_boundary_resolution, - self.eigenvalue_layer_name, - self.eigenvalue_layer_num, - ) = get_eigenvalue_config(param_dict) - - self.sparse_attention = get_sparse_attention(param_dict) - self.pipeline = get_pipeline_config(param_dict) - - self.pld_enabled = get_pld_enabled(param_dict) - self.pld_params = get_pld_params(param_dict) - - self.curriculum_enabled_legacy = get_curriculum_enabled_legacy(param_dict) - self.curriculum_params_legacy = get_curriculum_params_legacy(param_dict) - - self.data_efficiency_enabled = get_data_efficiency_enabled(param_dict) - self.data_efficiency_config = get_data_efficiency_config(param_dict) - - checkpoint_params = get_checkpoint_params(param_dict) - validation_mode = get_checkpoint_tag_validation_mode(checkpoint_params) - self.checkpoint_tag_validation_enabled = (validation_mode != - ValidationMode.IGNORE) - self.checkpoint_tag_validation_fail = validation_mode == ValidationMode.FAIL - self.load_universal_checkpoint = checkpoint_params.get( - LOAD_UNIVERSAL_CHECKPOINT, - LOAD_UNIVERSAL_CHECKPOINT_DEFAULT) - - self.use_node_local_storage = checkpoint_params.get( - USE_NODE_LOCAL_STORAGE_CHECKPOINT, - USE_NODE_LOCAL_STORAGE_CHECKPOINT_DEFAULT) - - data_types_params = get_data_types_params(param_dict) - self.grad_accum_dtype = data_types_params.get(GRAD_ACCUM_DTYPE, - GRAD_ACCUM_DTYPE_DEFAULT) - - par_write_pipe = get_checkpoint_parallel_write_pipeline(checkpoint_params) - self.checkpoint_parallel_write_pipeline = par_write_pipe - - self.aio_config = get_aio_config(param_dict) - - self.dataloader_drop_last = get_dataloader_drop_last(param_dict) - - self.nebula_config = DeepSpeedNebulaConfig(param_dict) + @property + def curriculum_enabled_legacy(self): + return self.curriculum_learning.enabled - def _batch_assertion(self): - - train_batch = self.train_batch_size - micro_batch = self.train_micro_batch_size_per_gpu - grad_acc = self.gradient_accumulation_steps + @property + def curriculum_params_legacy(self): + return self.curriculum_learning.params - assert ( - train_batch > 0 - ), f"Train batch size: {train_batch} has to be greater than 0" + @property + def data_efficiency_enabled(self): + return self.data_efficiency.enabled - assert ( - micro_batch > 0 - ), f"Micro batch size per gpu: {micro_batch} has to be greater than 0" + @property + def data_efficiency_config(self): + return 
self.data_efficiency.dict()
 
-        assert (
-            grad_acc > 0
-        ), f"Gradient accumulation steps: {grad_acc} has to be greater than 0"
+    @property
+    def checkpoint_tag_validation_enabled(self):
+        return self.checkpoint.tag_validation != CheckpointValidationEnum.IGNORE
 
-        assert train_batch == micro_batch * grad_acc * self.world_size, (
-            f"Check batch related parameters. train_batch_size is not equal "
-            "to micro_batch_per_gpu * gradient_acc_step * world_size "
-            f"{train_batch} != {micro_batch} * {grad_acc} * {self.world_size}"
-        )
+    @property
+    def checkpoint_tag_validation_fail(self):
+        return self.checkpoint.tag_validation == CheckpointValidationEnum.FAIL
+
+    @property
+    def load_universal_checkpoint(self):
+        return self.checkpoint.load_universal
+
+    @property
+    def use_node_local_storage(self):
+        return self.checkpoint.use_node_local_storage
+
+    @property
+    def grad_accum_dtype(self):
+        return self.data_types.grad_accum_dtype
+
+    @property
+    def checkpoint_parallel_write_pipeline(self):
+        return self.checkpoint.parallel_write.pipeline_stage
+
+    @property
+    def elasticity_enabled(self):
+        return self.elasticity.enabled
 
-    def _set_batch_related_parameters(self):
+    # Validation functions
+    @validator("global_rank")
+    def get_global_rank(cls, field_value, values):
+        try:
+            field_value = dist.get_rank()
+        except:
+            pass
+        return field_value
 
-        train_batch = self.train_batch_size
-        micro_batch = self.train_micro_batch_size_per_gpu
-        grad_acc = self.gradient_accumulation_steps
+    @validator("world_size")
+    def get_world_size(cls, field_value, values):
+        try:
+            if values.get("mpu") is not None:
+                field_value = values.get("mpu").get_data_parallel_world_size()
+            else:
+                field_value = dist.get_world_size()
+        except:
+            pass
+        return field_value
 
-        #print(f"train_batch = {train_batch}, micro_batch={micro_batch}")
+    @validator("vocabulary_size")
+    def check_vocabulary_size(cls, field_value, values):
+        # TODO: verify this is still used
+        if field_value % TENSOR_CORE_ALIGN_SIZE != 0:
+            logger.warning(
+                f"Vocabulary size {field_value} is not aligned to {TENSOR_CORE_ALIGN_SIZE}, may impact tensor core utilization."
+            )
+        return field_value
+
+    @root_validator
+    def check_optimizer_params(cls, values):
+        opt_params = values.get("optimizer").params or {}
+        if opt_params.get("max_grad_norm", 0) > 0:
+            if values.get("fp16").enabled:
+                if values.get("global_rank") == 0:
+                    logger.warning(
+                        f"In FP16 mode, DeepSpeed will pass {MAX_GRAD_NORM}:{opt_params.get('max_grad_norm')} to FP16 wrapper"
+                    )
+            else:
+                if values.get("global_rank") == 0:
+                    logger.warning(
+                        f"In FP32 mode, DeepSpeed does not permit max_grad_norm ({opt_params.get('max_grad_norm')}) > 0, setting to zero"
+                    )
+                # zero on every rank, not just rank 0, to keep configs consistent
+                values["optimizer"].params["max_grad_norm"] = 0
+        return values
+
+    """ Root Validators """
+    # Note that these are executed in order of definition
+    @root_validator
+    def _exclusive_fp16_bf16(cls, values):
+        assert not (
+            values.get("bf16").enabled and values.get("fp16").enabled
+        ), "bf16 and fp16 modes cannot be simultaneously enabled"
+        return values
+
+    @root_validator
+    def _check_fp16_and_zero_configs(cls, values):
+        if values.get("fp16").master_weights_and_grads:
+            zero_stage = values.get("zero_optimization").stage
+            assert (
+                zero_stage == ZeroStageEnum.gradients
+            ), "fp16.master_weights_and_grads is only supported with ZeRO Stage 2 for now."
+ return values + + @root_validator + def _set_batch_related_parameters(cls, values): + train_batch = values.get("train_batch_size") + micro_batch = values.get("train_micro_batch_size_per_gpu") + grad_acc = values.get("gradient_accumulation_steps") + world_size = values.get("world_size") # all values are provided nothing needs to be set if train_batch is not None and micro_batch is not None and grad_acc is not None: - return + pass # global_accumulation_steps needs to be set elif train_batch is not None and micro_batch is not None: grad_acc = train_batch // micro_batch - grad_acc //= self.world_size - self.gradient_accumulation_steps = grad_acc + grad_acc //= world_size + values["gradient_accumulation_steps"] = grad_acc # micro_batch_per_gpu needs to be set elif train_batch is not None and grad_acc is not None: - micro_batch = train_batch // self.world_size + micro_batch = train_batch // world_size micro_batch //= grad_acc - self.train_micro_batch_size_per_gpu = micro_batch + values["train_micro_batch_size_per_gpu"] = micro_batch # train_batch_size needs to be set elif micro_batch is not None and grad_acc is not None: train_batch_size = micro_batch * grad_acc - train_batch_size *= self.world_size - self.train_batch_size = train_batch_size + train_batch_size *= world_size + values["train_batch_size"] = train_batch_size # gradient_accumulation_steps and micro_batch_per_gpus is set elif train_batch is not None: - self.gradient_accumulation_steps = 1 - self.train_micro_batch_size_per_gpu = train_batch // self.world_size + values["gradient_accumulation_steps"] = 1 + values["train_micro_batch_size_per_gpu"] = train_batch // world_size # train_batch_size and gradient_accumulation_step is set elif micro_batch is not None: - self.train_batch_size = micro_batch * self.world_size - self.gradient_accumulation_steps = 1 + values["train_batch_size"] = micro_batch * world_size + values["gradient_accumulation_steps"] = 1 # either none of the three parameters are provided or just gradient_accumulation_step is provided else: - assert False, \ - 'Either train_batch_size or train_micro_batch_size_per_gpu needs to be provided' - - def _configure_train_batch_size(self): - self._set_batch_related_parameters() - self._batch_assertion() - - def _do_sanity_check(self): - self._do_error_check() - - self._do_warning_check() - - def print_user_config(self): - logger.info(" json = {}".format( - json.dumps( - self._param_dict, - sort_keys=True, - indent=4, - cls=ScientificNotationEncoder, - separators=(",", - ":"), - ))) - - def print(self, name): - logger.info("{}:".format(name)) - for arg in sorted(vars(self)): - if arg != "_param_dict": - dots = "." 
* (29 - len(arg))
-                logger.info("  {} {} {}".format(arg, dots, getattr(self, arg)))
-
-        self.print_user_config()
-
-    def _do_error_check(self):
-        assert (
-            self.train_micro_batch_size_per_gpu
-        ), "DeepSpeedConfig: {} is not defined".format(TRAIN_MICRO_BATCH_SIZE_PER_GPU)
+            assert (
+                False
+            ), "Either train_batch_size or train_micro_batch_size_per_gpu needs to be provided"
 
-        assert (
-            self.gradient_accumulation_steps
-        ), "DeepSpeedConfig: {} is not defined".format(GRADIENT_ACCUMULATION_STEPS)
+        return values
 
-        if self.zero_enabled:
-            assert (
-                self.zero_optimization_stage <= ZeroStageEnum.max_stage
-            ), "DeepSpeedConfig: Maximum supported ZeRO stage is {}".format(
-                ZeroStageEnum.max_stage
-            )
+    # This validator must go after _set_batch_related_parameters
+    @root_validator
+    def _batch_assertion(cls, values):
+        train_batch = values.get("train_batch_size")
+        micro_batch = values.get("train_micro_batch_size_per_gpu")
+        grad_acc = values.get("gradient_accumulation_steps")
+        world_size = values.get("world_size")
 
-        if self.fp16_master_weights_and_gradients:
-            assert self.zero_enabled and self.zero_optimization_stage == ZeroStageEnum.gradients, "Fp16_master_weights_and_grads is only supported with ZeRO Stage 2 for now."
+        assert (
+            train_batch > 0
+        ), f"Train batch size: {train_batch} has to be greater than 0"
 
-    def _do_warning_check(self):
-        fp16_enabled = self.fp16_enabled
+        assert (
+            micro_batch > 0
+        ), f"Micro batch size per gpu: {micro_batch} has to be greater than 0"
 
-        vocabulary_size = self._param_dict.get(VOCABULARY_SIZE, VOCABULARY_SIZE_DEFAULT)
-        if vocabulary_size and vocabulary_size % TENSOR_CORE_ALIGN_SIZE != 0:
-            logger.warning(
-                "DeepSpeedConfig: vocabulary size {} is not aligned to {}, may import tensor core utilization."
-                .format(vocabulary_size,
-                        TENSOR_CORE_ALIGN_SIZE))
-
-        if (self.optimizer_params is not None
-                and MAX_GRAD_NORM in self.optimizer_params.keys()
-                and self.optimizer_params[MAX_GRAD_NORM] > 0):
-            if fp16_enabled:
-                if self.global_rank == 0:
-                    logger.warning(
-                        "DeepSpeedConfig: In FP16 mode, DeepSpeed will pass {}:{} to FP16 wrapper"
-                        .format(MAX_GRAD_NORM,
-                                self.optimizer_params[MAX_GRAD_NORM]))
-            else:
-                if self.global_rank == 0:
-                    logger.warning(
-                        "DeepSpeedConfig: In FP32 mode, DeepSpeed does not permit MAX_GRAD_NORM ({}) > 0, setting to zero"
-                        .format(self.optimizer_params[MAX_GRAD_NORM]))
-                self.optimizer_params[MAX_GRAD_NORM] = 0.0
+        assert (
+            grad_acc > 0
+        ), f"Gradient accumulation steps: {grad_acc} has to be greater than 0"
+
+        assert train_batch == micro_batch * grad_acc * world_size, (
+            f"Check batch related parameters. train_batch_size is not equal "
+            "to micro_batch_per_gpu * gradient_acc_step * world_size "
+            f"{train_batch} != {micro_batch} * {grad_acc} * {world_size}"
+        )
+        return values
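+
+
+# NOTE: worked example of the batch resolution above (illustrative values):
+# with world_size=4, train_batch_size=32 and train_micro_batch_size_per_gpu=2,
+# _set_batch_related_parameters derives
+#   gradient_accumulation_steps = 32 // 2 // 4 = 4
+# and _batch_assertion then verifies 32 == 2 * 4 * 4.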
+""" +import os +from typing import Union + +import torch +import json +import copy +import base64 + +from .constants import * +from .fp16.loss_scaler import ( + INITIAL_LOSS_SCALE, + SCALE_WINDOW, + DELAYED_SHIFT, + MIN_LOSS_SCALE, +) +from .config_utils import ( + get_scalar_param, + dict_raise_error_on_duplicate_keys, + ScientificNotationEncoder, +) +from .zero.config import get_zero_config, ZeroStageEnum +from .activation_checkpointing.config import DeepSpeedActivationCheckpointingConfig +from ..comm.config import DeepSpeedCommsConfig +from ..monitor.config import DeepSpeedMonitorConfig + +from deepspeed import comm as dist + +from ..git_version_info import version as __version__ +from ..utils import logger + +from ..elasticity import ( + elasticity_enabled, + compute_elastic_config, + ensure_immutable_elastic_config, +) +from ..elasticity.config import ElasticityConfigError +from ..elasticity.constants import ( + ELASTICITY, + IGNORE_NON_ELASTIC_BATCH_INFO, + IGNORE_NON_ELASTIC_BATCH_INFO_DEFAULT, + MODEL_PARLLEL_SIZE, + MODEL_PARLLEL_SIZE_DEFAULT, + NUM_GPUS_PER_NODE, + NUM_GPUS_PER_NODE_DEFAULT, +) + +from ..profiling.config import DeepSpeedFlopsProfilerConfig +from ..autotuning.config import DeepSpeedAutotuningConfig +from ..nebula.config import DeepSpeedNebulaConfig + +from ..compression.config import get_compression_config, get_quantize_enabled +from ..compression.constants import * +from .swap_tensor.aio_config import get_aio_config + +from .data_pipeline.config import get_data_efficiency_enabled, get_data_efficiency_config, get_curriculum_enabled_legacy, get_curriculum_params_legacy +from .data_pipeline.constants import * + +TENSOR_CORE_ALIGN_SIZE = 8 + +ADAGRAD_OPTIMIZER = 'adagrad' +ADAM_OPTIMIZER = 'adam' +ADAMW_OPTIMIZER = 'adamw' +LAMB_OPTIMIZER = 'lamb' +ONEBIT_ADAM_OPTIMIZER = 'onebitadam' +ZERO_ONE_ADAM_OPTIMIZER = 'zerooneadam' +ONEBIT_LAMB_OPTIMIZER = 'onebitlamb' +DEEPSPEED_OPTIMIZERS = [ + ADAGRAD_OPTIMIZER, + ADAM_OPTIMIZER, + ADAMW_OPTIMIZER, + LAMB_OPTIMIZER, + ONEBIT_ADAM_OPTIMIZER, + ONEBIT_LAMB_OPTIMIZER, + ZERO_ONE_ADAM_OPTIMIZER +] + +# extra optimizer parameters for adam/adamw +TORCH_ADAM_PARAM = "torch_adam" + +# default to adamw logic for adam/adamw optimizers unless user explicitly opts out +ADAM_W_MODE = "adam_w_mode" +ADAM_W_MODE_DEFAULT = True + + +class DeepSpeedConfigError(Exception): + pass + + + +def get_sparse_gradients_enabled(param_dict): + return get_scalar_param(param_dict, SPARSE_GRADIENTS, SPARSE_GRADIENTS_DEFAULT) + + +def get_communication_data_type(param_dict): + val = get_scalar_param(param_dict, + COMMUNICATION_DATA_TYPE, + COMMUNICATION_DATA_TYPE_DEFAULT) + val = val.lower() if val is not None else val + if val is None: + return val # we must determine it by other parameters + elif val == "fp32": + return torch.float32 + elif val == "fp16": + return torch.float16 + elif val == "bfp16": + return torch.bfloat16 + + raise ValueError( + f"Invalid communication_data_type. Supported data types: ['fp16', 'bfp16', 'fp32']. 
Got: {val}" + ) + + +def get_prescale_gradients(param_dict): + return get_scalar_param(param_dict, PRESCALE_GRADIENTS, PRESCALE_GRADIENTS_DEFAULT) + + +def get_gradient_predivide_factor(param_dict): + return get_scalar_param(param_dict, + GRADIENT_PREDIVIDE_FACTOR, + GRADIENT_PREDIVIDE_FACTOR_DEFAULT) + + +def get_steps_per_print(param_dict): + return get_scalar_param(param_dict, STEPS_PER_PRINT, STEPS_PER_PRINT_DEFAULT) + + +def get_disable_allgather(param_dict): + return get_scalar_param(param_dict, DISABLE_ALLGATHER, DISABLE_ALLGATHER_DEFAULT) + + +def get_dump_state(param_dict): + return get_scalar_param(param_dict, DUMP_STATE, DUMP_STATE_DEFAULT) + + +def get_gradient_clipping(param_dict): + return get_scalar_param(param_dict, GRADIENT_CLIPPING, GRADIENT_CLIPPING_DEFAULT) + + +def get_sparse_attention(param_dict): + if SPARSE_ATTENTION in param_dict.keys(): + sparsity = param_dict[SPARSE_ATTENTION] + mode = get_sparse_attention_mode(sparsity) + + if mode == SPARSE_DENSE_MODE: + return get_sparse_dense_config(sparsity) + elif mode == SPARSE_FIXED_MODE: + return get_sparse_fixed_config(sparsity) + elif mode == SPARSE_VARIABLE_MODE: + return get_sparse_variable_config(sparsity) + elif mode == SPARSE_BIGBIRD_MODE: + return get_sparse_bigbird_config(sparsity) + elif mode == SPARSE_BSLONGFORMER_MODE: + return get_sparse_bslongformer_config(sparsity) + else: + raise NotImplementedError( + f"Given sparsity mode, {mode}, has not been implemented yet!") + + else: + return None + + +def get_sparse_dense_config(sparsity): + block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) + return {SPARSE_MODE: SPARSE_DENSE_MODE, SPARSE_BLOCK: block} + + +def get_sparse_fixed_config(sparsity): + block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) + different_layout_per_head = get_scalar_param( + sparsity, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, + ) + num_local_blocks = get_scalar_param(sparsity, + SPARSE_NUM_LOCAL_BLOCKS, + SPARSE_NUM_LOCAL_BLOCKS_DEFAULT) + num_global_blocks = get_scalar_param(sparsity, + SPARSE_NUM_GLOBAL_BLOCKS, + SPARSE_NUM_GLOBAL_BLOCKS_DEFAULT) + attention = get_scalar_param(sparsity, + SPARSE_ATTENTION_TYPE, + SPARSE_ATTENTION_TYPE_DEFAULT) + horizontal_global_attention = get_scalar_param( + sparsity, + SPARSE_HORIZONTAL_GLOBAL_ATTENTION, + SPARSE_HORIZONTAL_GLOBAL_ATTENTION_DEFAULT, + ) + num_different_global_patterns = get_scalar_param( + sparsity, + SPARSE_NUM_DIFFERENT_GLOBAL_PATTERNS, + SPARSE_NUM_DIFFERENT_GLOBAL_PATTERNS_DEFAULT, + ) + + return { + SPARSE_MODE: SPARSE_FIXED_MODE, + SPARSE_BLOCK: block, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, + SPARSE_NUM_LOCAL_BLOCKS: num_local_blocks, + SPARSE_NUM_GLOBAL_BLOCKS: num_global_blocks, + SPARSE_ATTENTION_TYPE: attention, + SPARSE_HORIZONTAL_GLOBAL_ATTENTION: horizontal_global_attention, + SPARSE_NUM_DIFFERENT_GLOBAL_PATTERNS: num_different_global_patterns, + } + + +def get_sparse_variable_config(sparsity): + block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) + different_layout_per_head = get_scalar_param( + sparsity, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, + ) + num_random_blocks = get_scalar_param(sparsity, + SPARSE_NUM_RANDOM_BLOCKS, + SPARSE_NUM_RANDOM_BLOCKS_DEFAULT) + local_window_blocks = get_scalar_param(sparsity, + SPARSE_LOCAL_WINDOW_BLOCKS, + SPARSE_LOCAL_WINDOW_BLOCKS_DEFAULT) + global_block_indices = get_scalar_param(sparsity, + SPARSE_GLOBAL_BLOCK_INDICES, + 
SPARSE_GLOBAL_BLOCK_INDICES_DEFAULT) + global_block_end_indices = get_scalar_param( + sparsity, + SPARSE_GLOBAL_BLOCK_END_INDICES, + SPARSE_GLOBAL_BLOCK_END_INDICES_DEFAULT, + ) + attention = get_scalar_param(sparsity, + SPARSE_ATTENTION_TYPE, + SPARSE_ATTENTION_TYPE_DEFAULT) + horizontal_global_attention = get_scalar_param( + sparsity, + SPARSE_HORIZONTAL_GLOBAL_ATTENTION, + SPARSE_HORIZONTAL_GLOBAL_ATTENTION_DEFAULT, + ) + + return { + SPARSE_MODE: SPARSE_VARIABLE_MODE, + SPARSE_BLOCK: block, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, + SPARSE_NUM_RANDOM_BLOCKS: num_random_blocks, + SPARSE_LOCAL_WINDOW_BLOCKS: local_window_blocks, + SPARSE_GLOBAL_BLOCK_INDICES: global_block_indices, + SPARSE_GLOBAL_BLOCK_END_INDICES: global_block_end_indices, + SPARSE_ATTENTION_TYPE: attention, + SPARSE_HORIZONTAL_GLOBAL_ATTENTION: horizontal_global_attention, + } + + +def get_sparse_bigbird_config(sparsity): + block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) + different_layout_per_head = get_scalar_param( + sparsity, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, + ) + num_random_blocks = get_scalar_param(sparsity, + SPARSE_NUM_RANDOM_BLOCKS, + SPARSE_NUM_RANDOM_BLOCKS_DEFAULT) + num_sliding_window_blocks = get_scalar_param( + sparsity, + SPARSE_NUM_SLIDING_WINDOW_BLOCKS, + SPARSE_NUM_SLIDING_WINDOW_BLOCKS_DEFAULT, + ) + num_global_blocks = get_scalar_param(sparsity, + SPARSE_NUM_GLOBAL_BLOCKS, + SPARSE_NUM_GLOBAL_BLOCKS_DEFAULT) + + return { + SPARSE_MODE: SPARSE_BIGBIRD_MODE, + SPARSE_BLOCK: block, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, + SPARSE_NUM_RANDOM_BLOCKS: num_random_blocks, + SPARSE_NUM_SLIDING_WINDOW_BLOCKS: num_sliding_window_blocks, + SPARSE_NUM_GLOBAL_BLOCKS: num_global_blocks, + } + + +def get_sparse_bslongformer_config(sparsity): + block = get_scalar_param(sparsity, SPARSE_BLOCK, SPARSE_BLOCK_DEFAULT) + different_layout_per_head = get_scalar_param( + sparsity, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD_DEFAULT, + ) + num_sliding_window_blocks = get_scalar_param( + sparsity, + SPARSE_NUM_SLIDING_WINDOW_BLOCKS, + SPARSE_NUM_SLIDING_WINDOW_BLOCKS_DEFAULT, + ) + global_block_indices = get_scalar_param(sparsity, + SPARSE_GLOBAL_BLOCK_INDICES, + SPARSE_GLOBAL_BLOCK_INDICES_DEFAULT) + global_block_end_indices = get_scalar_param( + sparsity, + SPARSE_GLOBAL_BLOCK_END_INDICES, + SPARSE_GLOBAL_BLOCK_END_INDICES_DEFAULT, + ) + + return { + SPARSE_MODE: SPARSE_BSLONGFORMER_MODE, + SPARSE_BLOCK: block, + SPARSE_DIFFERENT_LAYOUT_PER_HEAD: different_layout_per_head, + SPARSE_NUM_SLIDING_WINDOW_BLOCKS: num_sliding_window_blocks, + SPARSE_GLOBAL_BLOCK_INDICES: global_block_indices, + SPARSE_GLOBAL_BLOCK_END_INDICES: global_block_end_indices, + } + + +def get_sparse_attention_mode(param_dict): + if SPARSE_MODE in param_dict.keys(): + return param_dict[SPARSE_MODE] + else: + return SPARSE_MODE_DEFAULT + + +def get_sparse_attention_type(param_dict): + if SPARSE_ATTENTION_TYPE in param_dict.keys(): + return param_dict[SPARSE_ATTENTION_TYPE] + else: + return SPARSE_ATTENTION_TYPE_DEFAULT + + +def get_optimizer_gradient_clipping(param_dict): + optimizer_params = get_optimizer_params(param_dict) + if optimizer_params is not None and MAX_GRAD_NORM in optimizer_params.keys(): + return optimizer_params[MAX_GRAD_NORM] + else: + return None + + +def get_zero_allow_untested_optimizer(param_dict): + return get_scalar_param(param_dict, + ZERO_ALLOW_UNTESTED_OPTIMIZER, + 
+def get_train_batch_size(param_dict):
+    return get_scalar_param(param_dict, TRAIN_BATCH_SIZE, TRAIN_BATCH_SIZE_DEFAULT)
+
+
+def get_train_micro_batch_size_per_gpu(param_dict):
+    return get_scalar_param(
+        param_dict,
+        TRAIN_MICRO_BATCH_SIZE_PER_GPU,
+        TRAIN_MICRO_BATCH_SIZE_PER_GPU_DEFAULT,
+    )
+
+
+def get_memory_breakdown(param_dict):
+    return get_scalar_param(param_dict, MEMORY_BREAKDOWN, MEMORY_BREAKDOWN_DEFAULT)
+
+
+'''Write DeepSpeed config files by modifying basic templates.
+Can be used to change parameters quickly via command-line arguments.'''
+
+
+class DeepSpeedConfigWriter:
+    def __init__(self, data=None):
+        self.data = data if data is not None else {}
+
+    def add_config(self, key, value):
+        self.data[key] = value
+
+    def load_config(self, filename):
+        # Use a context manager so the file handle is closed promptly.
+        with open(filename, "r") as infile:
+            self.data = json.load(
+                infile,
+                object_pairs_hook=dict_raise_error_on_duplicate_keys)
+
+    def write_config(self, filename):
+        with open(filename, "w") as outfile:
+            json.dump(self.data, outfile)
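For context, the writer is a thin convenience wrapper around a dict plus JSON I/O; a typical round-trip looks like the following (file names are illustrative only):

    writer = DeepSpeedConfigWriter()
    writer.load_config("base_ds_config.json")   # load a template from disk
    writer.add_config("train_batch_size", 64)   # override a single field
    writer.write_config("ds_config_bs64.json")  # persist the variant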
" \ + "If you want to suppress this error (the parameters will be silently ignored) " \ + f"please set {IGNORE_NON_ELASTIC_BATCH_INFO}':true in your elasticity config.") + + # micro_bsz * world_size * gas = total_batch_size + # gas = total_batch_size // (micro_bsz * world_size) + gradient_accu_steps = final_batch_size // (micro_batch_size * + self.world_size) + + if TRAIN_BATCH_SIZE in self._param_dict: + logger.warning( + "[Elasticity] overriding training_batch_size: " + f"{self._param_dict[TRAIN_BATCH_SIZE]} -> {final_batch_size}") + if TRAIN_MICRO_BATCH_SIZE_PER_GPU in self._param_dict: + logger.warning( + "[Elasticity] overriding train_micro_batch_size_per_gpu: " + f"{self._param_dict[TRAIN_MICRO_BATCH_SIZE_PER_GPU]} -> {micro_batch_size}" + ) + if GRADIENT_ACCUMULATION_STEPS in self._param_dict: + logger.warning( + "[Elasticity] overriding gradient_accumulation_steps: " + f"{self._param_dict[GRADIENT_ACCUMULATION_STEPS]} -> {gradient_accu_steps}" + ) + + logger.info(f"[Elasticity] valid GPU counts: {valid_gpus}") + + self._param_dict[TRAIN_BATCH_SIZE] = final_batch_size + self._param_dict[TRAIN_MICRO_BATCH_SIZE_PER_GPU] = micro_batch_size + self._param_dict[GRADIENT_ACCUMULATION_STEPS] = gradient_accu_steps + + + def print_user_config(self): + logger.info(" json = {}".format( + json.dumps( + self._param_dict, + sort_keys=True, + indent=4, + cls=ScientificNotationEncoder, + separators=(",", + ":"), + ))) + + def print(self, name): + logger.info("{}:".format(name)) + for arg in sorted(vars(self)): + if arg != "_param_dict": + dots = "." * (29 - len(arg)) + logger.info(" {} {} {}".format(arg, dots, getattr(self, arg))) + + self.print_user_config() diff --git a/deepspeed/runtime/config_utils.py b/deepspeed/runtime/config_utils.py index 08a50785ceb9..c31c1b0c338f 100755 --- a/deepspeed/runtime/config_utils.py +++ b/deepspeed/runtime/config_utils.py @@ -8,11 +8,40 @@ import json import collections import collections.abc +import torch from functools import reduce +from enum import Enum from pydantic import BaseModel from deepspeed.utils import logger +class DtypeEnum(Enum): + # The torch dtype must always be the first value (so we return torch.dtype) + fp16 = torch.float16, "torch.float16", "fp16", "float16", "half" + bf16 = torch.bfloat16, "torch.bfloat16", "bf16", "bfloat16" + fp32 = torch.float32, "torch.float32", "fp32", "float32", "float" + int8 = torch.int8, "torch.int8", "int8" + + # Copied from https://stackoverflow.com/a/43210118 + # Allows us to use multiple values for each Enum index and returns first + # listed value when Enum is called + def __new__(cls, *values): + obj = object.__new__(cls) + # first value is canonical value + obj._value_ = values[0] + for other_value in values[1:]: + cls._value2member_map_[other_value] = obj + obj._all_values = values + return obj + + def __repr__(self): + return "<%s.%s: %s>" % ( + self.__class__.__name__, + self._name_, + ", ".join([repr(v) for v in self._all_values]), + ) + + class DeepSpeedConfigModel(BaseModel): """ This class should be used as a base for all DeepSpeed configs. 
diff --git a/deepspeed/runtime/config_utils.py b/deepspeed/runtime/config_utils.py
index 08a50785ceb9..c31c1b0c338f 100755
--- a/deepspeed/runtime/config_utils.py
+++ b/deepspeed/runtime/config_utils.py
@@ -8,11 +8,40 @@
 import json
 import collections
 import collections.abc
+import torch
 from functools import reduce
+from enum import Enum
 from pydantic import BaseModel
 from deepspeed.utils import logger
 
 
+class DtypeEnum(Enum):
+    # The torch dtype must always be the first value (so we return torch.dtype)
+    fp16 = torch.float16, "torch.float16", "fp16", "float16", "half"
+    bf16 = torch.bfloat16, "torch.bfloat16", "bf16", "bfloat16"
+    fp32 = torch.float32, "torch.float32", "fp32", "float32", "float"
+    int8 = torch.int8, "torch.int8", "int8"
+
+    # Copied from https://stackoverflow.com/a/43210118
+    # Allows us to use multiple values for each Enum index and returns first
+    # listed value when Enum is called
+    def __new__(cls, *values):
+        obj = object.__new__(cls)
+        # first value is canonical value
+        obj._value_ = values[0]
+        for other_value in values[1:]:
+            cls._value2member_map_[other_value] = obj
+        obj._all_values = values
+        return obj
+
+    def __repr__(self):
+        return "<%s.%s: %s>" % (
+            self.__class__.__name__,
+            self._name_,
+            ", ".join([repr(v) for v in self._all_values]),
+        )
+
+
 class DeepSpeedConfigModel(BaseModel):
     """
     This class should be used as a base for all DeepSpeed configs. It extends
diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py
index 44c089ac92a1..48566d12d88b 100644
--- a/deepspeed/runtime/engine.py
+++ b/deepspeed/runtime/engine.py
@@ -745,7 +745,7 @@ def zero_sub_group_size(self):
         return self._config.zero_config.sub_group_size
 
     def zero_optimization_stage(self):
-        return self._config.zero_optimization_stage
+        return self._config.zero_config.stage
 
     def zero_reduce_bucket_size(self):
         return self._config.zero_config.reduce_bucket_size
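A side benefit of relocating DtypeEnum into config_utils.py is that any config model can normalize dtype aliases through one entry point: every alias registered in __new__ resolves to the same member, and the member's canonical value is the torch dtype. A quick sanity check, assuming the deepspeed.runtime.config_utils import path introduced by this diff:

    import torch
    from deepspeed.runtime.config_utils import DtypeEnum

    # All spellings map to the same enum member...
    assert DtypeEnum("fp16") is DtypeEnum("half") is DtypeEnum(torch.float16)
    # ...and the canonical value is always the torch dtype.
    assert DtypeEnum.bf16.value is torch.bfloat16
    assert DtypeEnum("float").value is torch.float32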