Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions examples/recipes/decentralized_pg/pretrain_qwen3_vl_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
==============================================================================
Example: Qwen3_VL Pretraining with Decentralized Process Groups (Simple)
==============================================================================

This example demonstrates the simplest way to enable decentralized process groups:
just use an existing recipe and set `cfg.dist.use_decentralized_pg = True`.

The setup() function inside pretrain() will automatically create the
ProcessGroupCollection using HyperCommGrid based on the parallelism settings.

How to Run
----------
# 8 GPUs: EP8
uv run python -m torch.distributed.run --nproc_per_node=8 examples/recipes/decentralized_pg/pretrain_qwen3_vl_simple.py
"""

import torch

from megatron.bridge.recipes.qwen_vl.qwen3_vl import qwen3_vl_30b_a3b_pretrain_config
from megatron.bridge.training.pretrain import pretrain
from megatron.bridge.training.vlm_step import forward_step


def main() -> None:
    """Run Qwen3-VL pretraining with decentralized process groups enabled.

    Builds the standard Qwen3-VL 30B-A3B pretrain config, flips on
    decentralized process groups, and hands off to pretrain(), which owns
    the full distributed lifecycle (init and teardown).
    """
    # Get the standard Qwen3-VL 30B-A3B pretrain config with overrides
    cfg = qwen3_vl_30b_a3b_pretrain_config(
        # Use mock data for demo
        mock=True,
        # Parallelism: expert parallelism across all 8 ranks (EP8)
        expert_model_parallel_size=8,
        # Training settings (small for demo)
        train_iters=100,
        seq_length=1024,
        global_batch_size=32,
        micro_batch_size=1,
        # LR schedule (must fit within train_iters)
        lr_warmup_iters=10,
        lr_decay_iters=100,
    )
    # known issue with share_embeddings_and_output_weights
    cfg.model.share_embeddings_and_output_weights = False

    # =========================================================================
    # KEY: Enable decentralized process groups
    # =========================================================================
    cfg.dist.use_decentralized_pg = True
    cfg.dist.use_gloo_process_groups = False  # Gloo not supported with decentralized PG

    # pretrain() destroys the process group it created on return
    # (via _maybe_destroy_process_group), so no manual barrier() /
    # destroy_process_group() here — calling them after pretrain() returns
    # would raise once the group has already been torn down.
    pretrain(config=cfg, forward_step_func=forward_step)
Comment on lines +68 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the pretrain function to understand its cleanup behavior
fd '.*pretrain.*\.py' | head -20

Repository: NVIDIA-NeMo/Megatron-Bridge

Length of output: 1262


🏁 Script executed:

#!/bin/bash
# Check for destroy_process_group in pretrain-related files
rg -n "destroy_process_group|destroy_model_parallel" --type=py -C3 -g '*pretrain*'

Repository: NVIDIA-NeMo/Megatron-Bridge

Length of output: 8630


🏁 Script executed:

#!/bin/bash
# Also check the specific file mentioned in the review
cat -n examples/recipes/decentralized_pg/pretrain_qwen3_vl_simple.py | head -100

Repository: NVIDIA-NeMo/Megatron-Bridge

Length of output: 3428


Remove manual process group cleanup; pretrain() handles this internally.

The pretrain() function already manages distributed cleanup via _maybe_destroy_process_group(), which destroys the process group only if it was created by pretrain() itself (lines 111, 191 in src/megatron/bridge/training/pretrain.py). The manual barrier() + destroy_process_group() call will cause a runtime error in the typical scenario where the process group is not initialized before pretrain() is called, since pretrain() will have already destroyed it upon return.

Delete lines 68-71 (or the equivalent cleanup block in similar example files).

🤖 Prompt for AI Agents
In `@examples/recipes/decentralized_pg/pretrain_qwen3_vl_simple.py` around lines
68 - 71, Remove the manual distributed cleanup block that calls
torch.distributed.is_initialized(), torch.distributed.barrier(), and
torch.distributed.destroy_process_group() in the example; pretrain() already
performs cleanup via _maybe_destroy_process_group(), so delete the
barrier/destroy_process_group block (the lines around the manual cleanup) to
avoid double-destroy/runtime errors when pretrain() has already torn down the
process group.



# Standard script entry point: only run when executed directly
# (e.g. via torch.distributed.run), not when imported.
if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

model:
seq_length: 4096
use_hf_vision_model: false
tensor_model_parallel_size: 1
pipeline_model_parallel_size: 1
context_parallel_size: 1
cross_entropy_loss_fusion: false

train:
train_iters: 20
global_batch_size: 8
micro_batch_size: 1
eval_iters: 5
train_iters: 1000
global_batch_size: 16
micro_batch_size: 2
eval_iters: 100

optimizer:
lr: 0.00025
Expand All @@ -40,6 +45,8 @@ dist:

logger:
log_interval: 1
log_throughput: true
log_params_norm: true

dataset:
sequence_length: 4096
Expand All @@ -50,4 +57,14 @@ rng:
ddp:
grad_reduce_in_fp32: true

profiling:
memory_snapshot_path: snapshot.pickle
nvtx_ranges: false
profile_ranks: [0]
profile_step_end: 12
profile_step_start: 10
record_memory_history: false
record_shapes: false
use_nsys_profiler: false
use_pytorch_profiler: false

5 changes: 3 additions & 2 deletions examples/recipes/qwen_vl/finetune_qwen_vl.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@
create_omegaconf_dict_config,
parse_hydra_overrides,
)
from megatron.bridge.training.vlm_step import forward_step
from megatron.bridge.training.qwen3vl_step import forward_step
from megatron.bridge.utils.common_utils import get_rank_safe

from functools import partial

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -185,6 +185,7 @@ def parse_cli_args() -> Tuple[argparse.Namespace, list[str]]:
help="Use preloaded dataset provider (enabled automatically when --data-path is set).",
)
parser.add_argument("--debug", action="store_true", help="Enable debug logging")

args, cli_dotlist_overrides = parser.parse_known_args()
return args, cli_dotlist_overrides

Expand Down
2 changes: 1 addition & 1 deletion scripts/performance/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def parse_cli_args():
parser.add_argument(
"--domain",
type=lower_str,
choices=["llm", "vlm"],
choices=["llm", "vlm", "qwen3vl"],
help="Domain to use for experiment.",
default="llm",
)
Expand Down
3 changes: 3 additions & 0 deletions scripts/performance/run_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from megatron.bridge.training.gpt_step import forward_step
from megatron.bridge.training.pretrain import pretrain
from megatron.bridge.training.vlm_step import forward_step as vlm_forward_step
from megatron.bridge.training.qwen3vl_step import forward_step as qwen3vl_forward_step


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,6 +62,8 @@ def main():
# Select forward step function based on the model family name.
if args.domain == "vlm":
forward_step_func = vlm_forward_step
elif args.domain == "qwen3vl":
forward_step_func = qwen3vl_forward_step
else:
forward_step_func = forward_step

Expand Down
40 changes: 40 additions & 0 deletions scripts/performance/utils/overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,46 @@ def _set_cuda_graph_overrides(
return recipe


def _set_vision_cuda_graph_overrides(
recipe: ConfigContainer,
vision_cuda_graph_impl: Optional[str] = None,
vision_cuda_graph_scope: Optional[str | List[str]] = None,
) -> ConfigContainer:
"""Set the vision encoder CUDA graph overrides.

