Skip to content
Merged
82 changes: 82 additions & 0 deletions src/megatron/bridge/data/datasets/packing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,85 @@ def fill_packing_strategy(
assert all(not seq[0] for seq in ifile_handles.values()), "Error: There are items left over from the assignment"
assert all(not seq[1] for seq in ifile_handles.values()), "Error: There are items left over from the assignment"
return output_data


def get_seqlen_list(elem: Dict) -> Tuple[List[int], int]:
    """Extract per-sequence token counts from a packed dataset element.

    Args:
        elem: A packed dataset element with 'input_ids' and 'seq_start_id' fields.

    Returns:
        A tuple of (token_counts, tokens_minus_eos) where token_counts is a list of
        per-sequence token counts (excluding EOS) and tokens_minus_eos is the total
        token count excluding EOS tokens.
    """
    total_tokens = len(elem["input_ids"])
    sequence_count = len(elem["seq_start_id"])
    # One EOS token per packed sequence is excluded from the count.
    expected_total = total_tokens - sequence_count

    # Sequence i spans [boundaries[i], boundaries[i+1]); append the end-of-row
    # offset so the last sequence has an upper boundary too.
    boundaries = elem["seq_start_id"] + [total_tokens]
    token_counts = [end - start - 1 for start, end in zip(boundaries, boundaries[1:])]

    assert sum(token_counts) == expected_total, (sum(token_counts), expected_total)

    return token_counts, expected_total


def calculate_avg_seqlen(
    dataset_file: str, gbs: int, max_seq_len: int, drop_remainder: bool
) -> Tuple[float, float, float, float]:
    """Calculate average sequence length statistics from a packed dataset.

    Args:
        dataset_file: Path to the .npy packed dataset file.
        gbs: Global batch size used to determine how many rows to process.
        max_seq_len: Maximum sequence length (reserved for future use).
        drop_remainder: If True, drop rows that don't fill a complete batch.

    Returns:
        A tuple of (avg_seqlen_count, avg_seqlen_total, avg_seqlen_sq_individual, avg_seqlen_sq_per_row):
        - avg_seqlen_count: Average number of sequences per row.
        - avg_seqlen_total: Average total tokens (excluding EOS) per row.
        - avg_seqlen_sq_individual: Average of squared per-sequence lengths.
        - avg_seqlen_sq_per_row: Average of summed squared sequence lengths per row.

    Raises:
        ValueError: If no rows remain after applying drop_remainder, or if no sequences are found.
    """
    rows = np.load(dataset_file, allow_pickle=True)

    rows_total = len(rows)
    # Trim to a whole number of global batches when the remainder is dropped.
    count = (rows_total // gbs) * gbs if drop_remainder else rows_total
    if count != rows_total:
        logger.info(f"Dropping {rows_total - count}, total was {rows_total}")

    total_len_accum = 0
    seqlen_sq_accum = 0
    seq_count_accum = 0

    for elem in rows[:count]:
        per_seq_lens, row_token_total = get_seqlen_list(elem)
        total_len_accum += row_token_total
        seqlen_sq_accum += sum(length * length for length in per_seq_lens)
        seq_count_accum += len(per_seq_lens)

    # Validate before dividing so callers get a clear error instead of ZeroDivisionError.
    if count == 0:
        raise ValueError(
            f"No rows to process: dataset has {rows_total} rows but gbs={gbs} with drop_remainder={drop_remainder}."
        )
    if seq_count_accum == 0:
        raise ValueError("No sequences found in dataset; cannot compute average sequence length.")

    return (
        seq_count_accum / count,
        total_len_accum / count,
        seqlen_sq_accum / seq_count_accum,
        seqlen_sq_accum / count,
    )
52 changes: 50 additions & 2 deletions src/megatron/bridge/training/utils/flop_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,25 @@
# limitations under the License.

import importlib
from pathlib import Path

import torch.nn.functional as F

from megatron.bridge.data.datasets.packing_utils import calculate_avg_seqlen
from megatron.bridge.peft.lora import LoRA
from megatron.bridge.training.config import ConfigContainer
from megatron.bridge.utils.vocab_utils import calculate_padded_vocab_size


_lora_seq_stats_cache: dict = {}


def num_floating_point_operations(cfg: ConfigContainer, batch_size: int = 1):
"""Return the number of floating point operations"""
# If the model provider has a custom TFLOPS calculation method, use it.
if hasattr(cfg.model, "_get_num_floating_point_operations"):
peft = getattr(cfg, "peft", None)
is_lora = isinstance(peft, LoRA)
# If the model provider has a custom TFLOPS calculation method, use it (non-LoRA only).
if not is_lora and hasattr(cfg.model, "_get_num_floating_point_operations"):
return cfg.model._get_num_floating_point_operations(batch_size)

def calculate_layer_counts():
Expand Down Expand Up @@ -200,6 +208,46 @@ def transformer_flops():
num_query_groups = (
cfg.model.num_attention_heads if cfg.model.num_query_groups is None else cfg.model.num_query_groups
)

is_squad = getattr(getattr(cfg, "dataset", None), "dataset_name", None) == "squad"
hf_model_id = getattr(cfg.model, "hf_model_id", None)
is_llama3_70b = hf_model_id is not None and "Meta-Llama-3-70B" in hf_model_id
packed_specs = getattr(getattr(cfg, "dataset", None), "packed_sequence_specs", None)
packed_data_path = getattr(packed_specs, "packed_train_data_path", None)
# If not explicitly set, try to find the file via dataset_root (the FinetuningDatasetBuilder
# computes this path dynamically, but dataset_root is available from the config).
if packed_data_path is None and packed_specs is not None:
dataset_root = getattr(cfg.dataset, "dataset_root", None)
seq_size = getattr(packed_specs, "packed_sequence_size", None)
if dataset_root is not None and seq_size is not None:
matches = sorted(Path(dataset_root).glob(f"packed/*/training_{seq_size}.npy"))
if matches:
packed_data_path = str(matches[0])
Comment on lines +219 to +225
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Path discovery via glob is brittle.

Using sorted(Path(...).glob(...)) and taking matches[0] relies on alphabetical ordering, which may not correspond to the intended file. Consider adding a warning if multiple matches exist or making the selection criteria more explicit.

Suggested improvement
             if dataset_root is not None and seq_size is not None:
                 matches = sorted(Path(dataset_root).glob(f"packed/*/training_{seq_size}.npy"))
                 if matches:
+                    if len(matches) > 1:
+                        import logging
+                        logging.getLogger(__name__).warning(
+                            f"Multiple packed data files found: {matches}. Using {matches[0]}"
+                        )
                     packed_data_path = str(matches[0])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/megatron/bridge/training/utils/flop_utils.py` around lines 201 - 207, The
current logic that sets packed_data_path by taking the first element of
sorted(Path(dataset_root).glob(f"packed/*/training_{seq_size}.npy")) is brittle
and may pick the wrong file; update the block that reads
cfg.dataset.dataset_root, packed_specs.packed_sequence_size and builds matches
to handle multiple hits explicitly: if multiple matches exist, either (a) choose
deterministically by a clear criterion (e.g., natural sort, newest by mtime, or
a specific directory attribute) and document that choice, or (b) emit a warning
via the project logger including the full list of matches and which path is
selected (or raise an error to force user disambiguation); ensure the final
assignment to packed_data_path and any logs include the selected path so
consumers of packed_data_path can be traced.

if is_lora and is_squad and is_llama3_70b and packed_data_path is not None and Path(packed_data_path).exists():
gbs = cfg.train.global_batch_size
seq_len = cfg.model.seq_length
cache_key = (packed_data_path, gbs, seq_len)
if cache_key not in _lora_seq_stats_cache:
_lora_seq_stats_cache[cache_key] = calculate_avg_seqlen(
packed_data_path, gbs, seq_len, drop_remainder=True
)
_, avg_tokens, _, avg_seqlen2 = _lora_seq_stats_cache[cache_key]

hs = cfg.model.hidden_size
n_layers = cfg.model.num_layers
n_heads = cfg.model.num_attention_heads
ffn_hs = cfg.model.ffn_hidden_size
vocab_size = cfg.model.vocab_size

model_flops_frozen = (
avg_tokens
* n_layers
* hs**2
* (12 + 12 * num_query_groups / n_heads + 18 * ffn_hs / hs + 6 * vocab_size / (n_layers * hs))
)
model_flops_unfrozen = n_layers * hs**2 * (12 * avg_seqlen2 / hs)

return batch_size * (model_flops_frozen * (2.0 / 3.0) + model_flops_unfrozen)
# MoE.
if cfg.model.num_moe_experts is None:
# Every Transformer MLP is dense.
Expand Down
Loading