diff --git a/src/megatron/bridge/data/datasets/packing_utils.py b/src/megatron/bridge/data/datasets/packing_utils.py
index dd36f5e1f4..b53edd3563 100644
--- a/src/megatron/bridge/data/datasets/packing_utils.py
+++ b/src/megatron/bridge/data/datasets/packing_utils.py
@@ -269,3 +269,85 @@ def fill_packing_strategy(
     assert all(not seq[0] for seq in ifile_handles.values()), "Error: There are items left over from the assignment"
     assert all(not seq[1] for seq in ifile_handles.values()), "Error: There are items left over from the assignment"
     return output_data
+
+
+def get_seqlen_list(elem: Dict) -> Tuple[List[int], int]:
+    """Extract per-sequence token counts from a packed dataset element.
+
+    Args:
+        elem: A packed dataset element with 'input_ids' and 'seq_start_id' fields.
+
+    Returns:
+        A tuple of (token_counts, tokens_minus_eos) where token_counts is a list of
+        per-sequence token counts (excluding EOS) and tokens_minus_eos is the total
+        token count excluding EOS tokens.
+    """
+    num_seq = len(elem["seq_start_id"])
+    tokens_total = len(elem["input_ids"])
+    tokens_minus_eos = tokens_total - num_seq
+
+    seq_boundaries = elem["seq_start_id"] + [tokens_total]
+
+    # subtract 1 to account for removing eos token
+    token_counts = [seq_boundaries[i + 1] - seq_boundaries[i] - 1 for i in range(num_seq)]
+
+    assert sum(token_counts) == tokens_minus_eos, (sum(token_counts), tokens_minus_eos)
+
+    return token_counts, tokens_minus_eos
+
+
+def calculate_avg_seqlen(
+    dataset_file: str, gbs: int, max_seq_len: int, drop_remainder: bool
+) -> Tuple[float, float, float, float]:
+    """Calculate average sequence length statistics from a packed dataset.
+
+    Args:
+        dataset_file: Path to the .npy packed dataset file.
+        gbs: Global batch size used to determine how many rows to process.
+        max_seq_len: Maximum sequence length (reserved for future use).
+        drop_remainder: If True, drop rows that don't fill a complete batch.
+
+    Returns:
+        A tuple of (avg_seqlen_count, avg_seqlen_total, avg_seqlen_sq_individual, avg_seqlen_sq_per_row):
+        - avg_seqlen_count: Average number of sequences per row.
+        - avg_seqlen_total: Average total tokens (excluding EOS) per row.
+        - avg_seqlen_sq_individual: Average of squared per-sequence lengths.
+        - avg_seqlen_sq_per_row: Average of summed squared sequence lengths per row.
+
+    Raises:
+        ValueError: If no rows remain after applying drop_remainder, or if no sequences are found.
+    """
+    data = np.load(dataset_file, allow_pickle=True)
+
+    total_len_accum = 0
+    seqlen_sq_accum = 0
+    seq_count_accum = 0
+
+    rows_total = len(data)
+    count = (rows_total // gbs) * gbs if drop_remainder else rows_total
+
+    if count != rows_total:
+        logger.info(f"Dropping {rows_total - count}, total was {rows_total}")
+
+    for i, elem in enumerate(data):
+        if i >= count:
+            break
+        seqlen_list, total_count = get_seqlen_list(elem)
+        seqlen_sq_list = [s * s for s in seqlen_list]
+        total_len_accum += total_count
+        seqlen_sq_accum += sum(seqlen_sq_list)
+        seq_count_accum += len(seqlen_list)
+
+    if count == 0:
+        raise ValueError(
+            f"No rows to process: dataset has {rows_total} rows but gbs={gbs} with drop_remainder={drop_remainder}."
+        )
+    if seq_count_accum == 0:
+        raise ValueError("No sequences found in dataset; cannot compute average sequence length.")
+
+    avg_seqlen_count = seq_count_accum / count
+    avg_seqlen_total = total_len_accum / count
+    avg_seqlen_sq_individual = seqlen_sq_accum / seq_count_accum
+    avg_seqlen_sq_per_row = seqlen_sq_accum / count
+
+    return avg_seqlen_count, avg_seqlen_total, avg_seqlen_sq_individual, avg_seqlen_sq_per_row
diff --git a/src/megatron/bridge/training/utils/flop_utils.py b/src/megatron/bridge/training/utils/flop_utils.py
index 4b21d17071..26e0b56574 100644
--- a/src/megatron/bridge/training/utils/flop_utils.py
+++ b/src/megatron/bridge/training/utils/flop_utils.py
@@ -13,17 +13,25 @@
 # limitations under the License.
 
 import importlib
+from pathlib import Path
 
 import torch.nn.functional as F
 
+from megatron.bridge.data.datasets.packing_utils import calculate_avg_seqlen
+from megatron.bridge.peft.lora import LoRA
 from megatron.bridge.training.config import ConfigContainer
 from megatron.bridge.utils.vocab_utils import calculate_padded_vocab_size
 
 
+_lora_seq_stats_cache: dict = {}
+
+
 def num_floating_point_operations(cfg: ConfigContainer, batch_size: int = 1):
     """Return the number of floating point operations"""
-    # If the model provider has a custom TFLOPS calculation method, use it.
-    if hasattr(cfg.model, "_get_num_floating_point_operations"):
+    peft = getattr(cfg, "peft", None)
+    is_lora = isinstance(peft, LoRA)
+    # If the model provider has a custom TFLOPS calculation method, use it (non-LoRA only).
+    if not is_lora and hasattr(cfg.model, "_get_num_floating_point_operations"):
         return cfg.model._get_num_floating_point_operations(batch_size)
 
     def calculate_layer_counts():
@@ -200,6 +208,46 @@ def transformer_flops():
         num_query_groups = (
             cfg.model.num_attention_heads if cfg.model.num_query_groups is None else cfg.model.num_query_groups
         )
+
+        is_squad = getattr(getattr(cfg, "dataset", None), "dataset_name", None) == "squad"
+        hf_model_id = getattr(cfg.model, "hf_model_id", None)
+        is_llama3_70b = hf_model_id is not None and "Meta-Llama-3-70B" in hf_model_id
+        packed_specs = getattr(getattr(cfg, "dataset", None), "packed_sequence_specs", None)
+        packed_data_path = getattr(packed_specs, "packed_train_data_path", None)
+        # If not explicitly set, try to find the file via dataset_root (the FinetuningDatasetBuilder
+        # computes this path dynamically, but dataset_root is available from the config).
+        if packed_data_path is None and packed_specs is not None:
+            dataset_root = getattr(cfg.dataset, "dataset_root", None)
+            seq_size = getattr(packed_specs, "packed_sequence_size", None)
+            if dataset_root is not None and seq_size is not None:
+                matches = sorted(Path(dataset_root).glob(f"packed/*/training_{seq_size}.npy"))
+                if matches:
+                    packed_data_path = str(matches[0])
+        if is_lora and is_squad and is_llama3_70b and packed_data_path is not None and Path(packed_data_path).exists():
+            gbs = cfg.train.global_batch_size
+            seq_len = cfg.model.seq_length
+            cache_key = (packed_data_path, gbs, seq_len)
+            if cache_key not in _lora_seq_stats_cache:
+                _lora_seq_stats_cache[cache_key] = calculate_avg_seqlen(
+                    packed_data_path, gbs, seq_len, drop_remainder=True
+                )
+            _, avg_tokens, _, avg_seqlen2 = _lora_seq_stats_cache[cache_key]
+
+            hs = cfg.model.hidden_size
+            n_layers = cfg.model.num_layers
+            n_heads = cfg.model.num_attention_heads
+            ffn_hs = cfg.model.ffn_hidden_size
+            vocab_size = cfg.model.vocab_size
+
+            model_flops_frozen = (
+                avg_tokens
+                * n_layers
+                * hs**2
+                * (12 + 12 * num_query_groups / n_heads + 18 * ffn_hs / hs + 6 * vocab_size / (n_layers * hs))
+            )
+            model_flops_unfrozen = n_layers * hs**2 * (12 * avg_seqlen2 / hs)
+
+            return batch_size * (model_flops_frozen * (2.0 / 3.0) + model_flops_unfrozen)
         # MoE.
         if cfg.model.num_moe_experts is None:
             # Every Transformer MLP is dense.