This enables TE CUDA graph for the vision encoder separately from the language model.

Args:
recipe: The config container
vision_cuda_graph_impl: Vision encoder CUDA graph implementation ("none" or "transformer_engine")
vision_cuda_graph_scope: Vision encoder CUDA graph scope (e.g., ["attn"])

Returns:
Updated config container
"""
if isinstance(vision_cuda_graph_scope, str):
vision_cuda_graph_scope = [vision_cuda_graph_scope]

if vision_cuda_graph_impl is not None:
recipe.model.vision_cuda_graph_impl = vision_cuda_graph_impl

if vision_cuda_graph_impl == "transformer_engine":
# Ensure TE RNG tracker is enabled for CUDA graph compatibility
recipe.rng.te_rng_tracker = recipe.model.use_te_rng_tracker = True

valid_te_scopes = ["attn", "mlp"] # Vision encoder typically only has attn and mlp
if vision_cuda_graph_scope:
assert all(scope in valid_te_scopes for scope in vision_cuda_graph_scope), (
f"Invalid vision cuda graph scope: {vision_cuda_graph_scope}. "
f"Valid options for vision encoder are: {valid_te_scopes}"
)

if vision_cuda_graph_scope is not None:
recipe.model.vision_cuda_graph_scope = vision_cuda_graph_scope

return recipe
Comment on lines +130 to +167
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Check if _set_vision_cuda_graph_overrides is called anywhere in the codebase
rg -n '_set_vision_cuda_graph_overrides' --type=py

Repository: NVIDIA-NeMo/Megatron-Bridge

Length of output: 151


🏁 Script executed:

# Also check the file to see the full context of both functions and their calls
cat -n scripts/performance/utils/overrides.py | head -270

Repository: NVIDIA-NeMo/Megatron-Bridge

Length of output: 13292


_set_vision_cuda_graph_overrides is defined but never called and lacks critical handling for the "none" case.

The function is not integrated into set_workload_base_configs (line 202) or set_user_overrides (line 252), so vision CUDA graph overrides will not be applied. Compare with _set_cuda_graph_overrides (called at lines 215 and 255).

Additionally, the function is missing the reset logic for vision_cuda_graph_impl == "none". In _set_cuda_graph_overrides (lines 115–116), when the implementation is set to "none", the RNG tracker is explicitly disabled. Without this in the vision variant, stale RNG tracker state could persist if the vision graph is disabled after being enabled.

🤖 Prompt for AI Agents
In `@scripts/performance/utils/overrides.py` around lines 130 - 167, The
_set_vision_cuda_graph_overrides function is never invoked and also lacks
resetting behavior for the "none" case; to fix, call
_set_vision_cuda_graph_overrides from the same two places where
_set_cuda_graph_overrides is called (inside set_workload_base_configs and
set_user_overrides) so vision overrides are applied, and add explicit handling
inside _set_vision_cuda_graph_overrides for vision_cuda_graph_impl == "none" to
disable TE RNG tracking (set recipe.rng.te_rng_tracker =
recipe.model.use_te_rng_tracker = False) mirroring the reset logic in
_set_cuda_graph_overrides; ensure the function still validates
vision_cuda_graph_scope against valid_te_scopes and sets
recipe.model.vision_cuda_graph_scope when provided.



def _set_recompute_overrides(
recipe: ConfigContainer,
cpu_offloading_num_layers: Optional[int] = None,
Expand Down
3 changes: 3 additions & 0 deletions src/megatron/bridge/data/vlm_datasets/hf_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class HFDatasetConversationProvider(DatasetProvider):
# DataloaderConfig fields are inherited (num_workers, dataloader_type, etc.)
dataloader_type: Optional[Literal["single", "cyclic", "external"]] = "single"

# Enable batch-level online sequence packing (dataset-level packing is available in FinetuneDatasetProvider)
pack_sequences_in_batch: bool = False

def _get_maker(self) -> Callable[..., List[Dict[str, Any]]]:
registry: Dict[str, Callable[..., List[Dict[str, Any]]]] = {
"make_rdr_dataset": make_rdr_dataset,
Expand Down
2 changes: 1 addition & 1 deletion src/megatron/bridge/models/conversion/param_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,7 @@ class ConcatenatedQKVMapping(MegatronParamMapping[Dict[str, torch.Tensor]]):
.. code-block:: python

# Create mapping for attention weights
mapping = QKVMapping(
mapping = ConcatenatedQKVMapping(
megatron_param="decoder.layers.*.self_attention.linear_qkv.weight",
qkv="model.layers.*.self_attn.qkv.weight",
)
Expand Down
Loading