From aedd445c58adcb7fc4f2866dc03d6cfc7b386632 Mon Sep 17 00:00:00 2001 From: Ahmad Kiswani Date: Thu, 19 Feb 2026 11:44:03 +0200 Subject: [PATCH] chore: Switching mcore to upstream main Signed-off-by: Ahmad Kiswani --- .gitmodules | 4 +- .../Megatron-Bridge-workspace/Megatron-Bridge | 2 +- 3rdparty/Megatron-Bridge-workspace/setup.py | 6 +- 3rdparty/Megatron-LM-workspace/Megatron-LM | 2 +- 3rdparty/Megatron-LM-workspace/setup.py | 7 +- .../policy/workers/megatron_policy_worker.py | 75 +- pyproject.toml | 8 +- .../sequence_packing_gradient_actor.py | 380 +++++++ .../test_sequence_packing_gradients.py | 363 +------ .../models/megatron/megatron_data_actors.py | 982 ++++++++++++++++++ .../models/megatron/test_megatron_data.py | 964 +---------------- uv.lock | 112 +- 12 files changed, 1493 insertions(+), 1412 deletions(-) create mode 100644 tests/unit/algorithms/sequence_packing_gradient_actor.py create mode 100644 tests/unit/models/megatron/megatron_data_actors.py diff --git a/.gitmodules b/.gitmodules index c1b0c5a56f..8d7c7be7e5 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,7 +1,7 @@ [submodule "3rdparty/Megatron-LM"] path = 3rdparty/Megatron-LM-workspace/Megatron-LM - url = https://github.com/yaoyu-33/Megatron-LM.git - branch = yifu/remove_do_not_average_loss + url = https://github.com/NVIDIA/Megatron-LM.git + branch = main shallow = true [submodule "3rdparty/Megatron-Bridge"] path = 3rdparty/Megatron-Bridge-workspace/Megatron-Bridge diff --git a/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge b/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge index 15398e08fc..f91542b909 160000 --- a/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge +++ b/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge @@ -1 +1 @@ -Subproject commit 15398e08fc86be3de084c7382116527246ab1852 +Subproject commit f91542b90908ad08b7e13672feea03e27bedee27 diff --git a/3rdparty/Megatron-Bridge-workspace/setup.py b/3rdparty/Megatron-Bridge-workspace/setup.py index a0beea9449..d0bdd8711f 100644 --- a/3rdparty/Megatron-Bridge-workspace/setup.py +++ b/3rdparty/Megatron-Bridge-workspace/setup.py @@ -27,7 +27,7 @@ CACHED_DEPENDENCIES = [ "transformers<5.0.0", - "datasets", + "datasets>=2.20.0", "accelerate", "omegaconf>=2.3.0", "tensorboard>=2.19.0", @@ -41,13 +41,15 @@ "hydra-core>1.3,<=1.3.2", "megatron-core[dev,mlm]>=0.15.0a0,<0.17.0", "qwen-vl-utils", - "transformer-engine[pytorch]>=2.10.0a0,<2.12.0", + "transformer-engine[pytorch,core_cu13]>=2.10.0a0,<2.13.0", "mamba-ssm", "nvidia-resiliency-ext", "causal-conv1d", "flash-linear-attention", "timm", "open-clip-torch>=3.2.0", + "mlflow>=3.5.0", + "torch>=2.6.0", ] # If the bridge source exists, compare cached dependencies with the submodule's pyproject diff --git a/3rdparty/Megatron-LM-workspace/Megatron-LM b/3rdparty/Megatron-LM-workspace/Megatron-LM index b12071b947..23dd639cf3 160000 --- a/3rdparty/Megatron-LM-workspace/Megatron-LM +++ b/3rdparty/Megatron-LM-workspace/Megatron-LM @@ -1 +1 @@ -Subproject commit b12071b947f9ee3c6616306662069fc4ca77be4c +Subproject commit 23dd639cf3de30f3b9d8d0fae71ee31180be9ddd diff --git a/3rdparty/Megatron-LM-workspace/setup.py b/3rdparty/Megatron-LM-workspace/setup.py index fb0a7cf92e..380864cc25 100644 --- a/3rdparty/Megatron-LM-workspace/setup.py +++ b/3rdparty/Megatron-LM-workspace/setup.py @@ -43,7 +43,7 @@ # VCS dependencies use full "pkg @ git+URL@rev" format matching pyproject.toml [tool.uv.sources] CACHED_DEPENDENCIES = [ # Default dependencies from pyproject.toml - "torch", + "torch>=2.6.0", "numpy", "packaging>=24.2", # Dev dependencies from pyproject.toml @@ -58,7 +58,7 @@ "opentelemetry-api~=1.33.1", "mamba-ssm~=2.2", "causal-conv1d~=1.5", - "flash-linear-attention~=0.3.2", + "flash-linear-attention~=0.4.0", "nv-grouped-gemm~=1.1", "megatron-energon[av_decode]~=6.0", "av", @@ -69,6 +69,9 @@ "emerging_optimizers @ git+https://github.com/NVIDIA-NeMo/Emerging-Optimizers.git@v0.1.0", "datasets", "fastapi~=0.50", + "flask[async]", + "hypercorn", + "openai", ] diff --git a/nemo_rl/models/policy/workers/megatron_policy_worker.py b/nemo_rl/models/policy/workers/megatron_policy_worker.py index 5f1483ed9a..5a6a683765 100644 --- a/nemo_rl/models/policy/workers/megatron_policy_worker.py +++ b/nemo_rl/models/policy/workers/megatron_policy_worker.py @@ -36,9 +36,7 @@ from megatron.core.distributed.fsdp.mcore_fsdp_adapter import ( FullyShardedDataParallel as custom_FSDP, ) -from megatron.core.inference.model_inference_wrappers.inference_wrapper_config import ( - InferenceWrapperConfig, -) +from megatron.core.inference.config import InferenceConfig from megatron.core.inference.text_generation_controllers.text_generation_controller import ( TextGenerationController, ) @@ -702,14 +700,8 @@ def generate( ) model_cfg = self.megatron_cfg.model - inference_wrapper_config = InferenceWrapperConfig( - hidden_size=model_cfg.hidden_size, - inference_batch_times_seqlen_threshold=1000000, - fp32_residual_connection=model_cfg.fp32_residual_connection, - params_dtype=model_cfg.params_dtype, - padded_vocab_size=self.final_padded_vocab_size, # Use the potentially updated value - inference_max_seq_length=self.cfg["generation"]["max_new_tokens"], # type: ignore - inference_max_requests=self.cfg["generation_batch_size"], + mcore_generation_config = cast( + MegatronGenerationConfig, self.cfg["generation"]["mcore_generation_config"] ) from megatron.core.inference.contexts.dynamic_context import ( @@ -723,45 +715,32 @@ def generate( ) from megatron.core.inference.sampling_params import SamplingParams - mcore_generation_config = cast( - MegatronGenerationConfig, self.cfg["generation"]["mcore_generation_config"] - ) - buffer_size_gb = mcore_generation_config["buffer_size_gb"] - - num_cuda_graphs = mcore_generation_config["num_cuda_graphs"] - block_size_tokens = mcore_generation_config["block_size_tokens"] - use_cuda_graphs_for_non_decode_steps = mcore_generation_config[ - "use_cuda_graphs_for_non_decode_steps" - ] - enable_chunked_prefill = mcore_generation_config["enable_chunked_prefill"] - unified_memory_level = mcore_generation_config["unified_memory_level"] - max_tokens = mcore_generation_config["max_tokens"] - model_config = self.model.config model_config.cuda_graph_impl = "local" - dynamic_context = DynamicInferenceContext( - params_dtype=inference_wrapper_config.params_dtype, - num_layers=model_config.num_layers, - kv_channels=model_config.kv_channels, - num_attention_heads=model_config.num_query_groups, + local_rank = torch.cuda.current_device() + num_gpus_per_node = torch.cuda.device_count() + node_idx = self.rank // num_gpus_per_node if num_gpus_per_node > 0 else 0 + model_config.inference_sampling_seed = (node_idx * 1024) + local_rank + + inference_config = InferenceConfig( max_sequence_length=self.cfg["generation"]["max_new_tokens"], - buffer_size_gb=buffer_size_gb, - materialize_only_last_token_logits=False, - num_cuda_graphs=num_cuda_graphs, - block_size_tokens=block_size_tokens, - tensor_model_parallel_size=self.cfg["megatron_cfg"][ - "tensor_model_parallel_size" + buffer_size_gb=mcore_generation_config["buffer_size_gb"], + num_cuda_graphs=mcore_generation_config["num_cuda_graphs"], + block_size_tokens=mcore_generation_config["block_size_tokens"], + use_cuda_graphs_for_non_decode_steps=mcore_generation_config[ + "use_cuda_graphs_for_non_decode_steps" ], - use_cuda_graphs_for_non_decode_steps=use_cuda_graphs_for_non_decode_steps, + enable_chunked_prefill=mcore_generation_config["enable_chunked_prefill"], + unified_memory_level=mcore_generation_config["unified_memory_level"], + max_tokens=mcore_generation_config["max_tokens"], + materialize_only_last_token_logits=False, use_flashinfer_fused_rope=False, - unified_memory_level=unified_memory_level, - max_tokens=max_tokens, - ) - inference_wrapped_model = GPTInferenceWrapper( - self.model, inference_wrapper_config, dynamic_context ) + dynamic_context = DynamicInferenceContext(model_config, inference_config) + inference_wrapped_model = GPTInferenceWrapper(self.model, dynamic_context) + inference_wrapped_model.prep_model_for_inference() # Set pipeline parallel flag inference_wrapped_model.model_is_pipeline_parallel = ( @@ -773,21 +752,9 @@ def generate( tokenizer=self.megatron_tokenizer, ) - # Calculate seed based on node and rank to ensure reproducibility across workers - local_rank = torch.cuda.current_device() # Local GPU index on the node - num_gpus_per_node = torch.cuda.device_count() - node_idx = self.rank // num_gpus_per_node if num_gpus_per_node > 0 else 0 - seed = (node_idx * 1024) + local_rank - - # New API: DynamicInferenceEngine has additional parameters dynamic_engine = DynamicInferenceEngine( text_generation_controller, dynamic_context, - enable_cuda_graph=True, - random_seed=seed, - track_paused_request_events=False, - enable_chunked_prefill=enable_chunked_prefill, - inference_logging_step_interval=0, ) # Handle None values for top_k - convert to integer as required by Megatron diff --git a/pyproject.toml b/pyproject.toml index 7b702d2662..7b1ced085c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ automodel = [ "mamba-ssm", "causal-conv1d", "nv-grouped-gemm", - "transformer-engine[pytorch]==2.8.0", + "transformer-engine[pytorch]>=2.9.0a0,<2.12.0", "deep_ep @ git+https://github.com/deepseek-ai/DeepEP.git@bfded34800dfec415b71503f8205181de90b2480", ] vllm = [ @@ -108,7 +108,7 @@ mcore = [ # This dependency also needs to be compatible with the spec in Megatron-Bridge/pyproject.toml. # It is specified here since we don't directly use Megatron-Bridge/pyproject.toml, but a proxy setup.py+pyproject.toml combo # outside to allow "optionally" installing the megatron path. It's simpler to deal with transformer-engine here in the NeMo RL pyproject.toml - "transformer-engine[pytorch]==2.8.0", + "transformer-engine[pytorch]>=2.9.0a0,<2.12.0", "megatron-core", "megatron-bridge", # Flash-attn version should be selected to satisfy both TE + vLLM requirements (xformers in particular) @@ -235,12 +235,12 @@ default-groups = ["dev", "build"] # --link-mode=copy (slower but more reliable; supresses warning) # --link-mode=symlink (fastest option when uv cache and venv on different file-system; caveat: venv is brittle since it depends on the environment/container) link-mode = "copy" -# The TE override is needed because automodel/mbridge we are on is still on 2.5.0 +# The TE override is needed because automodel/mbridge we are on is still on an older version # The opencv-python-headless override is needed because automodel pins it to 4.10.0.84, whereas vllm>=0.11.0 needs >= 4.11.0 # The timm override is needed because current automodel pins to 1.0.16. This can be removed once we move ToT automodel # The nvidia-modelopt override is needed because mcore is still on 0.33 override-dependencies = [ - "transformer-engine[pytorch]==2.8.0", + "transformer-engine[pytorch]>=2.9.0a0,<2.12.0", "opencv-python-headless>=4.11.0", "timm<=1.0.22", "nvidia-modelopt[torch]>=0.39.0", diff --git a/tests/unit/algorithms/sequence_packing_gradient_actor.py b/tests/unit/algorithms/sequence_packing_gradient_actor.py new file mode 100644 index 0000000000..20564d77af --- /dev/null +++ b/tests/unit/algorithms/sequence_packing_gradient_actor.py @@ -0,0 +1,380 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Ray actor for sequence packing gradient tests. + +Separated from test_sequence_packing_gradients.py to avoid importing pytest +in Ray worker environments that use PY_EXECUTABLES.MCORE. +""" + +import os +from unittest.mock import MagicMock + +import ray +import torch + +from nemo_rl.algorithms.loss_functions import ( + ClippedPGLossFn, + SequencePackingLossWrapper, +) +from nemo_rl.distributed.batched_data_dict import BatchedDataDict + + +@ray.remote(num_gpus=1) +class SequencePackingGradientTestActor: + def __init__(self, cp_size): + self.cp_size = cp_size + self.env_vars = dict(os.environ) + + def test_sequence_packing_gradients(self): + from nemo_rl.distributed.model_utils import _get_tokens_on_this_cp_rank + from nemo_rl.models.megatron.data import ( + _pack_sequences_for_megatron, + make_processed_microbatch_iterator, + ) + from nemo_rl.models.megatron.train import ( + LossPostProcessor, + forward_with_post_processing_fn, + ) + + # Initialize process group + torch.distributed.init_process_group(backend="nccl") + + rank = int(os.environ["RANK"]) + world_size = int(os.environ["WORLD_SIZE"]) + + # Create CP group - all ranks participate in CP + cp_group = torch.distributed.new_group(ranks=list(range(world_size))) + + # Patch get_context_parallel_group to always return cp_group + # (Assume it's imported from nemo_rl.models.megatron.common) + import megatron.core.parallel_state as parallel_state + + parallel_state._CONTEXT_PARALLEL_GROUP = cp_group + parallel_state._TENSOR_MODEL_PARALLEL_GROUP = torch.distributed.new_group( + ranks=[rank] + ) + + # Test parameters + batch_size = 4 + max_seq_len = 512 + vocab_size = 1000 + cp_size = self.cp_size + + # Ensure sequence length is compatible with CP load balancing + if max_seq_len % (2 * cp_size) != 0: + max_seq_len = (max_seq_len // (2 * cp_size) + 1) * (2 * cp_size) + + # Create test data with varying sequence lengths + torch.manual_seed(42) # For reproducibility + seq_lengths = torch.tensor( + [ + max_seq_len // 4, + max_seq_len * 1 // 4, + max_seq_len // 4, + max_seq_len * 3 // 4, + ] + ) + + # Create input data + input_ids = torch.zeros( + batch_size, max_seq_len, dtype=torch.long, device="cuda" + ) + token_mask = torch.zeros( + batch_size, max_seq_len, dtype=torch.float, device="cuda" + ) + + # Fill with random tokens up to seq_length + for i in range(batch_size): + length = seq_lengths[i] + input_ids[i, :length] = torch.randint( + 0, vocab_size, (length,), device="cuda" + ) + token_mask[i, :length] = 1.0 + + # Create other required tensors + sample_mask = torch.ones(batch_size, dtype=torch.float, device="cuda") + advantages = torch.randn(batch_size, max_seq_len, device="cuda") + prev_logprobs = torch.randn(batch_size, max_seq_len, device="cuda") + generation_logprobs = torch.randn(batch_size, max_seq_len, device="cuda") + reference_policy_logprobs = generation_logprobs.clone() + + original_data = { + "input_ids": input_ids, + "input_lengths": seq_lengths, + "token_mask": token_mask, + "sample_mask": sample_mask, + "advantages": advantages, + "prev_logprobs": prev_logprobs, + "generation_logprobs": generation_logprobs, + "reference_policy_logprobs": reference_policy_logprobs, + } + + # ===== TEST 1: Baseline (no sequence packing) ===== + print(f"Rank {rank}: Testing baseline (no sequence packing)") + + baseline_logits = torch.randn( + batch_size, max_seq_len, vocab_size, requires_grad=True, device="cuda" + ) + + loss_config = { + "reference_policy_kl_penalty": 0.1, + "reference_policy_kl_type": "k3", + "kl_input_clamp_value": 20.0, + "kl_output_clamp_value": 10.0, + "ratio_clip_min": 0.2, + "ratio_clip_max": 0.2, + "ratio_clip_c": 3.0, + "use_on_policy_kl_approximation": False, + "use_importance_sampling_correction": False, + "truncated_importance_sampling_ratio": None, + "sequence_level_importance_ratios": False, + "token_level_loss": True, + "force_on_policy_ratio": False, + } + + base_loss_fn = ClippedPGLossFn(loss_config) + data_dict = BatchedDataDict(original_data) + + global_valid_toks = torch.tensor( + sum(seq_lengths).item(), dtype=torch.float, device="cuda" + ) + global_valid_seqs = torch.tensor(batch_size, dtype=torch.float, device="cuda") + + # Forward pass + baseline_loss, baseline_metrics = base_loss_fn( + baseline_logits, + data_dict, + global_valid_seqs, + global_valid_toks, + ) + + # Backward pass + baseline_loss.backward() + + # Check baseline gradients + baseline_grad_norm = torch.norm(baseline_logits.grad).item() + baseline_grad_max = torch.max(torch.abs(baseline_logits.grad)).item() + baseline_grad_mean = torch.mean(torch.abs(baseline_logits.grad)).item() + baseline_grad_store = baseline_logits.grad.clone() + baseline_logits.grad.zero_() + + print( + f"Rank {rank}: Baseline gradient stats - norm: {baseline_grad_norm:.4f}, max: {baseline_grad_max:.4f}, mean: {baseline_grad_mean:.4f}" + ) + + # ===== TEST 2: Sequence packing with context parallelism ===== + print(f"Rank {rank}: Testing with sequence packing + CP") + + # Pack sequences + pad_to_multiple = cp_size * 2 # Common requirement for CP + ( + packed_input_ids, + packed_input_ids_cp, + packed_seq_params, + cu_seqlens, + cu_seqlens_padded, + ) = _pack_sequences_for_megatron( + input_ids, + seq_lengths, + pad_individual_seqs_to_multiple_of=pad_to_multiple, + pad_packed_seq_to=max_seq_len * batch_size if cp_size > 1 else None, + cp_rank=rank, + cp_size=cp_size, + ) + + # For CP, logits are sharded across context parallel ranks + def make_packed_logits(logits): + packed_logits = torch.zeros( + 1, packed_input_ids_cp.shape[1], vocab_size, device="cuda" + ) + run_seq = 0 + for i, seq_len in enumerate(seq_lengths): + padded_seqlen = cu_seqlens_padded[i + 1] - cu_seqlens_padded[i] + if padded_seqlen > baseline_logits.shape[1]: + # pad the logits with zeros + tmp_logits = torch.zeros( + 1, padded_seqlen, vocab_size, device="cuda" + ) + tmp_logits[:, :seq_len] = baseline_logits[i : i + 1, :seq_len] + else: + tmp_logits = baseline_logits[i : i + 1, :padded_seqlen] + packed_logits[ + :, run_seq // cp_size : (run_seq + padded_seqlen) // cp_size, : + ] = _get_tokens_on_this_cp_rank(tmp_logits, rank, cp_size) + run_seq += padded_seqlen + return packed_logits + + packed_logits = make_packed_logits(baseline_logits) + + # Create sequence packing wrapper + wrapper = SequencePackingLossWrapper( + loss_fn=base_loss_fn, + cu_seqlens_q=cu_seqlens, + cu_seqlens_q_padded=cu_seqlens_padded, + ) + + # Create data dict for packed sequences + packed_data_dict = BatchedDataDict(original_data) + + tp_group = torch.distributed.new_group(ranks=[rank]) + + # Forward pass + packed_loss, packed_metrics = wrapper( + packed_logits, + packed_data_dict, + global_valid_seqs, + global_valid_toks, + vocab_parallel_rank=0, + vocab_parallel_group=tp_group, + context_parallel_group=cp_group, + ) + + # Backward pass + packed_loss /= cp_size + packed_loss.backward() + + # Check packed gradients + packed_grad = baseline_logits.grad.clone() + # all-reduce across cp ranks + torch.distributed.all_reduce(packed_grad, op=torch.distributed.ReduceOp.SUM) + + packed_grad_norm = torch.norm(packed_grad).item() + packed_grad_max = torch.max(torch.abs(packed_grad)).item() + packed_grad_mean = torch.mean(torch.abs(packed_grad)).item() + + print( + f"Rank {rank}: Packed gradient stats - norm: {packed_grad_norm:.4f}, max: {packed_grad_max:.4f}, mean: {packed_grad_mean:.4f}" + ) + + # ===== ANALYSIS ===== + gradient_ratio_norm = ( + packed_grad_norm / baseline_grad_norm + if baseline_grad_norm > 0 + else float("inf") + ) + gradient_ratio_max = ( + packed_grad_max / baseline_grad_max + if baseline_grad_max > 0 + else float("inf") + ) + gradient_ratio_mean = ( + packed_grad_mean / baseline_grad_mean + if baseline_grad_mean > 0 + else float("inf") + ) + + print( + f"Rank {rank}: Gradient ratios - norm: {gradient_ratio_norm:.4f}, max: {gradient_ratio_max:.4f}, mean: {gradient_ratio_mean:.4f}" + ) + + print( + f"differences by token: {torch.sum(torch.abs(packed_grad - baseline_grad_store), dim=-1)}" + ) + + torch.testing.assert_close( + packed_grad, baseline_grad_store, atol=1e-5, rtol=1e-5 + ) + + # test 3: with forward_with_post_processing_fn + # reset grad + baseline_logits.grad.zero_() + packed_logits = make_packed_logits(baseline_logits) + + # mock straggler detector with dummy context manager + mock_straggler_timer = MagicMock() + mock_straggler_timer.return_value = MagicMock( + __enter__=MagicMock(return_value=None), + __exit__=MagicMock(return_value=False), + ) + + # mock model forward + class MockModel: + def __init__(self): + self.logits = packed_logits + + def __call__(self, *args, **kwargs): + return self.logits + + def forward( + self, input_ids, position_ids, attention_mask, packed_seq_params=None + ): + return self.logits + + cfg = { + "sequence_packing": {"enabled": True}, + "dynamic_batching": {"enabled": False}, + "megatron_cfg": { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": cp_size, + }, + } + + post_processor = LossPostProcessor( + loss_fn=base_loss_fn, + cfg=cfg, + cp_normalize=True, + ) + + output_tensor, wrapped_loss_fn = forward_with_post_processing_fn( + data_iterator=make_processed_microbatch_iterator( + iter([packed_data_dict]), + cfg=cfg, + seq_length_key="input_lengths", + pad_individual_seqs_to_multiple_of=pad_to_multiple, + pad_packed_seq_to_multiple_of=1, + straggler_timer=mock_straggler_timer, + pad_full_seq_to=max_seq_len * batch_size if cp_size > 1 else None, + ), + model=MockModel(), + cfg=cfg, + post_processing_fn=post_processor, + global_valid_seqs=global_valid_seqs, + global_valid_toks=global_valid_toks, + straggler_timer=mock_straggler_timer, + ) + loss, metrics = wrapped_loss_fn(output_tensor) + + loss.backward() + + # Check packed gradients + packed_grad = baseline_logits.grad.clone() + # all-reduce across cp ranks + torch.distributed.all_reduce(packed_grad, op=torch.distributed.ReduceOp.SUM) + + packed_grad_norm = torch.norm(packed_grad).item() + packed_grad_max = torch.max(torch.abs(packed_grad)).item() + packed_grad_mean = torch.mean(torch.abs(packed_grad)).item() + print( + f"Rank {rank}: Packed gradient stats - norm: {packed_grad_norm:.4f}, max: {packed_grad_max:.4f}, mean: {packed_grad_mean:.4f}" + ) + + gradient_ratio_norm = ( + packed_grad_norm / baseline_grad_norm + if baseline_grad_norm > 0 + else float("inf") + ) + gradient_ratio_max = ( + packed_grad_max / baseline_grad_max + if baseline_grad_max > 0 + else float("inf") + ) + + print( + f"Rank {rank}: Gradient ratios - norm: {gradient_ratio_norm:.4f}, max: {gradient_ratio_max:.4f}" + ) + print( + f"differences by token: {torch.sum(torch.abs(packed_grad - baseline_grad_store), dim=-1)}" + ) diff --git a/tests/unit/algorithms/test_sequence_packing_gradients.py b/tests/unit/algorithms/test_sequence_packing_gradients.py index f0ce832eb0..88ec9ce1b4 100644 --- a/tests/unit/algorithms/test_sequence_packing_gradients.py +++ b/tests/unit/algorithms/test_sequence_packing_gradients.py @@ -13,18 +13,10 @@ # limitations under the License. """Test script to debug high gradients with sequence packing + context parallelism.""" -import os -from unittest.mock import MagicMock - import pytest import ray import torch -from nemo_rl.algorithms.loss_functions import ( - ClippedPGLossFn, - SequencePackingLossWrapper, -) -from nemo_rl.distributed.batched_data_dict import BatchedDataDict from nemo_rl.distributed.named_sharding import NamedSharding from nemo_rl.distributed.ray_actor_environment_registry import ( ACTOR_ENVIRONMENT_REGISTRY, @@ -32,358 +24,9 @@ ) from nemo_rl.distributed.virtual_cluster import RayVirtualCluster from nemo_rl.distributed.worker_groups import RayWorkerBuilder, RayWorkerGroup - - -@ray.remote(num_gpus=1) -class SequencePackingGradientTestActor: - def __init__(self, cp_size): - self.cp_size = cp_size - self.env_vars = dict(os.environ) - - def test_sequence_packing_gradients(self): - from nemo_rl.distributed.model_utils import _get_tokens_on_this_cp_rank - from nemo_rl.models.megatron.data import ( - _pack_sequences_for_megatron, - make_processed_microbatch_iterator, - ) - from nemo_rl.models.megatron.train import ( - LossPostProcessor, - forward_with_post_processing_fn, - ) - - # Initialize process group - torch.distributed.init_process_group(backend="nccl") - - rank = int(os.environ["RANK"]) - world_size = int(os.environ["WORLD_SIZE"]) - - # Create CP group - all ranks participate in CP - cp_group = torch.distributed.new_group(ranks=list(range(world_size))) - - # Patch get_context_parallel_group to always return cp_group - # (Assume it's imported from nemo_rl.models.megatron.common) - import megatron.core.parallel_state as parallel_state - - parallel_state._CONTEXT_PARALLEL_GROUP = cp_group - parallel_state._TENSOR_MODEL_PARALLEL_GROUP = torch.distributed.new_group( - ranks=[rank] - ) - - # Test parameters - batch_size = 4 - max_seq_len = 512 - vocab_size = 1000 - cp_size = self.cp_size - - # Ensure sequence length is compatible with CP load balancing - if max_seq_len % (2 * cp_size) != 0: - max_seq_len = (max_seq_len // (2 * cp_size) + 1) * (2 * cp_size) - - # Create test data with varying sequence lengths - torch.manual_seed(42) # For reproducibility - seq_lengths = torch.tensor( - [ - max_seq_len // 4, - max_seq_len * 1 // 4, - max_seq_len // 4, - max_seq_len * 3 // 4, - ] - ) - - # Create input data - input_ids = torch.zeros( - batch_size, max_seq_len, dtype=torch.long, device="cuda" - ) - token_mask = torch.zeros( - batch_size, max_seq_len, dtype=torch.float, device="cuda" - ) - - # Fill with random tokens up to seq_length - for i in range(batch_size): - length = seq_lengths[i] - input_ids[i, :length] = torch.randint( - 0, vocab_size, (length,), device="cuda" - ) - token_mask[i, :length] = 1.0 - - # Create other required tensors - sample_mask = torch.ones(batch_size, dtype=torch.float, device="cuda") - advantages = torch.randn(batch_size, max_seq_len, device="cuda") - prev_logprobs = torch.randn(batch_size, max_seq_len, device="cuda") - generation_logprobs = torch.randn(batch_size, max_seq_len, device="cuda") - reference_policy_logprobs = generation_logprobs.clone() - - original_data = { - "input_ids": input_ids, - "input_lengths": seq_lengths, - "token_mask": token_mask, - "sample_mask": sample_mask, - "advantages": advantages, - "prev_logprobs": prev_logprobs, - "generation_logprobs": generation_logprobs, - "reference_policy_logprobs": reference_policy_logprobs, - } - - # ===== TEST 1: Baseline (no sequence packing) ===== - print(f"Rank {rank}: Testing baseline (no sequence packing)") - - baseline_logits = torch.randn( - batch_size, max_seq_len, vocab_size, requires_grad=True, device="cuda" - ) - - loss_config = { - "reference_policy_kl_penalty": 0.1, - "reference_policy_kl_type": "k3", - "kl_input_clamp_value": 20.0, - "kl_output_clamp_value": 10.0, - "ratio_clip_min": 0.2, - "ratio_clip_max": 0.2, - "ratio_clip_c": 3.0, - "use_on_policy_kl_approximation": False, - "use_importance_sampling_correction": False, - "truncated_importance_sampling_ratio": None, - "sequence_level_importance_ratios": False, - "token_level_loss": True, - "force_on_policy_ratio": False, - } - - base_loss_fn = ClippedPGLossFn(loss_config) - data_dict = BatchedDataDict(original_data) - - global_valid_toks = torch.tensor( - sum(seq_lengths).item(), dtype=torch.float, device="cuda" - ) - global_valid_seqs = torch.tensor(batch_size, dtype=torch.float, device="cuda") - - # Forward pass - baseline_loss, baseline_metrics = base_loss_fn( - baseline_logits, - data_dict, - global_valid_seqs, - global_valid_toks, - ) - - # Backward pass - baseline_loss.backward() - - # Check baseline gradients - baseline_grad_norm = torch.norm(baseline_logits.grad).item() - baseline_grad_max = torch.max(torch.abs(baseline_logits.grad)).item() - baseline_grad_mean = torch.mean(torch.abs(baseline_logits.grad)).item() - baseline_grad_store = baseline_logits.grad.clone() - baseline_logits.grad.zero_() - - print( - f"Rank {rank}: Baseline gradient stats - norm: {baseline_grad_norm:.4f}, max: {baseline_grad_max:.4f}, mean: {baseline_grad_mean:.4f}" - ) - - # ===== TEST 2: Sequence packing with context parallelism ===== - print(f"Rank {rank}: Testing with sequence packing + CP") - - # Pack sequences - pad_to_multiple = cp_size * 2 # Common requirement for CP - ( - packed_input_ids, - packed_input_ids_cp, - packed_seq_params, - cu_seqlens, - cu_seqlens_padded, - ) = _pack_sequences_for_megatron( - input_ids, - seq_lengths, - pad_individual_seqs_to_multiple_of=pad_to_multiple, - pad_packed_seq_to=max_seq_len * batch_size if cp_size > 1 else None, - cp_rank=rank, - cp_size=cp_size, - ) - - # For CP, logits are sharded across context parallel ranks - def make_packed_logits(logits): - packed_logits = torch.zeros( - 1, packed_input_ids_cp.shape[1], vocab_size, device="cuda" - ) - run_seq = 0 - for i, seq_len in enumerate(seq_lengths): - padded_seqlen = cu_seqlens_padded[i + 1] - cu_seqlens_padded[i] - if padded_seqlen > baseline_logits.shape[1]: - # pad the logits with zeros - tmp_logits = torch.zeros( - 1, padded_seqlen, vocab_size, device="cuda" - ) - tmp_logits[:, :seq_len] = baseline_logits[i : i + 1, :seq_len] - else: - tmp_logits = baseline_logits[i : i + 1, :padded_seqlen] - packed_logits[ - :, run_seq // cp_size : (run_seq + padded_seqlen) // cp_size, : - ] = _get_tokens_on_this_cp_rank(tmp_logits, rank, cp_size) - run_seq += padded_seqlen - return packed_logits - - packed_logits = make_packed_logits(baseline_logits) - - # Create sequence packing wrapper - wrapper = SequencePackingLossWrapper( - loss_fn=base_loss_fn, - cu_seqlens_q=cu_seqlens, - cu_seqlens_q_padded=cu_seqlens_padded, - ) - - # Create data dict for packed sequences - packed_data_dict = BatchedDataDict(original_data) - - tp_group = torch.distributed.new_group(ranks=[rank]) - - # Forward pass - packed_loss, packed_metrics = wrapper( - packed_logits, - packed_data_dict, - global_valid_seqs, - global_valid_toks, - vocab_parallel_rank=0, - vocab_parallel_group=tp_group, - context_parallel_group=cp_group, - ) - - # Backward pass - packed_loss /= cp_size - packed_loss.backward() - - # Check packed gradients - packed_grad = baseline_logits.grad.clone() - # all-reduce across cp ranks - torch.distributed.all_reduce(packed_grad, op=torch.distributed.ReduceOp.SUM) - - packed_grad_norm = torch.norm(packed_grad).item() - packed_grad_max = torch.max(torch.abs(packed_grad)).item() - packed_grad_mean = torch.mean(torch.abs(packed_grad)).item() - # print(f"max grad on dims {torch.max(torch.abs(packed_grad), dim=0)}, {torch.max(torch.abs(packed_grad), dim=1)}, {torch.max(torch.abs(packed_grad), dim=2)}") - - print( - f"Rank {rank}: Packed gradient stats - norm: {packed_grad_norm:.4f}, max: {packed_grad_max:.4f}, mean: {packed_grad_mean:.4f}" - ) - - # ===== ANALYSIS ===== - gradient_ratio_norm = ( - packed_grad_norm / baseline_grad_norm - if baseline_grad_norm > 0 - else float("inf") - ) - gradient_ratio_max = ( - packed_grad_max / baseline_grad_max - if baseline_grad_max > 0 - else float("inf") - ) - gradient_ratio_mean = ( - packed_grad_mean / baseline_grad_mean - if baseline_grad_mean > 0 - else float("inf") - ) - - print( - f"Rank {rank}: Gradient ratios - norm: {gradient_ratio_norm:.4f}, max: {gradient_ratio_max:.4f}, mean: {gradient_ratio_mean:.4f}" - ) - - print( - f"differences by token: {torch.sum(torch.abs(packed_grad - baseline_grad_store), dim=-1)}" - ) - - torch.testing.assert_close( - packed_grad, baseline_grad_store, atol=1e-5, rtol=1e-5 - ) - - # test 3: with forward_with_post_processing_fn - # reset grad - baseline_logits.grad.zero_() - packed_logits = make_packed_logits(baseline_logits) - - # mock straggler detector with dummy context manager - mock_straggler_timer = MagicMock() - mock_straggler_timer.return_value = MagicMock( - __enter__=MagicMock(return_value=None), - __exit__=MagicMock(return_value=False), - ) - - # mock model forward - class MockModel: - def __init__(self): - self.logits = packed_logits - - def __call__(self, *args, **kwargs): - return self.logits - - def forward( - self, input_ids, position_ids, attention_mask, packed_seq_params=None - ): - return self.logits - - cfg = { - "sequence_packing": {"enabled": True}, - "dynamic_batching": {"enabled": False}, - "megatron_cfg": { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": cp_size, - }, - } - - post_processor = LossPostProcessor( - loss_fn=base_loss_fn, - cfg=cfg, - cp_normalize=True, - ) - - output_tensor, wrapped_loss_fn = forward_with_post_processing_fn( - data_iterator=make_processed_microbatch_iterator( - iter([packed_data_dict]), - cfg=cfg, - seq_length_key="input_lengths", - pad_individual_seqs_to_multiple_of=pad_to_multiple, - pad_packed_seq_to_multiple_of=1, - straggler_timer=mock_straggler_timer, - pad_full_seq_to=max_seq_len * batch_size if cp_size > 1 else None, - ), - model=MockModel(), - cfg=cfg, - post_processing_fn=post_processor, - global_valid_seqs=global_valid_seqs, - global_valid_toks=global_valid_toks, - straggler_timer=mock_straggler_timer, - ) - loss, metrics = wrapped_loss_fn(output_tensor) - - loss.backward() - - # Check packed gradients - packed_grad = baseline_logits.grad.clone() - # all-reduce across cp ranks - torch.distributed.all_reduce(packed_grad, op=torch.distributed.ReduceOp.SUM) - - packed_grad_norm = torch.norm(packed_grad).item() - packed_grad_max = torch.max(torch.abs(packed_grad)).item() - packed_grad_mean = torch.mean(torch.abs(packed_grad)).item() - print( - f"Rank {rank}: Packed gradient stats - norm: {packed_grad_norm:.4f}, max: {packed_grad_max:.4f}, mean: {packed_grad_mean:.4f}" - ) - - gradient_ratio_norm = ( - packed_grad_norm / baseline_grad_norm - if baseline_grad_norm > 0 - else float("inf") - ) - gradient_ratio_max = ( - packed_grad_max / baseline_grad_max - if baseline_grad_max > 0 - else float("inf") - ) - - print( - f"Rank {rank}: Gradient ratios - norm: {gradient_ratio_norm:.4f}, max: {gradient_ratio_max:.4f}" - ) - print( - f"differences by token: {torch.sum(torch.abs(packed_grad - baseline_grad_store), dim=-1)}" - ) - +from tests.unit.algorithms.sequence_packing_gradient_actor import ( + SequencePackingGradientTestActor, +) SEQUENCE_PACKING_GRADIENT_TEST_ACTOR_FQN = ( f"{SequencePackingGradientTestActor.__module__}.SequencePackingGradientTestActor" diff --git a/tests/unit/models/megatron/megatron_data_actors.py b/tests/unit/models/megatron/megatron_data_actors.py new file mode 100644 index 0000000000..0687cf076b --- /dev/null +++ b/tests/unit/models/megatron/megatron_data_actors.py @@ -0,0 +1,982 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Ray actors for megatron data tests. + +Separated from test_megatron_data.py to avoid importing pytest +in Ray worker environments that use PY_EXECUTABLES.MCORE. +""" + +import os + +import ray +import torch + + +@ray.remote(num_gpus=1) +class PackSequencesTestActor: + def __init__(self, cp_size): + self.cp_size = cp_size + self.env_vars = dict(os.environ) + + def run_all_pack_sequences_tests(self): + """Run all sequence packing tests in a single call to avoid expensive reinitializations.""" + from nemo_rl.distributed.model_utils import _get_tokens_on_this_cp_rank + from nemo_rl.models.megatron.data import _pack_sequences_for_megatron + + # Initialize process group if CP > 1 + if self.cp_size > 1: + torch.distributed.init_process_group(backend="nccl") + rank = int(os.environ["RANK"]) + else: + rank = 0 + + results = {} + + # Test 1: Basic packing functionality + results["basic"] = self._test_basic_packing(_pack_sequences_for_megatron) + if not results["basic"]["success"]: + return results["basic"] + + # Test 2: Variable sequence lengths + results["variable_lengths"] = self._test_variable_lengths( + _pack_sequences_for_megatron + ) + if not results["variable_lengths"]["success"]: + return results["variable_lengths"] + + # Test 3: Content preservation and consistency + results["consistency"] = self._test_consistency(_pack_sequences_for_megatron) + if not results["consistency"]["success"]: + return results["consistency"] + + # Test 4: Edge cases + results["edge_cases"] = self._test_edge_cases(_pack_sequences_for_megatron) + if not results["edge_cases"]["success"]: + return results["edge_cases"] + + # Test 5: Context parallelism (only if CP > 1) + if self.cp_size > 1: + results["context_parallel"] = self._test_context_parallel( + _pack_sequences_for_megatron, _get_tokens_on_this_cp_rank, rank + ) + if not results["context_parallel"]["success"]: + return results["context_parallel"] + else: + results["context_parallel"] = { + "success": True, + "error": None, + "skipped": "CP=1", + } + + return {"success": True, "error": None, "detailed_results": results} + + def _test_basic_packing(self, _pack_sequences_for_megatron): + """Test basic sequence packing without context parallelism.""" + try: + # Test parameters + batch_size = 3 + max_seq_len = 10 + vocab_size = 100 + + # Create test data with variable sequence lengths + input_ids = torch.randint( + 0, vocab_size, (batch_size, max_seq_len), device="cuda" + ) + seq_lengths = torch.tensor([8, 5, 7], device="cuda") + + # Test 1: Basic packing without CP + packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( + _pack_sequences_for_megatron( + input_ids, seq_lengths, cp_rank=0, cp_size=1 + ) + ) + + # Verify shapes + expected_total_tokens = seq_lengths.sum().item() + if packed_input_ids.shape != (1, expected_total_tokens): + return { + "success": False, + "error": f"Basic packing shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", + } + + # Verify cu_seqlens + expected_cu_seqlens = torch.tensor( + [0, 8, 13, 20], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens, expected_cu_seqlens): + return { + "success": False, + "error": f"cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", + } + + # Verify PackedSeqParams + if packed_seq_params.qkv_format != "thd": + return { + "success": False, + "error": f"Wrong qkv_format: expected 'thd', got {packed_seq_params.qkv_format}", + } + + if packed_seq_params.max_seqlen_q != 8: + return { + "success": False, + "error": f"Wrong max_seqlen_q: expected 8, got {packed_seq_params.max_seqlen_q}", + } + + # Test 2: Packing with individual sequence padding + ( + packed_input_ids_pad, + _, + packed_seq_params_pad, + cu_seqlens_pad, + cu_seqlens_padded_pad, + ) = _pack_sequences_for_megatron( + input_ids, + seq_lengths, + pad_individual_seqs_to_multiple_of=4, + cp_rank=0, + cp_size=1, + ) + + # With padding to multiple of 4: [8, 5, 7] -> [8, 8, 8] = 24 tokens + expected_total_tokens_pad = 24 + if packed_input_ids_pad.shape != (1, expected_total_tokens_pad): + return { + "success": False, + "error": f"Padded packing shape mismatch: expected (1, {expected_total_tokens_pad}), got {packed_input_ids_pad.shape}", + } + + # Verify padded cu_seqlens + expected_cu_seqlens_padded = torch.tensor( + [0, 8, 16, 24], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens_padded_pad, expected_cu_seqlens_padded): + return { + "success": False, + "error": f"Padded cu_seqlens mismatch: expected {expected_cu_seqlens_padded}, got {cu_seqlens_padded_pad}", + } + + return {"success": True, "error": None} + + except Exception as e: + return {"success": False, "error": f"Basic packing test failed: {str(e)}"} + + def _test_variable_lengths(self, _pack_sequences_for_megatron): + """Test sequence packing with variable sequence lengths.""" + try: + # Test parameters + batch_size = 4 + max_seq_len = 12 + vocab_size = 50 + + # Create test data with highly variable sequence lengths + input_ids = torch.randint( + 0, vocab_size, (batch_size, max_seq_len), device="cuda" + ) + seq_lengths = torch.tensor([12, 3, 8, 1], device="cuda") + + # Test 1: Variable lengths without padding + packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( + _pack_sequences_for_megatron( + input_ids, seq_lengths, cp_rank=0, cp_size=1 + ) + ) + + # Verify total tokens + expected_total_tokens = seq_lengths.sum().item() # 12 + 3 + 8 + 1 = 24 + if packed_input_ids.shape != (1, expected_total_tokens): + return { + "success": False, + "error": f"Variable lengths shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", + } + + # Verify cu_seqlens + expected_cu_seqlens = torch.tensor( + [0, 12, 15, 23, 24], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens, expected_cu_seqlens): + return { + "success": False, + "error": f"Variable lengths cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", + } + + # Test 2: Variable lengths with padding + ( + packed_input_ids_pad, + _, + packed_seq_params_pad, + cu_seqlens_pad, + cu_seqlens_padded_pad, + ) = _pack_sequences_for_megatron( + input_ids, + seq_lengths, + pad_individual_seqs_to_multiple_of=4, + cp_rank=0, + cp_size=1, + ) + + # With padding to multiple of 4: [12, 3, 8, 1] -> [12, 4, 8, 4] = 28 tokens + expected_total_tokens_pad = 28 + if packed_input_ids_pad.shape != (1, expected_total_tokens_pad): + return { + "success": False, + "error": f"Variable lengths padded shape mismatch: expected (1, {expected_total_tokens_pad}), got {packed_input_ids_pad.shape}", + } + + # Verify padded cu_seqlens + expected_cu_seqlens_padded = torch.tensor( + [0, 12, 16, 24, 28], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens_padded_pad, expected_cu_seqlens_padded): + return { + "success": False, + "error": f"Variable lengths padded cu_seqlens mismatch: expected {expected_cu_seqlens_padded}, got {cu_seqlens_padded_pad}", + } + + # Verify max_seqlen + if packed_seq_params.max_seqlen_q != 12: + return { + "success": False, + "error": f"Variable lengths wrong max_seqlen_q: expected 12, got {packed_seq_params.max_seqlen_q}", + } + + if packed_seq_params_pad.max_seqlen_q != 12: + return { + "success": False, + "error": f"Variable lengths padded wrong max_seqlen_q: expected 12, got {packed_seq_params_pad.max_seqlen_q}", + } + + return {"success": True, "error": None} + + except Exception as e: + return { + "success": False, + "error": f"Variable lengths test failed: {str(e)}", + } + + def _test_consistency(self, _pack_sequences_for_megatron): + """Test that packing produces consistent results and that content is preserved.""" + try: + # Test parameters + batch_size = 2 + seq_len = 8 + vocab_size = 20 + + # Create deterministic test data + torch.manual_seed(123) + input_ids = torch.randint( + 0, vocab_size, (batch_size, seq_len), device="cuda" + ) + seq_lengths = torch.tensor([6, 4], device="cuda") + + # Test consistency between multiple calls + ( + packed_input_ids_1, + _, + packed_seq_params_1, + cu_seqlens_1, + cu_seqlens_padded_1, + ) = _pack_sequences_for_megatron( + input_ids, seq_lengths, cp_rank=0, cp_size=1 + ) + + ( + packed_input_ids_2, + _, + packed_seq_params_2, + cu_seqlens_2, + cu_seqlens_padded_2, + ) = _pack_sequences_for_megatron( + input_ids, seq_lengths, cp_rank=0, cp_size=1 + ) + + # Verify consistency + if not torch.equal(packed_input_ids_1, packed_input_ids_2): + return { + "success": False, + "error": "Inconsistent packed_input_ids between calls", + } + + if not torch.equal(cu_seqlens_1, cu_seqlens_2): + return { + "success": False, + "error": "Inconsistent cu_seqlens between calls", + } + + # Verify content preservation + # Extract the first sequence (length 6) and compare with original + first_seq_packed = packed_input_ids_1[0, :6] + first_seq_original = input_ids[0, :6] + + if not torch.equal(first_seq_packed, first_seq_original): + return { + "success": False, + "error": "Content not preserved in first sequence", + } + + # Extract the second sequence (length 4) and compare with original + second_seq_packed = packed_input_ids_1[0, 6:10] + second_seq_original = input_ids[1, :4] + + if not torch.equal(second_seq_packed, second_seq_original): + return { + "success": False, + "error": "Content not preserved in second sequence", + } + + return {"success": True, "error": None} + + except Exception as e: + return {"success": False, "error": f"Consistency test failed: {str(e)}"} + + def _test_edge_cases(self, _pack_sequences_for_megatron): + """Test edge cases and error conditions.""" + try: + # Test 1: Single sequence + batch_size = 1 + seq_len = 10 + vocab_size = 50 + + input_ids = torch.randint( + 0, vocab_size, (batch_size, seq_len), device="cuda" + ) + seq_lengths = torch.tensor([seq_len], device="cuda") + + packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( + _pack_sequences_for_megatron( + input_ids, seq_lengths, cp_rank=0, cp_size=1 + ) + ) + + # Verify single sequence packing + if packed_input_ids.shape != (1, seq_len): + return { + "success": False, + "error": f"Single sequence shape mismatch: expected (1, {seq_len}), got {packed_input_ids.shape}", + } + + expected_cu_seqlens = torch.tensor( + [0, seq_len], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens, expected_cu_seqlens): + return { + "success": False, + "error": f"Single sequence cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", + } + + # Test 2: Empty sequences (length 0) + batch_size = 3 + max_seq_len = 5 + input_ids = torch.randint( + 0, vocab_size, (batch_size, max_seq_len), device="cuda" + ) + seq_lengths = torch.tensor([3, 0, 2], device="cuda") + + packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( + _pack_sequences_for_megatron( + input_ids, seq_lengths, cp_rank=0, cp_size=1 + ) + ) + + # Should handle empty sequences gracefully + expected_total_tokens = 5 # 3 + 0 + 2 + if packed_input_ids.shape != (1, expected_total_tokens): + return { + "success": False, + "error": f"Empty sequence shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", + } + + expected_cu_seqlens = torch.tensor( + [0, 3, 3, 5], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens, expected_cu_seqlens): + return { + "success": False, + "error": f"Empty sequence cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", + } + + # Test 3: Large padding values + batch_size = 2 + seq_len = 4 + input_ids = torch.randint( + 0, vocab_size, (batch_size, seq_len), device="cuda" + ) + seq_lengths = torch.tensor([3, 2], device="cuda") + + packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( + _pack_sequences_for_megatron( + input_ids, + seq_lengths, + pad_individual_seqs_to_multiple_of=8, + cp_rank=0, + cp_size=1, + ) + ) + + # With padding to multiple of 8: [3, 2] -> [8, 8] = 16 tokens + expected_total_tokens = 16 + if packed_input_ids.shape != (1, expected_total_tokens): + return { + "success": False, + "error": f"Large padding shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", + } + + return {"success": True, "error": None} + + except Exception as e: + return {"success": False, "error": f"Edge cases test failed: {str(e)}"} + + def _test_context_parallel( + self, _pack_sequences_for_megatron, _get_tokens_on_this_cp_rank, rank + ): + """Test sequence packing with context parallelism.""" + # Test parameters + batch_size = 2 + seq_len = 16 # Ensure divisible by cp_size * 2 + vocab_size = 100 + + # Ensure sequence length is compatible with CP + if seq_len % (2 * self.cp_size) != 0: + seq_len = (seq_len // (2 * self.cp_size) + 1) * (2 * self.cp_size) + + # Create test data + torch.manual_seed(42) # For reproducibility + input_ids = torch.arange(seq_len * batch_size, device="cuda").reshape( + batch_size, seq_len + ) + seq_lengths = torch.tensor([seq_len, seq_len], device="cuda") + + # Test 1: CP packing with individual sequence padding + ( + packed_input_ids, + packed_input_ids_cp_sharded, + packed_seq_params, + cu_seqlens, + cu_seqlens_padded, + ) = _pack_sequences_for_megatron( + input_ids, + seq_lengths, + pad_individual_seqs_to_multiple_of=self.cp_size * 2, + cp_rank=rank, + cp_size=self.cp_size, + ) + + # Verify the packed tensor shape + expected_tokens_per_rank = seq_len // self.cp_size + expected_total_tokens = batch_size * expected_tokens_per_rank + if packed_input_ids_cp_sharded.shape != (1, expected_total_tokens): + return { + "success": False, + "error": f"CP packing shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids_cp_sharded.shape}", + } + + # Verify cu_seqlens for original sequences + expected_cu_seqlens = torch.tensor( + [0, seq_len, seq_len * 2], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens, expected_cu_seqlens): + return { + "success": False, + "error": f"CP cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", + } + + # Verify PackedSeqParams + if packed_seq_params.qkv_format != "thd": + return { + "success": False, + "error": f"CP wrong qkv_format: expected 'thd', got {packed_seq_params.qkv_format}", + } + + # Test 2: CP packing with full sequence padding + pad_full_seq_to = (batch_size * seq_len) + 8 # Add some padding + ( + packed_input_ids_full, + packed_input_ids_cp_sharded, + packed_seq_params_full, + cu_seqlens_full, + cu_seqlens_padded_full, + ) = _pack_sequences_for_megatron( + input_ids, + seq_lengths, + pad_individual_seqs_to_multiple_of=self.cp_size * 2, + pad_packed_seq_to=pad_full_seq_to, + cp_rank=rank, + cp_size=self.cp_size, + ) + + # Verify the packed tensor shape with full padding + expected_tokens_per_rank_full = pad_full_seq_to // self.cp_size + if packed_input_ids_cp_sharded.shape != (1, expected_tokens_per_rank_full): + return { + "success": False, + "error": f"CP full padding shape mismatch: expected (1, {expected_tokens_per_rank_full}), got {packed_input_ids_cp_sharded.shape}", + } + + # Verify cu_seqlens_padded for full padding + expected_cu_seqlens_padded_full = torch.tensor( + [0, seq_len, pad_full_seq_to], device="cuda", dtype=torch.int32 + ) + if not torch.equal(cu_seqlens_padded_full, expected_cu_seqlens_padded_full): + return { + "success": False, + "error": f"CP full padding cu_seqlens_padded mismatch: expected {expected_cu_seqlens_padded_full}, got {cu_seqlens_padded_full}", + } + + correct_ids_0 = torch.tensor( + [0, 1, 2, 3, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 0, 0, 0, 0, 0, 0], + device="cuda", + ) + correct_ids_1 = torch.tensor( + [4, 5, 6, 7, 8, 9, 10, 11, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 0, 0], + device="cuda", + ) + + if ( + rank == 0 + and torch.sum(torch.abs(packed_input_ids_cp_sharded - correct_ids_0)).item() + != 0 + ): + return { + "success": False, + "error": f"CP full padding ids mismatch: expected {correct_ids_0}, got {packed_input_ids_cp_sharded[0, :20]}", + } + if ( + rank == 1 + and torch.sum(torch.abs(packed_input_ids_cp_sharded - correct_ids_1)).item() + != 0 + ): + return { + "success": False, + "error": f"CP full padding ids mismatch: expected {correct_ids_1}, got {packed_input_ids_cp_sharded[0, 20:]}", + } + + return {"success": True, "error": None} + + +@ray.remote(num_gpus=1) +class GetPackSequenceParametersTestActor: + def __init__(self): + pass + + def run_all_get_pack_sequence_parameters_for_megatron_tests(self): + """Test _get_pack_sequence_parameters_for_megatron function with various configurations.""" + from nemo_rl.models.megatron.data import ( + _get_pack_sequence_parameters_for_megatron, + ) + + # Test 1: Basic configuration - no parallelism, no FP8 + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + } + max_seq_len = 1023 + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 1 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 2: Context parallelism only + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 4, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 4 * 2 or pad_packed != 1 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=4*2, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 3: Tensor parallelism with sequence parallelism + megatron_cfg = { + "tensor_model_parallel_size": 2, + "sequence_parallel": True, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + expected_individual = 2 # tp_size when SP is enabled + if pad_individual != 2 or pad_packed != 1 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=2, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 4: Tensor parallelism without sequence parallelism + megatron_cfg = { + "tensor_model_parallel_size": 2, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 1 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 5: Pipeline parallelism + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 4, + "context_parallel_size": 1, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 1 or pad_to != max_seq_len: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=1, pad_to={max_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 6: Combined CP and TP with SP + megatron_cfg = { + "tensor_model_parallel_size": 2, + "sequence_parallel": True, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 4, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size + if ( + pad_individual != expected_individual + or pad_packed != 1 + or pad_to is not None + ): + return { + "success": False, + "error": f"Expected pad_individual={expected_individual}, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 7: FP8 enabled with default recipe + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + "fp8_cfg": { + "enabled": True, + "fp8": "hybrid", + "fp8_recipe": "tensorwise", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 16 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=16, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 8: FP8 enabled with blockwise recipe + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + "fp8_cfg": { + "enabled": True, + "fp8": "e4m3", + "fp8_recipe": "blockwise", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 128 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=128, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 9: FP8 with CP and TP+SP + megatron_cfg = { + "tensor_model_parallel_size": 2, + "sequence_parallel": True, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 4, + "fp8_cfg": { + "enabled": True, + "fp8": "e4m3", + "fp8_recipe": "blockwise", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size + expected_packed = 128 * 4 * 2 * 2 # divisor * cp_size * 2 * tp_size + if ( + pad_individual != expected_individual + or pad_packed != expected_packed + or pad_to is not None + ): + return { + "success": False, + "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 10: All parallelism types with FP8 and PP + megatron_cfg = { + "tensor_model_parallel_size": 2, + "sequence_parallel": True, + "pipeline_model_parallel_size": 4, + "context_parallel_size": 2, + "fp8_cfg": { + "enabled": True, + "fp8": "hybrid", + "fp8_recipe": "tensorwise", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + expected_individual = 2 * 2 * 2 # cp_size * 2 * tp_size + expected_packed = 16 * 2 * 2 * 2 # divisor * cp_size * 2 * tp_size + + def _round_up_to_multiple_of(x, y): + return (x + y - 1) // y * y + + if ( + pad_individual != expected_individual + or pad_packed != expected_packed + or pad_to != _round_up_to_multiple_of(max_seq_len, expected_packed) + ): + return { + "success": False, + "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to={max_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 11: FP8 disabled explicitly + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + "fp8_cfg": { + "enabled": False, + "fp8": "e4m3", + "fp8_recipe": "blockwise", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 1 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 12: Missing fp8_cfg (should default to disabled) + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + # No fp8_cfg key + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 1 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 13: Edge case - very large parallelism values + megatron_cfg = { + "tensor_model_parallel_size": 8, + "sequence_parallel": True, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 8, + "fp8_cfg": { + "enabled": True, + "fp8": "e4m3", + "fp8_recipe": "blockwise", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + expected_individual = 8 * 2 * 8 # cp_size * 2 * tp_size = 128 + expected_packed = 128 * 8 * 2 * 8 # divisor * cp_size * 2 * tp_size = 16384 + if ( + pad_individual != expected_individual + or pad_packed != expected_packed + or pad_to is not None + ): + return { + "success": False, + "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 14: Edge case - different max_seq_len values with PP + for test_seq_len in [512, 2048, 4096]: + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 2, + "context_parallel_size": 1, + } + + pad_individual, pad_packed, pad_to = ( + _get_pack_sequence_parameters_for_megatron(megatron_cfg, test_seq_len) + ) + + if pad_individual != 1 or pad_packed != 1 or pad_to != test_seq_len: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=1, pad_to={test_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 15: FP8 with MXFP8 recipe + megatron_cfg = { + "tensor_model_parallel_size": 1, + "sequence_parallel": False, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 1, + "fp8_cfg": { + "enabled": True, + "fp8": "e4m3", + "fp8_recipe": "mxfp8", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + if pad_individual != 1 or pad_packed != 32 or pad_to is not None: + return { + "success": False, + "error": f"Expected pad_individual=1, pad_packed=32, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 16: FP8 with MXFP8 recipe, CP, and TP+SP + megatron_cfg = { + "tensor_model_parallel_size": 2, + "sequence_parallel": True, + "pipeline_model_parallel_size": 1, + "context_parallel_size": 4, + "fp8_cfg": { + "enabled": True, + "fp8": "e4m3", + "fp8_recipe": "mxfp8", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size + expected_packed = 32 * 4 * 2 * 2 # divisor * cp_size * 2 * tp_size + + if ( + pad_individual != expected_individual + or pad_packed != expected_packed + or pad_to is not None + ): + return { + "success": False, + "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + # Test 17: FP8 with MXFP8 recipe, CP, TP+SP, and PP + megatron_cfg = { + "tensor_model_parallel_size": 2, + "sequence_parallel": True, + "pipeline_model_parallel_size": 4, + "context_parallel_size": 4, + "fp8_cfg": { + "enabled": True, + "fp8": "e4m3", + "fp8_recipe": "mxfp8", + "fp8_param": False, + }, + } + + pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( + megatron_cfg, max_seq_len + ) + + expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size + expected_packed = 32 * 4 * 2 * 2 # divisor * cp_size * 2 * tp_size * pp_size + + if ( + pad_individual != expected_individual + or pad_packed != expected_packed + or pad_to != _round_up_to_multiple_of(max_seq_len, expected_packed) + ): + return { + "success": False, + "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to={max_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", + } + + return {"success": True, "error": None} diff --git a/tests/unit/models/megatron/test_megatron_data.py b/tests/unit/models/megatron/test_megatron_data.py index 6e381d2933..3610b77d9f 100644 --- a/tests/unit/models/megatron/test_megatron_data.py +++ b/tests/unit/models/megatron/test_megatron_data.py @@ -23,7 +23,6 @@ - Sequence dimension validation """ -import os from unittest.mock import MagicMock, patch import pytest @@ -38,6 +37,10 @@ ) from nemo_rl.distributed.virtual_cluster import RayVirtualCluster from nemo_rl.distributed.worker_groups import RayWorkerBuilder, RayWorkerGroup +from tests.unit.models.megatron.megatron_data_actors import ( + GetPackSequenceParametersTestActor, + PackSequencesTestActor, +) @pytest.mark.mcore @@ -630,546 +633,6 @@ def test_make_processed_microbatch_iterator_with_packing(self, mock_process): assert call_kwargs["pad_full_seq_to"] == 1024 -@ray.remote(num_gpus=1) -class PackSequencesTestActor: - def __init__(self, cp_size): - self.cp_size = cp_size - self.env_vars = dict(os.environ) - - def run_all_pack_sequences_tests(self): - """Run all sequence packing tests in a single call to avoid expensive reinitializations.""" - from nemo_rl.distributed.model_utils import _get_tokens_on_this_cp_rank - from nemo_rl.models.megatron.data import _pack_sequences_for_megatron - - # Initialize process group if CP > 1 - if self.cp_size > 1: - torch.distributed.init_process_group(backend="nccl") - rank = int(os.environ["RANK"]) - else: - rank = 0 - - results = {} - - # Test 1: Basic packing functionality - results["basic"] = self._test_basic_packing(_pack_sequences_for_megatron) - if not results["basic"]["success"]: - return results["basic"] - - # Test 2: Variable sequence lengths - results["variable_lengths"] = self._test_variable_lengths( - _pack_sequences_for_megatron - ) - if not results["variable_lengths"]["success"]: - return results["variable_lengths"] - - # Test 3: Content preservation and consistency - results["consistency"] = self._test_consistency(_pack_sequences_for_megatron) - if not results["consistency"]["success"]: - return results["consistency"] - - # Test 4: Edge cases - results["edge_cases"] = self._test_edge_cases(_pack_sequences_for_megatron) - if not results["edge_cases"]["success"]: - return results["edge_cases"] - - # Test 5: Context parallelism (only if CP > 1) - if self.cp_size > 1: - results["context_parallel"] = self._test_context_parallel( - _pack_sequences_for_megatron, _get_tokens_on_this_cp_rank, rank - ) - if not results["context_parallel"]["success"]: - return results["context_parallel"] - else: - results["context_parallel"] = { - "success": True, - "error": None, - "skipped": "CP=1", - } - - return {"success": True, "error": None, "detailed_results": results} - - def _test_basic_packing(self, _pack_sequences_for_megatron): - """Test basic sequence packing without context parallelism.""" - try: - # Test parameters - batch_size = 3 - max_seq_len = 10 - vocab_size = 100 - - # Create test data with variable sequence lengths - input_ids = torch.randint( - 0, vocab_size, (batch_size, max_seq_len), device="cuda" - ) - seq_lengths = torch.tensor([8, 5, 7], device="cuda") - - # Test 1: Basic packing without CP - packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( - _pack_sequences_for_megatron( - input_ids, seq_lengths, cp_rank=0, cp_size=1 - ) - ) - - # Verify shapes - expected_total_tokens = seq_lengths.sum().item() - if packed_input_ids.shape != (1, expected_total_tokens): - return { - "success": False, - "error": f"Basic packing shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", - } - - # Verify cu_seqlens - expected_cu_seqlens = torch.tensor( - [0, 8, 13, 20], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens, expected_cu_seqlens): - return { - "success": False, - "error": f"cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", - } - - # Verify PackedSeqParams - if packed_seq_params.qkv_format != "thd": - return { - "success": False, - "error": f"Wrong qkv_format: expected 'thd', got {packed_seq_params.qkv_format}", - } - - if packed_seq_params.max_seqlen_q != 8: - return { - "success": False, - "error": f"Wrong max_seqlen_q: expected 8, got {packed_seq_params.max_seqlen_q}", - } - - # Test 2: Packing with individual sequence padding - ( - packed_input_ids_pad, - _, - packed_seq_params_pad, - cu_seqlens_pad, - cu_seqlens_padded_pad, - ) = _pack_sequences_for_megatron( - input_ids, - seq_lengths, - pad_individual_seqs_to_multiple_of=4, - cp_rank=0, - cp_size=1, - ) - - # With padding to multiple of 4: [8, 5, 7] -> [8, 8, 8] = 24 tokens - expected_total_tokens_pad = 24 - if packed_input_ids_pad.shape != (1, expected_total_tokens_pad): - return { - "success": False, - "error": f"Padded packing shape mismatch: expected (1, {expected_total_tokens_pad}), got {packed_input_ids_pad.shape}", - } - - # Verify padded cu_seqlens - expected_cu_seqlens_padded = torch.tensor( - [0, 8, 16, 24], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens_padded_pad, expected_cu_seqlens_padded): - return { - "success": False, - "error": f"Padded cu_seqlens mismatch: expected {expected_cu_seqlens_padded}, got {cu_seqlens_padded_pad}", - } - - return {"success": True, "error": None} - - except Exception as e: - return {"success": False, "error": f"Basic packing test failed: {str(e)}"} - - def _test_variable_lengths(self, _pack_sequences_for_megatron): - """Test sequence packing with variable sequence lengths.""" - try: - # Test parameters - batch_size = 4 - max_seq_len = 12 - vocab_size = 50 - - # Create test data with highly variable sequence lengths - input_ids = torch.randint( - 0, vocab_size, (batch_size, max_seq_len), device="cuda" - ) - seq_lengths = torch.tensor([12, 3, 8, 1], device="cuda") - - # Test 1: Variable lengths without padding - packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( - _pack_sequences_for_megatron( - input_ids, seq_lengths, cp_rank=0, cp_size=1 - ) - ) - - # Verify total tokens - expected_total_tokens = seq_lengths.sum().item() # 12 + 3 + 8 + 1 = 24 - if packed_input_ids.shape != (1, expected_total_tokens): - return { - "success": False, - "error": f"Variable lengths shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", - } - - # Verify cu_seqlens - expected_cu_seqlens = torch.tensor( - [0, 12, 15, 23, 24], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens, expected_cu_seqlens): - return { - "success": False, - "error": f"Variable lengths cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", - } - - # Test 2: Variable lengths with padding - ( - packed_input_ids_pad, - _, - packed_seq_params_pad, - cu_seqlens_pad, - cu_seqlens_padded_pad, - ) = _pack_sequences_for_megatron( - input_ids, - seq_lengths, - pad_individual_seqs_to_multiple_of=4, - cp_rank=0, - cp_size=1, - ) - - # With padding to multiple of 4: [12, 3, 8, 1] -> [12, 4, 8, 4] = 28 tokens - expected_total_tokens_pad = 28 - if packed_input_ids_pad.shape != (1, expected_total_tokens_pad): - return { - "success": False, - "error": f"Variable lengths padded shape mismatch: expected (1, {expected_total_tokens_pad}), got {packed_input_ids_pad.shape}", - } - - # Verify padded cu_seqlens - expected_cu_seqlens_padded = torch.tensor( - [0, 12, 16, 24, 28], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens_padded_pad, expected_cu_seqlens_padded): - return { - "success": False, - "error": f"Variable lengths padded cu_seqlens mismatch: expected {expected_cu_seqlens_padded}, got {cu_seqlens_padded_pad}", - } - - # Verify max_seqlen - if packed_seq_params.max_seqlen_q != 12: - return { - "success": False, - "error": f"Variable lengths wrong max_seqlen_q: expected 12, got {packed_seq_params.max_seqlen_q}", - } - - if packed_seq_params_pad.max_seqlen_q != 12: - return { - "success": False, - "error": f"Variable lengths padded wrong max_seqlen_q: expected 12, got {packed_seq_params_pad.max_seqlen_q}", - } - - return {"success": True, "error": None} - - except Exception as e: - return { - "success": False, - "error": f"Variable lengths test failed: {str(e)}", - } - - def _test_consistency(self, _pack_sequences_for_megatron): - """Test that packing produces consistent results and that content is preserved.""" - try: - # Test parameters - batch_size = 2 - seq_len = 8 - vocab_size = 20 - - # Create deterministic test data - torch.manual_seed(123) - input_ids = torch.randint( - 0, vocab_size, (batch_size, seq_len), device="cuda" - ) - seq_lengths = torch.tensor([6, 4], device="cuda") - - # Test consistency between multiple calls - ( - packed_input_ids_1, - _, - packed_seq_params_1, - cu_seqlens_1, - cu_seqlens_padded_1, - ) = _pack_sequences_for_megatron( - input_ids, seq_lengths, cp_rank=0, cp_size=1 - ) - - ( - packed_input_ids_2, - _, - packed_seq_params_2, - cu_seqlens_2, - cu_seqlens_padded_2, - ) = _pack_sequences_for_megatron( - input_ids, seq_lengths, cp_rank=0, cp_size=1 - ) - - # Verify consistency - if not torch.equal(packed_input_ids_1, packed_input_ids_2): - return { - "success": False, - "error": "Inconsistent packed_input_ids between calls", - } - - if not torch.equal(cu_seqlens_1, cu_seqlens_2): - return { - "success": False, - "error": "Inconsistent cu_seqlens between calls", - } - - # Verify content preservation - # Extract the first sequence (length 6) and compare with original - first_seq_packed = packed_input_ids_1[0, :6] - first_seq_original = input_ids[0, :6] - - if not torch.equal(first_seq_packed, first_seq_original): - return { - "success": False, - "error": "Content not preserved in first sequence", - } - - # Extract the second sequence (length 4) and compare with original - second_seq_packed = packed_input_ids_1[0, 6:10] - second_seq_original = input_ids[1, :4] - - if not torch.equal(second_seq_packed, second_seq_original): - return { - "success": False, - "error": "Content not preserved in second sequence", - } - - return {"success": True, "error": None} - - except Exception as e: - return {"success": False, "error": f"Consistency test failed: {str(e)}"} - - def _test_edge_cases(self, _pack_sequences_for_megatron): - """Test edge cases and error conditions.""" - try: - # Test 1: Single sequence - batch_size = 1 - seq_len = 10 - vocab_size = 50 - - input_ids = torch.randint( - 0, vocab_size, (batch_size, seq_len), device="cuda" - ) - seq_lengths = torch.tensor([seq_len], device="cuda") - - packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( - _pack_sequences_for_megatron( - input_ids, seq_lengths, cp_rank=0, cp_size=1 - ) - ) - - # Verify single sequence packing - if packed_input_ids.shape != (1, seq_len): - return { - "success": False, - "error": f"Single sequence shape mismatch: expected (1, {seq_len}), got {packed_input_ids.shape}", - } - - expected_cu_seqlens = torch.tensor( - [0, seq_len], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens, expected_cu_seqlens): - return { - "success": False, - "error": f"Single sequence cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", - } - - # Test 2: Empty sequences (length 0) - batch_size = 3 - max_seq_len = 5 - input_ids = torch.randint( - 0, vocab_size, (batch_size, max_seq_len), device="cuda" - ) - seq_lengths = torch.tensor([3, 0, 2], device="cuda") - - packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( - _pack_sequences_for_megatron( - input_ids, seq_lengths, cp_rank=0, cp_size=1 - ) - ) - - # Should handle empty sequences gracefully - expected_total_tokens = 5 # 3 + 0 + 2 - if packed_input_ids.shape != (1, expected_total_tokens): - return { - "success": False, - "error": f"Empty sequence shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", - } - - expected_cu_seqlens = torch.tensor( - [0, 3, 3, 5], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens, expected_cu_seqlens): - return { - "success": False, - "error": f"Empty sequence cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", - } - - # Test 3: Large padding values - batch_size = 2 - seq_len = 4 - input_ids = torch.randint( - 0, vocab_size, (batch_size, seq_len), device="cuda" - ) - seq_lengths = torch.tensor([3, 2], device="cuda") - - packed_input_ids, _, packed_seq_params, cu_seqlens, cu_seqlens_padded = ( - _pack_sequences_for_megatron( - input_ids, - seq_lengths, - pad_individual_seqs_to_multiple_of=8, - cp_rank=0, - cp_size=1, - ) - ) - - # With padding to multiple of 8: [3, 2] -> [8, 8] = 16 tokens - expected_total_tokens = 16 - if packed_input_ids.shape != (1, expected_total_tokens): - return { - "success": False, - "error": f"Large padding shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids.shape}", - } - - return {"success": True, "error": None} - - except Exception as e: - return {"success": False, "error": f"Edge cases test failed: {str(e)}"} - - def _test_context_parallel( - self, _pack_sequences_for_megatron, _get_tokens_on_this_cp_rank, rank - ): - """Test sequence packing with context parallelism.""" - # Test parameters - batch_size = 2 - seq_len = 16 # Ensure divisible by cp_size * 2 - vocab_size = 100 - - # Ensure sequence length is compatible with CP - if seq_len % (2 * self.cp_size) != 0: - seq_len = (seq_len // (2 * self.cp_size) + 1) * (2 * self.cp_size) - - # Create test data - torch.manual_seed(42) # For reproducibility - input_ids = torch.arange(seq_len * batch_size, device="cuda").reshape( - batch_size, seq_len - ) - seq_lengths = torch.tensor([seq_len, seq_len], device="cuda") - - # Test 1: CP packing with individual sequence padding - ( - packed_input_ids, - packed_input_ids_cp_sharded, - packed_seq_params, - cu_seqlens, - cu_seqlens_padded, - ) = _pack_sequences_for_megatron( - input_ids, - seq_lengths, - pad_individual_seqs_to_multiple_of=self.cp_size * 2, - cp_rank=rank, - cp_size=self.cp_size, - ) - - # Verify the packed tensor shape - expected_tokens_per_rank = seq_len // self.cp_size - expected_total_tokens = batch_size * expected_tokens_per_rank - if packed_input_ids_cp_sharded.shape != (1, expected_total_tokens): - return { - "success": False, - "error": f"CP packing shape mismatch: expected (1, {expected_total_tokens}), got {packed_input_ids_cp_sharded.shape}", - } - - # Verify cu_seqlens for original sequences - expected_cu_seqlens = torch.tensor( - [0, seq_len, seq_len * 2], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens, expected_cu_seqlens): - return { - "success": False, - "error": f"CP cu_seqlens mismatch: expected {expected_cu_seqlens}, got {cu_seqlens}", - } - - # Verify PackedSeqParams - if packed_seq_params.qkv_format != "thd": - return { - "success": False, - "error": f"CP wrong qkv_format: expected 'thd', got {packed_seq_params.qkv_format}", - } - - # Test 2: CP packing with full sequence padding - pad_full_seq_to = (batch_size * seq_len) + 8 # Add some padding - ( - packed_input_ids_full, - packed_input_ids_cp_sharded, - packed_seq_params_full, - cu_seqlens_full, - cu_seqlens_padded_full, - ) = _pack_sequences_for_megatron( - input_ids, - seq_lengths, - pad_individual_seqs_to_multiple_of=self.cp_size * 2, - pad_packed_seq_to=pad_full_seq_to, - cp_rank=rank, - cp_size=self.cp_size, - ) - - # Verify the packed tensor shape with full padding - expected_tokens_per_rank_full = pad_full_seq_to // self.cp_size - if packed_input_ids_cp_sharded.shape != (1, expected_tokens_per_rank_full): - return { - "success": False, - "error": f"CP full padding shape mismatch: expected (1, {expected_tokens_per_rank_full}), got {packed_input_ids_cp_sharded.shape}", - } - - # Verify cu_seqlens_padded for full padding - expected_cu_seqlens_padded_full = torch.tensor( - [0, seq_len, pad_full_seq_to], device="cuda", dtype=torch.int32 - ) - if not torch.equal(cu_seqlens_padded_full, expected_cu_seqlens_padded_full): - return { - "success": False, - "error": f"CP full padding cu_seqlens_padded mismatch: expected {expected_cu_seqlens_padded_full}, got {cu_seqlens_padded_full}", - } - - correct_ids_0 = torch.tensor( - [0, 1, 2, 3, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 0, 0, 0, 0, 0, 0], - device="cuda", - ) - correct_ids_1 = torch.tensor( - [4, 5, 6, 7, 8, 9, 10, 11, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 0, 0], - device="cuda", - ) - - if ( - rank == 0 - and torch.sum(torch.abs(packed_input_ids_cp_sharded - correct_ids_0)).item() - != 0 - ): - return { - "success": False, - "error": f"CP full padding ids mismatch: expected {correct_ids_0}, got {packed_input_ids_cp_sharded[0, :20]}", - } - if ( - rank == 1 - and torch.sum(torch.abs(packed_input_ids_cp_sharded - correct_ids_1)).item() - != 0 - ): - return { - "success": False, - "error": f"CP full padding ids mismatch: expected {correct_ids_1}, got {packed_input_ids_cp_sharded[0, 20:]}", - } - - return {"success": True, "error": None} - - PACK_SEQUENCES_TEST_ACTOR_FQN = ( f"{PackSequencesTestActor.__module__}.PackSequencesTestActor" ) @@ -1311,425 +774,6 @@ def test_pack_sequences_with_context_parallel(pack_sequences_setup): print(f" Error: {test_result['error']}") -@ray.remote(num_gpus=1) -class GetPackSequenceParametersTestActor: - def __init__(self): - pass - - def run_all_get_pack_sequence_parameters_for_megatron_tests(self): - """Test _get_pack_sequence_parameters_for_megatron function with various configurations.""" - from nemo_rl.models.megatron.data import ( - _get_pack_sequence_parameters_for_megatron, - ) - - # Test 1: Basic configuration - no parallelism, no FP8 - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - } - max_seq_len = 1023 - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 1 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 2: Context parallelism only - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 4, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 4 * 2 or pad_packed != 1 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=4*2, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 3: Tensor parallelism with sequence parallelism - megatron_cfg = { - "tensor_model_parallel_size": 2, - "sequence_parallel": True, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - expected_individual = 2 # tp_size when SP is enabled - if pad_individual != 2 or pad_packed != 1 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=2, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 4: Tensor parallelism without sequence parallelism - megatron_cfg = { - "tensor_model_parallel_size": 2, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 1 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 5: Pipeline parallelism - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 4, - "context_parallel_size": 1, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 1 or pad_to != max_seq_len: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=1, pad_to={max_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 6: Combined CP and TP with SP - megatron_cfg = { - "tensor_model_parallel_size": 2, - "sequence_parallel": True, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 4, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size - if ( - pad_individual != expected_individual - or pad_packed != 1 - or pad_to is not None - ): - return { - "success": False, - "error": f"Expected pad_individual={expected_individual}, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 7: FP8 enabled with default recipe - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - "fp8_cfg": { - "enabled": True, - "fp8": "hybrid", - "fp8_recipe": "tensorwise", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 16 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=16, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 8: FP8 enabled with blockwise recipe - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - "fp8_cfg": { - "enabled": True, - "fp8": "e4m3", - "fp8_recipe": "blockwise", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 128 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=128, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 9: FP8 with CP and TP+SP - megatron_cfg = { - "tensor_model_parallel_size": 2, - "sequence_parallel": True, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 4, - "fp8_cfg": { - "enabled": True, - "fp8": "e4m3", - "fp8_recipe": "blockwise", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size - expected_packed = 128 * 4 * 2 * 2 # divisor * cp_size * 2 * tp_size - if ( - pad_individual != expected_individual - or pad_packed != expected_packed - or pad_to is not None - ): - return { - "success": False, - "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 10: All parallelism types with FP8 and PP - megatron_cfg = { - "tensor_model_parallel_size": 2, - "sequence_parallel": True, - "pipeline_model_parallel_size": 4, - "context_parallel_size": 2, - "fp8_cfg": { - "enabled": True, - "fp8": "hybrid", - "fp8_recipe": "tensorwise", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - expected_individual = 2 * 2 * 2 # cp_size * 2 * tp_size - expected_packed = 16 * 2 * 2 * 2 # divisor * cp_size * 2 * tp_size - - def _round_up_to_multiple_of(x, y): - return (x + y - 1) // y * y - - if ( - pad_individual != expected_individual - or pad_packed != expected_packed - or pad_to != _round_up_to_multiple_of(max_seq_len, expected_packed) - ): - return { - "success": False, - "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to={max_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 11: FP8 disabled explicitly - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - "fp8_cfg": { - "enabled": False, - "fp8": "e4m3", - "fp8_recipe": "blockwise", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 1 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 12: Missing fp8_cfg (should default to disabled) - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - # No fp8_cfg key - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 1 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=1, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 13: Edge case - very large parallelism values - megatron_cfg = { - "tensor_model_parallel_size": 8, - "sequence_parallel": True, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 8, - "fp8_cfg": { - "enabled": True, - "fp8": "e4m3", - "fp8_recipe": "blockwise", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - expected_individual = 8 * 2 * 8 # cp_size * 2 * tp_size = 128 - expected_packed = 128 * 8 * 2 * 8 # divisor * cp_size * 2 * tp_size = 16384 - if ( - pad_individual != expected_individual - or pad_packed != expected_packed - or pad_to is not None - ): - return { - "success": False, - "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 14: Edge case - different max_seq_len values with PP - for test_seq_len in [512, 2048, 4096]: - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 2, - "context_parallel_size": 1, - } - - pad_individual, pad_packed, pad_to = ( - _get_pack_sequence_parameters_for_megatron(megatron_cfg, test_seq_len) - ) - - if pad_individual != 1 or pad_packed != 1 or pad_to != test_seq_len: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=1, pad_to={test_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 15: FP8 with MXFP8 recipe - megatron_cfg = { - "tensor_model_parallel_size": 1, - "sequence_parallel": False, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 1, - "fp8_cfg": { - "enabled": True, - "fp8": "e4m3", - "fp8_recipe": "mxfp8", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - if pad_individual != 1 or pad_packed != 32 or pad_to is not None: - return { - "success": False, - "error": f"Expected pad_individual=1, pad_packed=32, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 16: FP8 with MXFP8 recipe, CP, and TP+SP - megatron_cfg = { - "tensor_model_parallel_size": 2, - "sequence_parallel": True, - "pipeline_model_parallel_size": 1, - "context_parallel_size": 4, - "fp8_cfg": { - "enabled": True, - "fp8": "e4m3", - "fp8_recipe": "mxfp8", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size - expected_packed = 32 * 4 * 2 * 2 # divisor * cp_size * 2 * tp_size - - if ( - pad_individual != expected_individual - or pad_packed != expected_packed - or pad_to is not None - ): - return { - "success": False, - "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to=None, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - # Test 17: FP8 with MXFP8 recipe, CP, TP+SP, and PP - megatron_cfg = { - "tensor_model_parallel_size": 2, - "sequence_parallel": True, - "pipeline_model_parallel_size": 4, - "context_parallel_size": 4, - "fp8_cfg": { - "enabled": True, - "fp8": "e4m3", - "fp8_recipe": "mxfp8", - "fp8_param": False, - }, - } - - pad_individual, pad_packed, pad_to = _get_pack_sequence_parameters_for_megatron( - megatron_cfg, max_seq_len - ) - - expected_individual = 4 * 2 * 2 # cp_size * 2 * tp_size - expected_packed = 32 * 4 * 2 * 2 # divisor * cp_size * 2 * tp_size * pp_size - - if ( - pad_individual != expected_individual - or pad_packed != expected_packed - or pad_to != _round_up_to_multiple_of(max_seq_len, expected_packed) - ): - return { - "success": False, - "error": f"Expected pad_individual={expected_individual}, pad_packed={expected_packed}, pad_to={max_seq_len}, got pad_individual={pad_individual}, pad_packed={pad_packed}, pad_to={pad_to}", - } - - return {"success": True, "error": None} - - GET_PACK_SEQUENCE_PARAMETERS_TEST_ACTOR_FQN = f"{GetPackSequenceParametersTestActor.__module__}.GetPackSequenceParametersTestActor" diff --git a/uv.lock b/uv.lock index e0c3cda97f..8c95e36ee9 100644 --- a/uv.lock +++ b/uv.lock @@ -211,7 +211,7 @@ overrides = [ { name = "torch", marker = "sys_platform != 'darwin'", specifier = "==2.9.0", index = "https://download.pytorch.org/whl/cu129" }, { name = "torch", marker = "sys_platform == 'darwin'", specifier = "==2.9.0", index = "https://pypi.org/simple" }, { name = "torchaudio", specifier = "==2.9.0" }, - { name = "transformer-engine", extras = ["pytorch"], specifier = "==2.8.0" }, + { name = "transformer-engine", extras = ["pytorch"], specifier = ">=2.9.0a0,<2.12.0" }, ] [[manifest.dependency-metadata]] @@ -555,6 +555,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ae/69/fe387b0f70ed608a363a90036e08ef8c1e844e5c98145502160661012dc0/apache_tvm_ffi-0.1.4-cp314-cp314t-win_amd64.whl", hash = "sha256:1bceda57240d03a3cf026334521c0595d097ab92b6d0df7485cbb37b2c056c27", size = 1794928, upload-time = "2025-11-30T07:21:25.234Z" }, ] +[[package]] +name = "asgiref" +version = "3.11.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/63/40/f03da1264ae8f7cfdbf9146542e5e7e8100a4c66ab48e791df9a03d3f6c0/asgiref-3.11.1.tar.gz", hash = "sha256:5f184dc43b7e763efe848065441eac62229c9f7b0475f41f80e207a114eda4ce", size = 38550, upload-time = "2026-02-03T13:30:14.33Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/0a/a72d10ed65068e115044937873362e6e32fab1b7dce0046aeb224682c989/asgiref-3.11.1-py3-none-any.whl", hash = "sha256:e8667a091e69529631969fd45dc268fa79b99c92c5fcdda727757e52146ec133", size = 24345, upload-time = "2026-02-03T13:30:13.039Z" }, +] + [[package]] name = "astor" version = "0.8.1" @@ -1924,16 +1933,16 @@ wheels = [ [[package]] name = "fla-core" -version = "0.3.2" +version = "0.4.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "einops" }, { name = "torch", version = "2.9.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'darwin' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" }, { name = "torch", version = "2.9.0+cu129", source = { registry = "https://download.pytorch.org/whl/cu129" }, marker = "sys_platform != 'darwin' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/67/c6/10a1149b07e6bab45b2cb2d07f6b827716c2baf5f3404161753f25c6389b/fla_core-0.3.2.tar.gz", hash = "sha256:d38db16bc4e1c6fa8c04df442f246da1e6926a209426bc6ef703d41bfbc37c92", size = 296725, upload-time = "2025-09-10T07:43:40.155Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f1/de/0d6bd5664ba2e711cabdde11ccb41ddcdd866c531e40900af3601bd7b8c6/fla_core-0.4.1.tar.gz", hash = "sha256:38ab28966eeadc2141b29e87c2bf72a8a4851e00af9d25bbbc3596b1fb53450d", size = 319608, upload-time = "2025-12-24T18:07:37.669Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/7e/f5/74947b33c07682280e65adbdf17c4ee94b30232df2f728bafecf13d1d820/fla_core-0.3.2-py3-none-any.whl", hash = "sha256:e751d5a41e33eee721a6fb6588bd857f6f36e0d14719a23b1ebdbd617d307209", size = 413594, upload-time = "2025-09-10T07:43:37.786Z" }, + { url = "https://files.pythonhosted.org/packages/f6/43/945ef69eb48a14c30fd7323d3e0b560c821ae71e6d3ef979e06a901bc3b9/fla_core-0.4.1-py3-none-any.whl", hash = "sha256:93c6afe4c80fc7bc705fa8aeea6a46d2cf2d77383f9619a41863c7114c801bab", size = 437282, upload-time = "2025-12-24T18:07:34.41Z" }, ] [[package]] @@ -1952,17 +1961,15 @@ sdist = { url = "https://files.pythonhosted.org/packages/e8/6d/7066d160bdffa2f9d [[package]] name = "flash-linear-attention" -version = "0.3.2" +version = "0.4.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "datasets" }, { name = "fla-core" }, - { name = "pytest" }, { name = "transformers" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/84/f6/e62c1e562a288557eba7f06f168a7615813d1a227327b8beb8ba426da2c5/flash_linear_attention-0.3.2.tar.gz", hash = "sha256:9147747316c2951fed4ebeb4fa87977c05d807dc70c93b46250b68a6eb1183e2", size = 150880, upload-time = "2025-09-10T07:43:41.37Z" } +sdist = { url = "https://files.pythonhosted.org/packages/46/83/7d8ec7ffb5229080b1c9b772338ff588cbd63282ac355ede2a12a6e174a8/flash_linear_attention-0.4.1.tar.gz", hash = "sha256:127ee7273ed15ac17f72bcf4c75e1051719d8fbe0a2d1d047e59406f36d81ee2", size = 158280, upload-time = "2025-12-24T18:07:38.812Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/a0/d0/35ce9eac5f52c72005095aaa12a393d2656ed7ffedf925b2381a6b76d10c/flash_linear_attention-0.3.2-py3-none-any.whl", hash = "sha256:604e73361437ba786420ab195e2caa3fd19280503761e703fa353c5ce5c65376", size = 274592, upload-time = "2025-09-10T07:43:39.107Z" }, + { url = "https://files.pythonhosted.org/packages/63/d5/6327559a9d5b9243b10c3984f1bcef256ed2ad06d105a3bb8f7b2979659c/flash_linear_attention-0.4.1-py3-none-any.whl", hash = "sha256:d18bdfe9d1f4b424676444eac9d50fb8433b70e5d4e0e0878b20bcbcdbea57ce", size = 287415, upload-time = "2025-12-24T18:07:35.815Z" }, ] [[package]] @@ -2141,6 +2148,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ec/f9/7f9263c5695f4bd0023734af91bedb2ff8209e8de6ead162f35d8dc762fd/flask-3.1.2-py3-none-any.whl", hash = "sha256:ca1d8112ec8a6158cc29ea4858963350011b5c846a414cdb7a954aa9e967d03c", size = 103308, upload-time = "2025-08-19T21:03:19.499Z" }, ] +[package.optional-dependencies] +async = [ + { name = "asgiref" }, +] + [[package]] name = "flask-cors" version = "6.0.1" @@ -2851,6 +2863,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c6/50/e0edd38dcd63fb26a8547f13d28f7a008bc4a3fd4eb4ff030673f22ad41a/hydra_core-1.3.2-py3-none-any.whl", hash = "sha256:fa0238a9e31df3373b35b0bfb672c34cc92718d21f81311d8996a16de1141d8b", size = 154547, upload-time = "2023-02-23T18:33:40.801Z" }, ] +[[package]] +name = "hypercorn" +version = "0.18.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "h11" }, + { name = "h2" }, + { name = "priority" }, + { name = "wsproto" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/44/01/39f41a014b83dd5c795217362f2ca9071cf243e6a75bdcd6cd5b944658cc/hypercorn-0.18.0.tar.gz", hash = "sha256:d63267548939c46b0247dc8e5b45a9947590e35e64ee73a23c074aa3cf88e9da", size = 68420, upload-time = "2025-11-08T13:54:04.78Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/93/35/850277d1b17b206bd10874c8a9a3f52e059452fb49bb0d22cbb908f6038b/hypercorn-0.18.0-py3-none-any.whl", hash = "sha256:225e268f2c1c2f28f6d8f6db8f40cb8c992963610c5725e13ccfcddccb24b1cd", size = 61640, upload-time = "2025-11-08T13:54:03.202Z" }, +] + [[package]] name = "hyperframe" version = "6.1.0" @@ -3596,6 +3623,7 @@ dependencies = [ { name = "hydra-core" }, { name = "mamba-ssm" }, { name = "megatron-core" }, + { name = "mlflow" }, { name = "nvidia-resiliency-ext" }, { name = "omegaconf" }, { name = "open-clip-torch" }, @@ -3606,6 +3634,8 @@ dependencies = [ { name = "six" }, { name = "tensorboard" }, { name = "timm" }, + { name = "torch", version = "2.9.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'darwin' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" }, + { name = "torch", version = "2.9.0+cu129", source = { registry = "https://download.pytorch.org/whl/cu129" }, marker = "sys_platform != 'darwin' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" }, { name = "tqdm" }, { name = "transformer-engine", extra = ["pytorch"] }, { name = "transformers" }, @@ -3617,11 +3647,12 @@ dependencies = [ requires-dist = [ { name = "accelerate" }, { name = "causal-conv1d", git = "https://github.com/Dao-AILab/causal-conv1d?rev=67e0a9dfe1518fc0036444e9ab5fe06ab78299e0" }, - { name = "datasets" }, + { name = "datasets", specifier = ">=2.20.0" }, { name = "flash-linear-attention" }, { name = "hydra-core", specifier = ">1.3,<=1.3.2" }, { name = "mamba-ssm", git = "https://github.com/state-spaces/mamba.git?rev=d68d16ed7d5d5164eb5a57c0285f3b7eb8394ec1" }, { name = "megatron-core", extras = ["dev", "mlm"], editable = "3rdparty/Megatron-LM-workspace" }, + { name = "mlflow", specifier = ">=3.5.0" }, { name = "nvidia-resiliency-ext" }, { name = "omegaconf", specifier = ">=2.3.0" }, { name = "open-clip-torch", specifier = ">=3.2.0" }, @@ -3632,8 +3663,10 @@ requires-dist = [ { name = "six", specifier = ">=1.17.0" }, { name = "tensorboard", specifier = ">=2.19.0" }, { name = "timm" }, + { name = "torch", marker = "sys_platform != 'darwin'", specifier = ">=2.6.0", index = "https://download.pytorch.org/whl/cu129" }, + { name = "torch", marker = "sys_platform == 'darwin'", specifier = ">=2.6.0", index = "https://pypi.org/simple" }, { name = "tqdm", specifier = ">=4.67.1" }, - { name = "transformer-engine", extras = ["pytorch"], specifier = ">=2.10.0a0,<2.12.0" }, + { name = "transformer-engine", extras = ["core-cu13", "pytorch"], specifier = ">=2.10.0a0,<2.13.0" }, { name = "transformers", specifier = "<5.0.0" }, { name = "typing-extensions" }, { name = "wandb", specifier = ">=0.19.10" }, @@ -3652,6 +3685,8 @@ dependencies = [ { name = "flash-linear-attention" }, { name = "flashinfer-python", version = "0.5.2", source = { registry = "https://pypi.org/simple" }, marker = "extra == 'extra-7-nemo-rl-vllm' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang')" }, { name = "flashinfer-python", version = "0.5.3", source = { registry = "https://pypi.org/simple" }, marker = "extra == 'extra-7-nemo-rl-sglang' or extra != 'extra-7-nemo-rl-vllm'" }, + { name = "flask", extra = ["async"] }, + { name = "hypercorn" }, { name = "mamba-ssm" }, { name = "megatron-energon", extra = ["av-decode"] }, { name = "multi-storage-client" }, @@ -3661,6 +3696,7 @@ dependencies = [ { name = "nvidia-resiliency-ext" }, { name = "nvtx" }, { name = "onnxscript" }, + { name = "openai" }, { name = "opentelemetry-api" }, { name = "packaging" }, { name = "tensorstore", version = "0.1.74", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.13' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" }, @@ -3680,8 +3716,10 @@ requires-dist = [ { name = "einops", specifier = "~=0.8" }, { name = "emerging-optimizers", git = "https://github.com/NVIDIA-NeMo/Emerging-Optimizers.git?rev=v0.1.0" }, { name = "fastapi", specifier = "~=0.50" }, - { name = "flash-linear-attention", specifier = "~=0.3.2" }, + { name = "flash-linear-attention", specifier = "~=0.4.0" }, { name = "flashinfer-python", specifier = "~=0.5.0" }, + { name = "flask", extras = ["async"] }, + { name = "hypercorn" }, { name = "mamba-ssm", git = "https://github.com/state-spaces/mamba.git?rev=d68d16ed7d5d5164eb5a57c0285f3b7eb8394ec1" }, { name = "megatron-energon", extras = ["av-decode"], specifier = "~=6.0" }, { name = "multi-storage-client", specifier = "~=0.27" }, @@ -3691,11 +3729,12 @@ requires-dist = [ { name = "nvidia-resiliency-ext" }, { name = "nvtx", specifier = "~=0.2" }, { name = "onnxscript" }, + { name = "openai" }, { name = "opentelemetry-api", specifier = "~=1.33.1" }, { name = "packaging", specifier = ">=24.2" }, { name = "tensorstore", specifier = "~=0.1,!=0.1.46,!=0.1.72" }, - { name = "torch", marker = "sys_platform != 'darwin'", index = "https://download.pytorch.org/whl/cu129" }, - { name = "torch", marker = "sys_platform == 'darwin'", index = "https://pypi.org/simple" }, + { name = "torch", marker = "sys_platform != 'darwin'", specifier = ">=2.6.0", index = "https://download.pytorch.org/whl/cu129" }, + { name = "torch", marker = "sys_platform == 'darwin'", specifier = ">=2.6.0", index = "https://pypi.org/simple" }, { name = "tqdm" }, { name = "transformer-engine", extras = ["core-cu13", "pytorch"], specifier = ">=2.9.0a0,<2.12.0" }, { name = "wget" }, @@ -4855,8 +4894,8 @@ requires-dist = [ { name = "torchdata" }, { name = "torchvision", marker = "sys_platform != 'darwin'", specifier = ">=0.22.0", index = "https://download.pytorch.org/whl/cu129" }, { name = "torchvision", marker = "sys_platform == 'darwin'", specifier = ">=0.22.0", index = "https://pypi.org/simple" }, - { name = "transformer-engine", extras = ["pytorch"], marker = "extra == 'automodel'", specifier = "==2.8.0" }, - { name = "transformer-engine", extras = ["pytorch"], marker = "extra == 'mcore'", specifier = "==2.8.0" }, + { name = "transformer-engine", extras = ["pytorch"], marker = "extra == 'automodel'", specifier = ">=2.9.0a0,<2.12.0" }, + { name = "transformer-engine", extras = ["pytorch"], marker = "extra == 'mcore'", specifier = ">=2.9.0a0,<2.12.0" }, { name = "transformers", specifier = "==4.57.1" }, { name = "triton", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')", index = "https://download.pytorch.org/whl/cu129" }, { name = "uvloop", marker = "extra == 'sglang'" }, @@ -6132,6 +6171,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/c7/5613524e606ea1688b3bdbf48aa64bafb6d0a4ac3750274c43b6158a390f/prettytable-3.16.0-py3-none-any.whl", hash = "sha256:b5eccfabb82222f5aa46b798ff02a8452cf530a352c31bddfa29be41242863aa", size = 33863, upload-time = "2025-03-24T19:39:02.359Z" }, ] +[[package]] +name = "priority" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f5/3c/eb7c35f4dcede96fca1842dac5f4f5d15511aa4b52f3a961219e68ae9204/priority-2.0.0.tar.gz", hash = "sha256:c965d54f1b8d0d0b19479db3924c7c36cf672dbf2aec92d43fbdaf4492ba18c0", size = 24792, upload-time = "2021-06-27T10:15:05.487Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5e/5f/82c8074f7e84978129347c2c6ec8b6c59f3584ff1a20bc3c940a3e061790/priority-2.0.0-py3-none-any.whl", hash = "sha256:6f8eefce5f3ad59baf2c080a664037bb4725cd0a790d53d59ab4059288faf6aa", size = 8946, upload-time = "2021-06-27T10:15:03.856Z" }, +] + [[package]] name = "prometheus-client" version = "0.22.1" @@ -9178,13 +9226,10 @@ wheels = [ [[package]] name = "transformer-engine" -version = "2.8.0" +version = "2.11.0" source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "transformer-engine-cu12" }, -] wheels = [ - { url = "https://files.pythonhosted.org/packages/72/be/a7cf5f28b7abbe966956217b18208fc34cc9bfaf62fa9472c0603db74899/transformer_engine-2.8.0-py3-none-any.whl", hash = "sha256:795b056d31b0f67f5d7432725177782dd5084090c1e3c52532577070947fe9a7", size = 638319, upload-time = "2025-10-07T04:55:34.115Z" }, + { url = "https://files.pythonhosted.org/packages/00/33/44571ec584c88e1715f4c2afefc0ddd45064c7065ac1c6ffc8e832bc3ba3/transformer_engine-2.11.0-py3-none-any.whl", hash = "sha256:7ee1eae8fa6b0cb471c6066aa3555304fda8537174e5019929dc0c8655071df3", size = 723110, upload-time = "2026-01-02T09:58:23.245Z" }, ] [package.optional-dependencies] @@ -9194,7 +9239,7 @@ pytorch = [ [[package]] name = "transformer-engine-cu12" -version = "2.8.0" +version = "2.11.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "importlib-metadata" }, @@ -9202,22 +9247,25 @@ dependencies = [ { name = "pydantic" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/53/db/cde3e772cf5cd7e941b64d37e4a61e2762f36ecc2e6508525af536076f8d/transformer_engine_cu12-2.8.0-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:3a92c781fc3c1a3a6a0009871a36903fa364b2d51ce06b06641d29aeefd59310", size = 480373707, upload-time = "2025-10-07T05:03:05.392Z" }, - { url = "https://files.pythonhosted.org/packages/b9/14/67860f2f1f9d0eca4a8e5e0cef5a0de5c4fc26340625051f032d16913d8c/transformer_engine_cu12-2.8.0-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:8ee5f9df586491a35fd1a01cb95b6970a9f01a5e8f935ecdacd56173c44d6a67", size = 480875025, upload-time = "2025-10-07T04:54:43.762Z" }, + { url = "https://files.pythonhosted.org/packages/05/27/5c4c27cb245a3513e5ad7ccef50e2e9688996e2cc558edbbb575dfcca276/transformer_engine_cu12-2.11.0-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:ed5fda0925cb304d6864b451d8d012c579d5bd097bfefefca769b2704b06381a", size = 287630565, upload-time = "2026-01-02T09:56:43.645Z" }, + { url = "https://files.pythonhosted.org/packages/fa/a2/1439bbb6bc7d4d6045bad7d213884f7be92301c0982f009e3bbafa40e4ff/transformer_engine_cu12-2.11.0-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:6e5c0707583b2a90b2570da6f57409c6802653e069dfec38cf07a3b77ba9b12d", size = 288159349, upload-time = "2026-01-02T09:57:56.435Z" }, ] [[package]] name = "transformer-engine-torch" -version = "2.8.0" +version = "2.11.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "einops" }, { name = "onnx" }, { name = "onnxscript" }, + { name = "packaging" }, + { name = "pydantic" }, { name = "torch", version = "2.9.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'darwin' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" }, { name = "torch", version = "2.9.0+cu129", source = { registry = "https://download.pytorch.org/whl/cu129" }, marker = "sys_platform != 'darwin' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" }, + { name = "transformer-engine-cu12" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/38/63/1e3953244ed4f318f87889309a56cdd664759f007967eb850ee415a5584d/transformer_engine_torch-2.8.0.tar.gz", hash = "sha256:ce09f1bd9b8e532a5c347b9e9b3a3a771722095daddca673ae82ccce8e68d759", size = 209805, upload-time = "2025-10-07T04:54:11.134Z" } +sdist = { url = "https://files.pythonhosted.org/packages/09/42/068a40f5b213a3a8899e3885eb178776662897abed03cd725953d1106c39/transformer_engine_torch-2.11.0.tar.gz", hash = "sha256:b58d6322bdf885dfab0646da572aff9cf090b332ad470559aa58883c231e1816", size = 242065, upload-time = "2026-01-02T09:58:58.423Z" } [[package]] name = "transformers" @@ -9861,6 +9909,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/f6/a933bd70f98e9cf3e08167fc5cd7aaaca49147e48411c0bd5ae701bb2194/wrapt-1.17.3-py3-none-any.whl", hash = "sha256:7171ae35d2c33d326ac19dd8facb1e82e5fd04ef8c6c0e394d7af55a55051c22", size = 23591, upload-time = "2025-08-12T05:53:20.674Z" }, ] +[[package]] +name = "wsproto" +version = "1.3.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c7/79/12135bdf8b9c9367b8701c2c19a14c913c120b882d50b014ca0d38083c2c/wsproto-1.3.2.tar.gz", hash = "sha256:b86885dcf294e15204919950f666e06ffc6c7c114ca900b060d6e16293528294", size = 50116, upload-time = "2025-11-20T18:18:01.871Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a4/f5/10b68b7b1544245097b2a1b8238f66f2fc6dcaeb24ba5d917f52bd2eed4f/wsproto-1.3.2-py3-none-any.whl", hash = "sha256:61eea322cdf56e8cc904bd3ad7573359a242ba65688716b0710a5eb12beab584", size = 24405, upload-time = "2025-11-20T18:18:00.454Z" }, +] + [[package]] name = "xattr" version = "1.3.0"