Merge branch 'main' into oss-export-deploy
oyilmaz-nvidia authored Apr 3, 2024
2 parents f22e9e0 + ea5d1ef commit 648acb6
Showing 7 changed files with 248 additions and 203 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/cicd-main.yml
@@ -62,11 +62,9 @@ jobs:
# --env HYDRA_FULL_ERROR=1
steps:
- name: Log into ACR (Azure Container Registry) # this login is for the pushing step after
uses: azure/docker-login@v1
with:
login-server: nemoci.azurecr.io
password: ${{ secrets.ACR_PASSWORD }}
username: nemoci
run: |
# Login to Azure Container Registry
az acr login --name nemoci.azurecr.io
- name: Checkout repository
uses: actions/checkout@v2
@@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from importlib.metadata import version
from typing import Any, Callable, Optional

import torch
from pkg_resources import packaging

from nemo.collections.nlp.modules.common.megatron.utils import ApexGuardDefaults
from nemo.collections.nlp.parts import utils_funcs
@@ -79,54 +81,56 @@ def __init__(
ub_tp_comm_overlap: bool = False,
ub_bulk_wgrad: bool = True,
ub_bulk_dgrad: bool = True,
ub_split_ag: bool = True,
ub_split_rs: bool = True,
ub_atomic_gemm_ag: bool = False,
ub_atomic_gemm_rs: bool = False,
autocast_dtype: Any = 16,
zero_centered_gamma: bool = False,
device: str = 'cuda',
**kwargs,
) -> None:
if not HAVE_MEGATRON_CORE or not HAVE_TE:
raise ImportError(IMPORT_ERROR)

super().__init__(
hidden_size=hidden_size,
ffn_hidden_size=ffn_hidden_size,
layernorm_epsilon=layernorm_epsilon,
num_attention_heads=num_attention_heads,
init_method=init_method,
output_layer_init_method=output_layer_init_method,
hidden_dropout=hidden_dropout,
attention_dropout=attention_dropout,
layer_number=layer_number,
kv_channels=kv_channels,
self_attn_mask_type=self_attn_mask_type,
tp_group=tp_group,
tp_size=tp_size,
params_dtype=params_dtype,
get_rng_state_tracker=get_rng_state_tracker,
fuse_wgrad_accumulation=fuse_wgrad_accumulation,
seq_length=seq_length,
micro_batch_size=micro_batch_size,
sequence_parallel=sequence_parallel,
apply_residual_connection_post_layernorm=apply_residual_connection_post_layernorm,
output_layernorm=output_layernorm,
layer_type=layer_type,
drop_path_rate=drop_path_rate,
set_parallel_mode=tp_size > 1,
fuse_qkv_params=True,
zero_centered_gamma=zero_centered_gamma,
ub_tp_comm_overlap=ub_tp_comm_overlap,
ub_bulk_wgrad=ub_bulk_wgrad,
ub_bulk_dgrad=ub_bulk_dgrad,
ub_split_ag=ub_split_ag,
ub_split_rs=ub_split_rs,
ub_atomic_gemm_ag=ub_atomic_gemm_ag,
ub_atomic_gemm_rs=ub_atomic_gemm_rs,
device=device,
)
# use_emha=use_emha,
transformer_layer_args = {
"hidden_size": hidden_size,
"ffn_hidden_size": ffn_hidden_size,
"layernorm_epsilon": layernorm_epsilon,
"num_attention_heads": num_attention_heads,
"init_method": init_method,
"output_layer_init_method": output_layer_init_method,
"hidden_dropout": hidden_dropout,
"attention_dropout": attention_dropout,
"layer_number": layer_number,
"kv_channels": kv_channels,
"self_attn_mask_type": self_attn_mask_type,
"tp_group": tp_group,
"tp_size": tp_size,
"params_dtype": params_dtype,
"get_rng_state_tracker": get_rng_state_tracker,
"fuse_wgrad_accumulation": fuse_wgrad_accumulation,
"seq_length": seq_length,
"micro_batch_size": micro_batch_size,
"sequence_parallel": sequence_parallel,
"apply_residual_connection_post_layernorm": apply_residual_connection_post_layernorm,
"output_layernorm": output_layernorm,
"layer_type": layer_type,
"drop_path_rate": drop_path_rate,
"set_parallel_mode": tp_size > 1,
"fuse_qkv_params": True,
"zero_centered_gamma": zero_centered_gamma,
"ub_tp_comm_overlap": ub_tp_comm_overlap,
"ub_bulk_wgrad": ub_bulk_wgrad,
"ub_bulk_dgrad": ub_bulk_dgrad,
"device": device,
}
te_version = packaging.version.Version(version("transformer-engine"))
if te_version > packaging.version.Version("1.5.0"):
transformer_layer_args["ub_overlap_ag"] = kwargs.get("ub_overlap_ag", True)
transformer_layer_args["ub_overlap_rs"] = kwargs.get("ub_overlap_rs", True)
else:
transformer_layer_args["ub_split_ag"] = kwargs.get("ub_split_ag", True)
transformer_layer_args["ub_split_rs"] = kwargs.get("ub_split_rs", True)
transformer_layer_args["ub_atomic_gemm_ag"] = kwargs.get("ub_atomic_gemm_ag", False)
transformer_layer_args["ub_atomic_gemm_rs"] = kwargs.get("ub_atomic_gemm_rs", False)
super().__init__(**transformer_layer_args)

# Dtype for forward pass - ignore amp O2
self.dtype = utils_funcs.torch_dtype_from_precision(autocast_dtype, megatron_amp_O2=None)
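The version gate above is the core of this hunk: the userbuffer-overlap keyword arguments were renamed in newer Transformer Engine releases, so the layer now passes whichever keyword set the installed TE accepts. Below is a minimal, runnable sketch of that selection logic using the same imports as the patch; the helper name `select_ub_overlap_args` is hypothetical and not part of NeMo.

```python
from importlib.metadata import version

from pkg_resources import packaging


def select_ub_overlap_args(te_version_str=None, **kwargs):
    """Pick the userbuffer-overlap kwargs understood by the installed TE release."""
    te_version = packaging.version.Version(te_version_str or version("transformer-engine"))
    if te_version > packaging.version.Version("1.5.0"):
        # Newer TE consolidates the split/atomic-GEMM flags into ub_overlap_ag/rs.
        return {
            "ub_overlap_ag": kwargs.get("ub_overlap_ag", True),
            "ub_overlap_rs": kwargs.get("ub_overlap_rs", True),
        }
    # Older TE releases still expect the finer-grained flags.
    return {
        "ub_split_ag": kwargs.get("ub_split_ag", True),
        "ub_split_rs": kwargs.get("ub_split_rs", True),
        "ub_atomic_gemm_ag": kwargs.get("ub_atomic_gemm_ag", False),
        "ub_atomic_gemm_rs": kwargs.get("ub_atomic_gemm_rs", False),
    }


# Pass an explicit version string to exercise the comparison without TE installed.
print(select_ub_overlap_args("1.4.0"))  # -> ub_split_* / ub_atomic_gemm_* flags
print(select_ub_overlap_args("1.6.0"))  # -> ub_overlap_ag / ub_overlap_rs
```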
@@ -172,38 +176,42 @@ def __init__(self, config, layer_number=1, hidden_dropout=None):
self.is_first_microbatch = True
precision = 'bf16' if config.bf16 else 16

super().__init__(
hidden_size=config.hidden_size,
ffn_hidden_size=config.ffn_hidden_size,
layernorm_epsilon=config.layernorm_epsilon,
num_attention_heads=config.num_attention_heads,
init_method=config.init_method,
output_layer_init_method=config.output_layer_init_method,
hidden_dropout=config.hidden_dropout,
attention_dropout=config.attention_dropout,
layer_number=layer_number + self._get_layer_offset(),
kv_channels=config.kv_channels,
# self_attn_mask_type='causal', # Use default 'causal'
tp_size=parallel_state.get_tensor_model_parallel_world_size(),
params_dtype=config.params_dtype,
get_rng_state_tracker=tensor_parallel.random.get_cuda_rng_tracker,
fuse_wgrad_accumulation=config.gradient_accumulation_fusion,
seq_length=None, # used for jit warmup
micro_batch_size=None, # used for jit warmup
sequence_parallel=config.sequence_parallel,
apply_residual_connection_post_layernorm=config.apply_residual_connection_post_layernorm,
autocast_dtype=precision,
# use_emha=False, # Use default 'False'
ub_tp_comm_overlap=config.tp_comm_overlap,
ub_bulk_wgrad=config.tp_comm_bulk_wgrad,
ub_bulk_dgrad=config.tp_comm_bulk_dgrad,
ub_split_ag=config.tp_comm_split_ag,
ub_split_rs=config.tp_comm_split_rs,
ub_atomic_gemm_ag=config.tp_comm_atomic_ag,
ub_atomic_gemm_rs=config.tp_comm_atomic_rs,
zero_centered_gamma=config.layernorm_zero_centered_gamma,
device='cpu' if config.use_cpu_initialization else 'cuda',
)
transformer_layer_args = {
"hidden_size": config.hidden_size,
"ffn_hidden_size": config.ffn_hidden_size,
"layernorm_epsilon": config.layernorm_epsilon,
"num_attention_heads": config.num_attention_heads,
"init_method": config.init_method,
"output_layer_init_method": config.output_layer_init_method,
"hidden_dropout": config.hidden_dropout,
"attention_dropout": config.attention_dropout,
"layer_number": layer_number + self._get_layer_offset(),
"kv_channels": config.kv_channels,
"tp_size": parallel_state.get_tensor_model_parallel_world_size(),
"params_dtype": config.params_dtype,
"get_rng_state_tracker": tensor_parallel.random.get_cuda_rng_tracker,
"fuse_wgrad_accumulation": config.gradient_accumulation_fusion,
"seq_length": None, # used for jit warmup
"micro_batch_size": None, # used for jit warmup
"sequence_parallel": config.sequence_parallel,
"apply_residual_connection_post_layernorm": config.apply_residual_connection_post_layernorm,
"autocast_dtype": precision,
"ub_tp_comm_overlap": config.tp_comm_overlap,
"ub_bulk_wgrad": config.tp_comm_bulk_wgrad,
"ub_bulk_dgrad": config.tp_comm_bulk_dgrad,
"zero_centered_gamma": config.layernorm_zero_centered_gamma,
"device": 'cpu' if config.use_cpu_initialization else 'cuda',
}
te_version = packaging.version.Version(version("transformer-engine"))
if te_version > packaging.version.Version("1.5.0"):
transformer_layer_args["ub_overlap_ag"] = config.tp_comm_overlap_ag
transformer_layer_args["ub_overlap_rs"] = config.tp_comm_overlap_rs
else:
transformer_layer_args["ub_split_ag"] = config.tp_comm_split_ag
transformer_layer_args["ub_split_rs"] = config.tp_comm_split_rs
transformer_layer_args["ub_atomic_gemm_ag"] = config.tp_comm_atomic_ag
transformer_layer_args["ub_atomic_gemm_rs"] = config.tp_comm_atomic_rs
super().__init__(**transformer_layer_args)

# Called by MCore's TransformerBlock.forward
# megatron/core/transformer/transformer_block.py
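For reference, the `precision` value computed earlier in this hunk ('bf16' when `config.bf16` is set, otherwise 16) is turned into a torch dtype for the autocast forward pass by `utils_funcs.torch_dtype_from_precision`. The sketch below is an illustrative assumption of that mapping only; NeMo's real helper handles more precision strings and the amp-O2 interaction.

```python
import torch


def dtype_from_precision_sketch(precision):
    # Illustrative assumption; not NeMo's actual implementation.
    if precision in ('bf16', 'bf16-mixed'):
        return torch.bfloat16
    if precision in (16, '16', '16-mixed'):
        return torch.float16
    return torch.float32


assert dtype_from_precision_sketch('bf16') is torch.bfloat16
assert dtype_from_precision_sketch(16) is torch.float16
```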
105 changes: 67 additions & 38 deletions nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py
@@ -35,6 +35,7 @@
MegatronPretrainingRandomSampler,
MegatronPretrainingSampler,
)
from nemo.collections.nlp.data.language_modeling.megatron.gpt_dataset import build_train_valid_test_datasets
from nemo.collections.nlp.data.language_modeling.megatron.gpt_fim_dataset import (
GPTFIMDataset,
GPTFIMDatasetConfig,
@@ -93,7 +94,10 @@
from megatron.core.datasets.gpt_dataset import GPTDataset, GPTDatasetConfig, MockGPTDataset
from megatron.core.deploy.gpt.model_specs import get_gpt_layer_ammo_spec
from megatron.core.models.gpt import GPTModel as MCoreGPTModel
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_with_transformer_engine_spec
from megatron.core.models.gpt.gpt_layer_specs import (
get_gpt_layer_local_spec,
get_gpt_layer_with_transformer_engine_spec,
)
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.transformer.module import Float16Module as MCoreFloat16Module
from megatron.core.transformer.transformer_config import TransformerConfig
@@ -133,12 +137,15 @@ def mcore_supports_moe() -> bool:
return False


def get_specs(spec_name, num_experts=None):
def get_specs(spec_name, num_experts=None, moe_grouped_gemm=False, use_te=True):
if num_experts is not None:
assert mcore_supports_moe(), "Megatron-core >= v0.5.0 is required for MoE"

if use_te and spec_name == '':
spec_name = 'te_gpt'
name_spec_dict = {
"": get_gpt_layer_with_transformer_engine_spec(num_experts),
"": get_gpt_layer_local_spec(num_experts, moe_grouped_gemm),
"te_gpt": get_gpt_layer_with_transformer_engine_spec(num_experts, moe_grouped_gemm),
"megatron_falcon_gpt": get_falcon_layer_spec(),
"megatron_gpt_full_te_layer_autocast": get_gpt_full_te_layer_autocast_spec(),
"ammo": get_gpt_layer_ammo_spec(),
@@ -301,6 +308,10 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
if self.cfg.get('expert_model_parallel_size', 1) > 1 and self.with_distributed_adam:
raise ValueError('Expert parallelism is currently not supporting distributed optimizer')

self.transformer_engine = cfg.get('transformer_engine', False)
if self.megatron_amp_O2 and not self.transformer_engine:
logging.warning('megatron_amp_O2 is enabled but transformer-engine is not.')

# build_model returns a list of modules which are used for interleaved pipeline parallelism
if isinstance(self.trainer.accelerator, CPUAccelerator):
self.model = build_model(
@@ -341,8 +352,6 @@ def __init__(self, cfg: DictConfig, trainer: Trainer):
True if (not self.megatron_amp_O2) and (self.autocast_dtype in [torch.float16, torch.bfloat16]) else False
)

self.transformer_engine = cfg.get('transformer_engine', False)

# configuration used for inference
self._inference_config = None

@@ -380,7 +389,12 @@ def model_provider_func(self, pre_process, post_process):
if self.mcore_gpt:
model = MCoreGPTModel(
config=self.transformer_config,
transformer_layer_spec=get_specs(self.spec_name, self.transformer_config.num_moe_experts),
transformer_layer_spec=get_specs(
self.spec_name,
self.transformer_config.num_moe_experts,
self.transformer_config.moe_grouped_gemm,
self.transformer_engine,
),
vocab_size=self.cfg.get('override_vocab_size', self.padded_vocab_size),
max_sequence_length=self.cfg.get('encoder_seq_length', 512),
pre_process=pre_process,
@@ -958,9 +972,10 @@ def get_batch(self, data_iterator, tuning):
'tokens': data["tokens"],
'labels': data["labels"],
'loss_mask': data["loss_mask"],
'attention_mask': data["attention_mask"],
'position_ids': data["position_ids"],
}
if "attention_mask" in data:
batch['attention_mask'] = data["attention_mask"]

return batch
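The `get_batch` change above makes `attention_mask` optional: the key is forwarded only when the dataset actually yields one. Restated as a standalone helper for clarity (the helper name is hypothetical):

```python
def assemble_batch_sketch(data):
    batch = {
        'tokens': data['tokens'],
        'labels': data['labels'],
        'loss_mask': data['loss_mask'],
        'position_ids': data['position_ids'],
    }
    # Only forward the attention mask when the dataset produced one.
    if 'attention_mask' in data:
        batch['attention_mask'] = data['attention_mask']
    return batch


# With a mask the key passes through; without one it is simply absent.
assert 'attention_mask' in assemble_batch_sketch(
    {'tokens': 1, 'labels': 1, 'loss_mask': 1, 'position_ids': 1, 'attention_mask': 1}
)
assert 'attention_mask' not in assemble_batch_sketch(
    {'tokens': 1, 'labels': 1, 'loss_mask': 1, 'position_ids': 1}
)
```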

@@ -1296,41 +1311,55 @@ def build_train_valid_test_datasets(self):
fim_tokens = [fim_tokens.prefix, fim_tokens.middle, fim_tokens.suffix, fim_tokens.pad, fim_tokens.eod]
self.tokenizer.add_special_tokens({'additional_special_tokens': fim_tokens})

mock_dataset = True if self.cfg.data.get("data_impl", "mmap") == "mock" else False
kwargs = {
"is_built_on_rank": is_dataset_built_on_rank,
"random_seed": self.cfg.seed,
"sequence_length": self.cfg.data.seq_length,
"path_to_cache": self.cfg.data.index_mapping_dir,
"tokenizer": self.tokenizer,
"reset_position_ids": self.reset_position_ids,
"reset_attention_mask": self.reset_attention_mask,
"eod_mask_loss": self.eod_mask_loss,
"mock": mock_dataset,
"mmap_bin_files": self.cfg.data.get("mmap_bin_files", True),
}

# support for dict data input type
if isinstance(self.cfg.data.data_prefix, DictConfig):
_pref = self.cfg.data.data_prefix
kwargs['blend_per_split'] = [_pref['train'], _pref['validation'], _pref['test']]
if self.cfg.data.get("legacy_dataset", False):
self._train_ds, self._validation_ds, self._test_ds = build_train_valid_test_datasets(
cfg=self.cfg,
trainer=self.trainer,
data_prefix=self.cfg.data.data_prefix,
data_impl=self.cfg.data.data_impl,
splits_string=self.cfg.data.splits_string,
train_valid_test_num_samples=train_valid_test_num_samples,
seq_length=self.cfg.data.seq_length,
seed=self.cfg.seed,
skip_warmup=self.cfg.data.get('skip_warmup', True),
tokenizer=self.tokenizer,
)
else:
kwargs['blend'] = self.cfg.data.data_prefix
kwargs["split"] = self.cfg.data.splits_string
mock_dataset = True if self.cfg.data.get("data_impl", "mmap") == "mock" else False
kwargs = {
"is_built_on_rank": is_dataset_built_on_rank,
"random_seed": self.cfg.seed,
"sequence_length": self.cfg.data.seq_length,
"path_to_cache": self.cfg.data.index_mapping_dir,
"tokenizer": self.tokenizer,
"reset_position_ids": self.reset_position_ids,
"reset_attention_mask": self.reset_attention_mask,
"eod_mask_loss": self.eod_mask_loss,
"mock": mock_dataset,
"mmap_bin_files": self.cfg.data.get("mmap_bin_files", True),
}

# support for dict data input type
if isinstance(self.cfg.data.data_prefix, DictConfig):
_pref = self.cfg.data.data_prefix
kwargs['blend_per_split'] = [_pref['train'], _pref['validation'], _pref['test']]
else:
kwargs['blend'] = self.cfg.data.data_prefix
kwargs["split"] = self.cfg.data.splits_string

if self.cfg.data.get('add_fim', False):
dataset_config = GPTFIMDatasetConfig(self.cfg.data.fim, **kwargs)
if self.cfg.data.get('add_fim', False):
dataset_config = GPTFIMDatasetConfig(self.cfg.data.fim, **kwargs)

self._train_ds, self._validation_ds, self._test_ds = BlendedMegatronDatasetBuilder(
GPTFIMDataset, train_valid_test_num_samples, dataset_config,
).build()
else:
dataset_config = GPTDatasetConfig(**kwargs)
dataset_type = MockGPTDataset if mock_dataset else GPTDataset
self._train_ds, self._validation_ds, self._test_ds = BlendedMegatronDatasetBuilder(
GPTFIMDataset, train_valid_test_num_samples, dataset_config,
).build()
else:
dataset_config = GPTDatasetConfig(**kwargs)
dataset_type = MockGPTDataset if mock_dataset else GPTDataset

self._train_ds, self._validation_ds, self._test_ds = BlendedMegatronDatasetBuilder(
dataset_type, train_valid_test_num_samples, dataset_config,
).build()
self._train_ds, self._validation_ds, self._test_ds = BlendedMegatronDatasetBuilder(
dataset_type, train_valid_test_num_samples, dataset_config,
).build()

if self._train_ds is not None:
logging.info(f'Length of train dataset: {len(self._train_ds)}')
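A hedged usage sketch of the new `legacy_dataset` switch read in `build_train_valid_test_datasets` above: setting it keeps the pre-Megatron-Core dataset path, while leaving it unset assembles `GPTDatasetConfig`/`GPTFIMDatasetConfig` kwargs for `BlendedMegatronDatasetBuilder`. The config keys below are taken from the diff; the concrete values and paths are placeholders.

```python
from omegaconf import OmegaConf

# Placeholder values; only the keys come from the code above.
cfg = OmegaConf.create(
    {
        'seed': 1234,
        'data': {
            'legacy_dataset': False,   # True -> old build_train_valid_test_datasets path
            'data_impl': 'mmap',       # 'mock' selects MockGPTDataset on the MCore path
            'data_prefix': ['1.0', '/path/to/dataset_text_document'],
            'splits_string': '99,1,0',
            'seq_length': 2048,
            'index_mapping_dir': '/path/to/index_cache',
            'mmap_bin_files': True,
            'add_fim': False,          # True -> GPTFIMDatasetConfig / GPTFIMDataset
        },
    }
)

print(cfg.data.legacy_dataset, cfg.data.data_impl)
```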