feat: refactor megatron data utils #1651

Merged: terrykong merged 20 commits into main from ashors/mcore-data on Jan 31, 2026
Conversation

@ashors1 ashors1 (Contributor) commented Dec 17, 2025

What does this PR do ?

Add a one line overview of what this PR aims to accomplish.

Issues

List issues that this PR closes (syntax):

Usage

  • You can potentially add a usage example below
# Add a code snippet demonstrating how to use this

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

  • ...

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced microbatch data processing pipeline with optional sequence packing and improved memory efficiency.
    • Streamlined training workflow with cleaner function signatures.
  • Refactor

    • Reorganized internal data processing utilities for better code structure.
    • Simplified parameter handling in training functions.
  • Tests

    • Expanded test coverage for data processing components and training workflows.


Signed-off-by: ashors1 <ashors@nvidia.com>
@ashors1 ashors1 mentioned this pull request Dec 17, 2025
3 tasks
Signed-off-by: ashors1 <ashors@nvidia.com>
Signed-off-by: Anna Shors <ashors@nvidia.com>
@ashors1 ashors1 requested review from cuichenx and terrykong January 15, 2026 20:15
@cuichenx cuichenx left a comment
just some minor comments

Signed-off-by: ashors1 <ashors@nvidia.com>
@ashors1 ashors1 requested a review from yuki-97 January 22, 2026 19:39
@yuki-97 yuki-97 (Contributor) left a comment

Thanks @ashors1, LGTM. Just left some comments about keeping a similar style for the two backends.

@ashors1 ashors1 marked this pull request as ready for review January 23, 2026 23:03
@ashors1 ashors1 requested review from a team as code owners January 23, 2026 23:03
coderabbitai bot (Contributor) commented Jan 23, 2026

📝 Walkthrough

This refactoring relocates sequence packing, unpacking, and microbatch processing logic from the common.py module into a dedicated data.py module. The change simplifies the forward pass by delegating pre-processed microbatch preparation (including optional sequence packing, mask/position ID computation, and context-parallel sharding) to centralized data pipeline helpers.

Changes

Forward pass simplification: nemo_rl/models/megatron/common.py
Removed the sequence packing/unpacking helpers (_pack_sequences_for_megatron, _unpack_sequences_from_megatron, _get_pack_sequence_parameters_for_megatron) and simplified the forward_step_arbitrary_loss signature by dropping the padding/packing-related parameters (seq_length_key, pad_individual_seqs_to_multiple_of, pad_packed_seq_to_multiple_of, pad_full_seq_to). The function now expects pre-processed microbatch data from the iterator.

New data processing pipeline: nemo_rl/models/megatron/data.py
Added ProcessedInputs and ProcessedMicrobatch dataclasses for encapsulating processed tensor data and metadata. Introduced make_processed_microbatch_iterator, get_microbatch_iterator, and process_microbatch to handle microbatch generation with optional sequence packing. Relocated _pack_sequences_for_megatron, _unpack_sequences_from_megatron, and _get_pack_sequence_parameters_for_megatron from common.py. Added process_global_batch for distributed batch normalization and a get_and_validate_seqlen utility.

Policy worker integration: nemo_rl/models/policy/workers/megatron_policy_worker.py
Refactored the training, logprob, and inference paths to use the get_microbatch_iterator and process_global_batch helpers from the new data module, replacing manual batch handling, sequence-length propagation, and packing logic with the unified pipeline. Removed the LossType import; added imports for get_microbatch_iterator and process_global_batch.

Test updates: tests/unit/algorithms/test_sequence_packing_gradients.py, tests/unit/models/megatron/test_megatron_data.py
Updated test imports to reflect the relocation of _pack_sequences_for_megatron to data.py, and used make_processed_microbatch_iterator to construct data iterators. Introduced unit tests for the ProcessedInputs/ProcessedMicrobatch dataclasses, microbatch processing (packed and unpacked), global batch processing, and iterator logic with various batching strategies.
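As a rough illustration of the new pipeline, the following is a heavily simplified sketch of a processed-microbatch iterator with optional sequence packing. The names follow the walkthrough above, but the fields and logic here are illustrative only; the real implementation also carries attention masks, CP-sharded input ids, and PackedSeqParams.

```python
from dataclasses import dataclass

# Hypothetical, simplified stand-in for the real ProcessedMicrobatch.
@dataclass
class ProcessedMicrobatch:
    input_ids: list     # [batch, seq] token ids (a single packed row when packing)
    position_ids: list  # positions, restarting at each sequence boundary

def make_processed_microbatch_iterator(batches, pack_sequences=False):
    """Yield one ProcessedMicrobatch per raw microbatch (sketch only)."""
    for batch in batches:
        seqs = batch["input_ids"]
        if pack_sequences:
            # Pack every sequence of the microbatch into a single row,
            # with position ids restarting at each sequence boundary.
            packed = [tok for seq in seqs for tok in seq]
            pos = [i for seq in seqs for i in range(len(seq))]
            yield ProcessedMicrobatch(input_ids=[packed], position_ids=[pos])
        else:
            yield ProcessedMicrobatch(
                input_ids=seqs,
                position_ids=[list(range(len(s))) for s in seqs],
            )
```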

Sequence Diagram(s)

sequenceDiagram
    participant DataSource as Data Source
    participant Iterator as get_microbatch_iterator
    participant Processor as make_processed_microbatch_iterator
    participant Microbatch as ProcessedMicrobatch
    participant Model as Model/Loss

    DataSource->>Iterator: raw batch data
    Iterator->>Processor: create iterator with config
    loop per microbatch
        Processor->>Processor: process_microbatch()
        alt pack_sequences
            Processor->>Processor: _pack_sequences_for_megatron()
        else unpacked
            Processor->>Processor: get_ltor_masks_and_position_ids()
        end
        Processor->>Microbatch: ProcessedInputs + BatchedDataDict
        Microbatch->>Model: forward_step_arbitrary_loss()
        Model->>Model: compute loss with pre-processed tensors
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

CI:L1, Run CICD

Suggested reviewers

  • terrykong
  • zpqiu
  • chtruong814
🚥 Pre-merge checks: 2 passed, 2 failed

❌ Failed checks (2 warnings)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 78.38%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Test Results For Major Changes ⚠️ Warning: The PR description lacks test results and testing documentation despite major refactoring affecting data processing and numerics. Resolution: document test results, coverage, performance analysis, and numerics validation in the PR description; resolve outstanding review comments.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed: The title "feat: refactor megatron data utils" accurately captures the main objective of the PR, which refactors and reorganizes the Megatron data utilities by moving functions between modules and introducing new data processing abstractions.


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 7

Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
nemo_rl/models/policy/workers/megatron_policy_worker.py (2)

1-1: Update copyright year to 2026.
The header should reflect the current year for modified non-test files.

✏️ Proposed fix
-# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
+# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
As per coding guidelines, please update the NVIDIA header year to 2026.

364-402: Use iterator-provided micro_batch_size.
get_microbatch_iterator can change micro_batch_size (e.g., sequence packing ⇒ 1). Passing mbs can misconfigure the pipeline schedule.

🐛 Proposed fix
                 (
                     data_iterator,
                     num_microbatches,
                     micro_batch_size,
-                    seq_length,
+                    _seq_length,
                     padded_seq_length,
                 ) = get_microbatch_iterator(
@@
                         data_iterator=data_iterator,
                         model=self.model,
                         num_microbatches=num_microbatches,
                         seq_length=padded_seq_length,
-                        micro_batch_size=mbs,
+                        micro_batch_size=micro_batch_size,
                         decoder_seq_length=padded_seq_length,
nemo_rl/models/megatron/common.py (1)

1-1: Update copyright year to 2026.
The header should reflect the current year for modified non-test files.

✏️ Proposed fix
-# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
+# Copyright (c) 2026, NVIDIA CORPORATION.  All rights reserved.
As per coding guidelines, please update the NVIDIA header year to 2026.
🤖 Fix all issues with AI agents
In `@nemo_rl/models/megatron/data.py`:
- Around line 33-36: Docstring for the dataclass ProcessedInputs is missing
proper punctuation/formatting (D415) and causing ruff-format/pre-commit
failures; update the class docstring to be a complete sentence ending with a
period (e.g., "Processed microbatch inputs used for model forward pass.") and
ensure quotes/spacing follow project docstring style, then re-run ruff-format
and the pre-commit hooks to commit the formatted changes.
- Line 1: Update the copyright header string "Copyright (c) 2025, NVIDIA
CORPORATION.  All rights reserved." to use 2026 instead of 2025; locate the
top-of-file header in nemo_rl/models/megatron/data.py and replace the year so it
reads 2026.
- Around line 206-224: process_microbatch currently does unguarded "with
straggler_timer(bdata=True):" which fails when straggler_timer is None; fix by
branching: if straggler_timer is None use contextlib.nullcontext() (import
nullcontext from contextlib) and do "with nullcontext():" else do "with
straggler_timer(bdata=True):". Update the with-statement in process_microbatch
to use this conditional context so direct test calls without a timer won't
error.
- Around line 508-515: PackedSeqParams is incorrectly given padded cumulative
lengths for cu_seqlens_q and cu_seqlens_kv; change the assignments so
cu_seqlens_q and cu_seqlens_kv receive the unpadded cumulative lengths variable
(the unpadded cu_seqlens) while cu_seqlens_q_padded and cu_seqlens_kv_padded
keep cu_seqlens_padded, ensuring the PackedSeqParams instance (constructed in
data.py) uses unpadded values for cu_seqlens_q/cu_seqlens_kv and padded values
for cu_seqlens_q_padded/cu_seqlens_kv_padded.
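
The conditional-context guard suggested for process_microbatch above can be sketched as follows. The function body is a hypothetical stub, not the real implementation; only the timer-guard pattern is the point.

```python
from contextlib import nullcontext

def process_microbatch(data, straggler_timer=None):
    # Guard the optional timer: fall back to a no-op context when no timer
    # is supplied, so direct test calls without a straggler timer don't raise.
    timer_ctx = nullcontext() if straggler_timer is None else straggler_timer(bdata=True)
    with timer_ctx:
        # ... real processing would happen here; this stub just echoes input.
        return {"input_ids": data["input_ids"]}
```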

In `@nemo_rl/models/policy/workers/megatron_policy_worker.py`:
- Around line 768-777: The code calls .to("cuda") on processed_mb (from
get_microbatch_iterator) which is a dataclass and has no .to method; remove the
.to("cuda") call and use processed_mb directly (processed_mb =
next(data_iterator)) since make_processed_microbatch_iterator already moves
tensors to CUDA; keep subsequent references to processed_mb.data_dict,
processed_mb.input_ids, processed_mb.input_ids_cp_sharded,
processed_mb.attention_mask, processed_mb.position_ids,
processed_mb.packed_seq_params, processed_mb.cu_seqlens_padded, and
unpacked_input_ids = data_dict["input_ids"] unchanged.
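
To illustrate why the .to("cuda") call fails: a plain dataclass has no tensor-style .to() method, so any device movement would have to walk the fields explicitly. The helper below is hypothetical (the real iterator already moves tensors to CUDA, so no mover is needed).

```python
from dataclasses import dataclass, fields

@dataclass
class Microbatch:  # hypothetical stand-in for ProcessedMicrobatch
    input_ids: object
    position_ids: object

def move_fields(mb, device):
    """Move any tensor-like fields of a dataclass instance to `device`.

    Illustrative only: dataclasses expose no .to() themselves, so an
    explicit mover must check each field for a tensor-style .to() method.
    """
    for f in fields(mb):
        value = getattr(mb, f.name)
        if hasattr(value, "to"):  # torch.Tensor exposes .to(device)
            setattr(mb, f.name, value.to(device))
    return mb
```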

In `@tests/unit/algorithms/test_sequence_packing_gradients.py`:
- Around line 331-347: The failing call to make_processed_microbatch_iterator is
missing the newly required straggler_timer argument; update the test invocation
(the call to make_processed_microbatch_iterator) to supply a suitable
straggler_timer (e.g., a simple timer/fixture or a no-op/dummy timer instance
used elsewhere in tests) so the call matches the new signature; ensure you
create or import the same timer type used by other tests and pass it as the
straggler_timer parameter when constructing the data_iterator.

In `@tests/unit/models/megatron/test_megatron_data.py`:
- Around line 292-308: The test incorrectly unpacks process_global_batch as a
tuple; update it to capture the dict return and access keys instead: call result
= process_global_batch(...) (keeping the same args), then replace uses of batch,
global_valid_seqs, global_valid_toks with result["batch"],
result["global_valid_seqs"], result["global_valid_toks"] respectively, and
adjust assertions and mock verifications to reference result["batch"] (e.g.,
result["batch"]["sample_mask"] and result["batch"]["input_ids"]) while keeping
the existing mock_data.get_batch and mock_all_reduce assertions.
♻️ Duplicate comments (2)
nemo_rl/models/megatron/data.py (2)

289-311: Return type/docstring mismatch for process_global_batch.
The function returns a dict, but the annotation and docstring suggest otherwise.

✏️ Proposed fix
-def process_global_batch(
+def process_global_batch(
     data: BatchedDataDict[Any],
     loss_fn: LossFunction,
     dp_group: torch.distributed.ProcessGroup,
     *,
     batch_idx: int,
     batch_size: int,
-) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+) -> dict[str, torch.Tensor | BatchedDataDict[Any]]:
@@
-        dp_mesh: Data parallel mesh
+        dp_group: Data parallel process group
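
A minimal sketch of the dict-returning shape this fix describes. This is a simplified, hypothetical version: the real function also all-reduces the valid-sequence/valid-token counts across the data-parallel process group and operates on tensors rather than lists.

```python
def process_global_batch(data, batch_idx, batch_size):
    """Slice one global batch window and compute validity counts (sketch)."""
    window = slice(batch_idx * batch_size, (batch_idx + 1) * batch_size)
    batch = {key: values[window] for key, values in data.items()}
    # Count sequences and tokens that participate in the loss.
    global_valid_seqs = sum(batch["sample_mask"])
    global_valid_toks = sum(
        mask * sum(toks)
        for mask, toks in zip(batch["sample_mask"], batch["token_mask"])
    )
    return {
        "batch": batch,
        "global_valid_seqs": global_valid_seqs,
        "global_valid_toks": global_valid_toks,
    }
```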

342-351: Return type annotation doesn’t match actual return.
The function returns five values, but the annotation only lists four.

✏️ Proposed fix
-) -> tuple[torch.Tensor, PackedSeqParams, torch.Tensor, Optional[torch.Tensor]]:
+) -> tuple[
+    torch.Tensor,  # all_input_ids
+    torch.Tensor,  # packed_input_ids
+    PackedSeqParams,
+    torch.Tensor,  # cu_seqlens
+    Optional[torch.Tensor],  # cu_seqlens_padded
+]:
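
For intuition on the unpadded vs. padded cumulative lengths that the PackedSeqParams fix and this annotation refer to, here is a small illustrative helper. It is not the real code, which builds Megatron-Core's PackedSeqParams from tensors; it only shows how the two cu_seqlens variants relate.

```python
import itertools

def cu_seqlens_pair(seq_lens, pad_multiple):
    """Return (unpadded, padded) cumulative sequence lengths.

    Illustrative only: per the fix above, cu_seqlens_q/cu_seqlens_kv should
    receive the unpadded values, and cu_seqlens_q_padded/cu_seqlens_kv_padded
    the padded ones.
    """
    # Round each length up to the nearest multiple of pad_multiple.
    padded = [-(-length // pad_multiple) * pad_multiple for length in seq_lens]
    cu_seqlens = [0] + list(itertools.accumulate(seq_lens))
    cu_seqlens_padded = [0] + list(itertools.accumulate(padded))
    return cu_seqlens, cu_seqlens_padded
```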
🧹 Nitpick comments (2)
nemo_rl/models/megatron/common.py (1)

83-90: Trim unused processed_mb fields.
input_ids and cu_seqlens_padded are assigned but not used.

♻️ Proposed cleanup
-    input_ids = processed_mb.input_ids
     input_ids_cp_sharded = processed_mb.input_ids_cp_sharded
     attention_mask = processed_mb.attention_mask
     position_ids = processed_mb.position_ids
     packed_seq_params = processed_mb.packed_seq_params
-    cu_seqlens_padded = processed_mb.cu_seqlens_padded
nemo_rl/models/megatron/data.py (1)

630-635: Drop unused padded_seq_len.
This local is computed but never used.

♻️ Proposed cleanup
-            padded_seq_len = ((seq_len + pad_factor - 1) // pad_factor) * pad_factor
             start_idx = cu_seqlens_padded[b].item()

Signed-off-by: ashors1 <ashors@nvidia.com>
@ashors1 ashors1 added CI:L1 Run doctests, unit tests, and functional tests and removed CI:L1 Run doctests, unit tests, and functional tests labels Jan 30, 2026
@terrykong terrykong enabled auto-merge (squash) January 31, 2026 02:02
@terrykong terrykong merged commit c265076 into main Jan 31, 2026
40 of 42 checks passed
@terrykong terrykong deleted the ashors/mcore-data branch January 31, 2026 02:02
yuanhangsu1986 pushed commits to yuanhangsu1986/RL-Nemontron-Edge-Omni that referenced this pull request (Feb 12 and Feb 21, 2026)
Signed-off-by: ashors1 <ashors@nvidia.com>
Signed-off-by: Anna Shors <ashors@nvidia.com>
Signed-off-by: yuanhangs <yuanhangs@nvidia.com>
seonjinn pushed commits that referenced this pull request (Mar 8 and Mar 9, 2026)

Labels

CI:L1 Run doctests, unit tests, and functional tests
