Merged

29 commits
df7e912  init commit (kamran-nvidia, Feb 19, 2026)
fd806d2  fix: Add missing import of io in task_encoder and test_task_encoder (kamran-nvidia, Feb 19, 2026)
1cf57ee  Merge branch 'main' into kamran/qwen3_vl_energon (kamran-nvidia, Feb 19, 2026)
6fcf48f  fix: Update file permissions for peft_seq_unpacked.sh (kamran-nvidia, Feb 19, 2026)
374252a  fix: Improve logging and comments in EnergonMultiModalDataModule for … (kamran-nvidia, Feb 19, 2026)
d2ec77a  fix: Remove unnecessary blank lines in task_encoder and test_task_enc… (kamran-nvidia, Feb 19, 2026)
0aace0e  feat: Add comprehensive tests for Context Parallelism handling in Ene… (kamran-nvidia, Feb 19, 2026)
21d2628  fix: Remove unused import of 'io' in task_encoder.py (kamran-nvidia, Feb 19, 2026)
6b6e273  Update src/megatron/bridge/recipes/qwen_vl/data/energon/task_encoder.py (kamran-nvidia, Feb 19, 2026)
1477d91  Merge branch 'main' into kamran/qwen3_vl_energon (kamran-nvidia, Feb 19, 2026)
2fb1b96  fix: Remove unused '__subflavor__' attribute from QwenVLTaskEncoder t… (kamran-nvidia, Feb 21, 2026)
258dc4b  fix: Add missing '__subflavors__' attribute to ChatMLSample in QwenVL… (kamran-nvidia, Feb 21, 2026)
f5187af  Merge branch 'main' into kamran/qwen3_vl_energon (kamran-nvidia, Feb 23, 2026)
706fe9b  fix: Add pack_sequences_in_batch attribute to EnergonProvider (kamran-nvidia, Feb 23, 2026)
10feab5  feat: Add dataset_type argument for VLM recipes in run_recipe.py (kamran-nvidia, Feb 23, 2026)
70df6af  fix: Update videos attribute type in ChatMLSample to support nested l… (kamran-nvidia, Feb 24, 2026)
5c4b45a  feat: Add energon_test.sh for LoRA finetuning with sequence packing c… (kamran-nvidia, Feb 24, 2026)
612ae77  Merge branch 'main' into kamran/qwen3_vl_energon (kamran-nvidia, Feb 24, 2026)
5a0685f  fix: Update copyright year and modify command for running LoRA finetu… (kamran-nvidia, Feb 24, 2026)
3445703  docs: Add finetuning instructions for Energon dataset in README.md (kamran-nvidia, Feb 24, 2026)
19ca029  feat: Extend video handler to support additional video extensions and… (kamran-nvidia, Feb 24, 2026)
d42610f  Merge branch 'main' into kamran/qwen3_vl_energon (kamran-nvidia, Feb 25, 2026)
4f35c68  fix: correct typos in README.md and task_encoder.py (kamran-nvidia, Feb 25, 2026)
fee00b2  Merge branch 'main' into kamran/qwen3_vl_energon (kamran-nvidia, Feb 26, 2026)
550cba7  Merge branch 'kamran/qwen3_vl_energon' of github.com:NVIDIA-NeMo/Mega… (kamran-nvidia, Feb 26, 2026)
4926295  feat: integrate ProcessGroupCollection for distributed training in En… (kamran-nvidia, Feb 26, 2026)
853dc57  Merge branch 'main' into kamran/qwen3_vl_energon (kamran-nvidia, Feb 26, 2026)
5047bd6  feat: update image and video processing to use PIL format in QwenVLTa… (kamran-nvidia, Feb 26, 2026)
9390e19  fix: update input_ids extraction to handle BatchEncoding type in Qwen… (kamran-nvidia, Feb 27, 2026)
14 changes: 14 additions & 0 deletions examples/models/vlm/qwen3_vl/README.md
@@ -117,6 +117,20 @@ W&B report coming soon.

**Note:** LoRA/DoRA significantly reduces memory requirements, allowing for larger batch sizes and fewer GPUs.

## Finetuning with Energon Dataset

Follow the instructions [here](https://github.com/NVIDIA/Megatron-LM/tree/main/examples/multimodal#pretraining) to prepare the `LLaVA-Pretrain` dataset in Energon format, then change `.nv-meta/dataset.yaml` to the following:

```yaml
__module__: megatron.bridge.recipes.qwen_vl.data.energon.task_encoder
__class__: ChatMLWebdataset
field_map:
imgs: jpg
conversation: json
```

Then, update the dataset path (`dataset.path=/path/to/energon/dataset`) in [energon_test.sh](energon_test.sh) and run the script.

## Evaluation

Coming soon.
79 changes: 79 additions & 0 deletions examples/models/vlm/qwen3_vl/energon_test.sh
@@ -0,0 +1,79 @@
#!/usr/bin/env bash
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Workspace directory for checkpoints and results
WORKSPACE=${WORKSPACE:-/workspace}

# Before training, make sure to set WANDB_API_KEY or disable wandb logging
# export WANDB_API_KEY=<your_wandb_api_key>
# export WANDB_MODE=disabled

# Test sequence-packing configurations for LoRA finetuning on the dense model
PRETRAINED_CHECKPOINT=${WORKSPACE}/models/Qwen3-VL-8B-Instruct
MODEL_NAME=qwen3_vl_8b
DATASET_NAME=energon
SEQ_LENGTH=4096
TRAIN_ITERS=50
GLOBAL_BATCH_SIZE=32
MICRO_BATCH_SIZE=2
EVAL_ITERS=10
LR=0.00005
MIN_LR=0.000005
LR_WARMUP_ITERS=10
LOG_INTERVAL=1
WANDB_PROJECT=megatron-bridge-${DATASET_NAME}

SEQ_PACKING_CONFIGS=(False True)

# Parallelism configurations, each given as "EP,TP,PP,CP,N_PROC"
# N_PROC is the total number of processes (GPUs) used for training;
# it also fixes the DP size, so the loss curves stay comparable across configs
PARALLELISM_CONFIGS=("1,1,1,4,8" "1,1,1,2,4" "1,1,1,1,2")

for pack_config in "${SEQ_PACKING_CONFIGS[@]}"; do
for par_config in "${PARALLELISM_CONFIGS[@]}"; do
IFS=',' read -r EP TP PP CP N_PROC <<< "$par_config"
echo "Running LoRA finetuning pack_sequences_in_batch=$pack_config with EP=$EP TP=$TP PP=$PP CP=$CP N_PROC=$N_PROC"
uv run python -m torch.distributed.run --nproc_per_node=$N_PROC scripts/training/run_recipe.py \
--recipe ${MODEL_NAME}_finetune_config \
--step_func qwen3_vl_step \
--peft_scheme lora \
--dataset_type energon \
checkpoint.pretrained_checkpoint=$PRETRAINED_CHECKPOINT \
model.seq_length=$SEQ_LENGTH \
train.train_iters=$TRAIN_ITERS \
train.global_batch_size=$GLOBAL_BATCH_SIZE \
train.micro_batch_size=$MICRO_BATCH_SIZE \
train.eval_iters=$EVAL_ITERS \
optimizer.lr=$LR \
optimizer.min_lr=$MIN_LR \
scheduler.lr_warmup_iters=$LR_WARMUP_ITERS \
checkpoint.save=${WORKSPACE}/results/${MODEL_NAME}_lora_seq_pack_${pack_config}_cp${CP} \
logger.log_interval=$LOG_INTERVAL \
logger.wandb_project=$WANDB_PROJECT \
logger.wandb_exp_name=${MODEL_NAME}_${DATASET_NAME}_lora_seq_pack_${pack_config}_cp${CP} \
dataset.seq_length=$SEQ_LENGTH \
dataset.path=/path/to/energon/dataset \
dataset.pack_sequences_in_batch=$pack_config \
model.expert_model_parallel_size=$EP \
model.tensor_model_parallel_size=$TP \
model.pipeline_model_parallel_size=$PP \
model.context_parallel_size=$CP \
model.calculate_per_token_loss=True \
ddp.average_in_collective=False \
ddp.grad_reduce_in_fp32=True
done
done

examples/models/vlm/qwen3_vl/peft_seq_unpacked.sh
File mode changed 100644 → 100755 (no content changes)
13 changes: 13 additions & 0 deletions scripts/training/run_recipe.py
@@ -132,6 +132,12 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]:
default=None,
help="Sequence length for training",
)
parser.add_argument(
"--dataset_type",
type=str,
default=None,
help="Dataset type for VLM recipes (e.g., 'energon', 'mock', 'hf', 'preloaded').",
)
args, cli_overrides = parser.parse_known_args()
return args, cli_overrides

@@ -141,6 +147,7 @@ def load_recipe(
peft_scheme: str | None,
packed_sequence: bool = False,
seq_length: int | None = None,
dataset_type: str | None = None,
) -> ConfigContainer:
"""
Load recipe by name from megatron.bridge.recipes.
@@ -150,6 +157,7 @@
peft_scheme: PEFT scheme to use ('lora', 'dora', or None)
packed_sequence: Enable packed sequence training (default: False)
seq_length: Sequence length for training (optional)
dataset_type: Dataset type for VLM recipes (e.g., 'energon', 'mock', 'hf', 'preloaded')

Returns:
ConfigContainer from calling the recipe
@@ -175,11 +183,13 @@
accepts_peft = "peft" in params or has_var_keyword
accepts_packed_sequence = "packed_sequence" in params or has_var_keyword
accepts_seq_length = "seq_length" in params or has_var_keyword
accepts_dataset_type = "dataset_type" in params or has_var_keyword
except (ValueError, TypeError):
# If signature inspection fails, fallback conservatively
accepts_peft = True # peft is widely supported, try passing it
accepts_packed_sequence = False # new parameter, don't pass if unsure
accepts_seq_length = False # new parameter, don't pass if unsure
accepts_dataset_type = False # VLM-specific, don't pass if unsure

# Build kwargs dynamically based on what the recipe accepts
kwargs = {}
@@ -189,6 +199,8 @@
kwargs["packed_sequence"] = packed_sequence
if accepts_seq_length and seq_length is not None:
kwargs["seq_length"] = seq_length
if accepts_dataset_type and dataset_type is not None:
kwargs["dataset_type"] = dataset_type

try:
return config_builder(**kwargs)
@@ -224,6 +236,7 @@ def main() -> None:
args.peft_scheme,
args.packed_sequence,
args.seq_length,
args.dataset_type,
)

config = process_config_with_overrides(
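The `dataset_type` plumbing in `run_recipe.py` relies on signature inspection so the new kwarg is only forwarded to recipes that actually accept it. A minimal sketch of that pattern follows; the helper and the two recipe functions are hypothetical stand-ins for illustration, not the real API:

```python
import inspect


def build_kwargs(config_builder, dataset_type=None):
    """Forward dataset_type only when the recipe's signature accepts it.

    Illustrative helper mirroring the inspection pattern used in
    run_recipe.py; names here are hypothetical.
    """
    try:
        params = inspect.signature(config_builder).parameters
        has_var_keyword = any(
            p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
        )
        accepts_dataset_type = "dataset_type" in params or has_var_keyword
    except (ValueError, TypeError):
        # Signature inspection can fail (e.g. for C extensions);
        # fall back conservatively and skip the new parameter.
        accepts_dataset_type = False

    kwargs = {}
    if accepts_dataset_type and dataset_type is not None:
        kwargs["dataset_type"] = dataset_type
    return kwargs


def legacy_recipe(seq_length=4096):  # no dataset_type parameter
    return {"seq_length": seq_length}


def vlm_recipe(dataset_type=None):  # accepts dataset_type
    return {"dataset_type": dataset_type}


print(build_kwargs(legacy_recipe, "energon"))  # {} -> kwarg silently dropped
print(build_kwargs(vlm_recipe, "energon"))     # {'dataset_type': 'energon'}
```

The conservative `except` branch matters: passing an unexpected kwarg to an older recipe would raise a `TypeError`, so dropping the parameter when inspection fails keeps legacy recipes working.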
91 changes: 47 additions & 44 deletions src/megatron/bridge/data/energon/base_energon_datamodule.py
@@ -15,7 +15,7 @@
import logging
from typing import Any, Literal, Optional

from megatron.core import parallel_state
from megatron.core.process_groups_config import ProcessGroupCollection
from megatron.energon import WorkerConfig, get_savable_loader, get_train_dataset


@@ -64,6 +64,7 @@ def __init__(
decoder_seq_length: Optional[int] = None,
packing_buffer_size: Optional[int] = None,
validation_task_encoder: Optional[Any] = None,
pg_collection: Optional[ProcessGroupCollection] = None,
**kwargs,
) -> None:
"""
@@ -89,6 +90,8 @@
packing_buffer_size (int, optional): Size of the packing buffer for batched samples. Defaults to None.
validation_task_encoder (MultiModalTaskEncoder, optional): Encoder responsible for encoding
and batching samples for validation. Defaults to None and will be the same as task_encoder.
pg_collection (ProcessGroupCollection, optional): Process group collection for distributed training.
If provided, used instead of the global parallel_state. Defaults to None.
**kwargs: Additional keyword arguments. Will be passed to get_train_dataset() of Energon
"""

@@ -112,8 +115,49 @@
self.packing_buffer_size = packing_buffer_size
self.validation_task_encoder = validation_task_encoder or self.task_encoder
self.num_val_workers = num_val_workers or self.num_workers
self.pg_collection = pg_collection
self.kwargs = kwargs

def _build_worker_config(self, num_workers: int, split: str = "train") -> WorkerConfig:
"""Build a WorkerConfig using pg_collection, falling back to default_worker_config.

NOTE: We intentionally use the pure DP rank (pg_collection.dp)
rather than the combined DP-CP rank. With Megatron's rank ordering
(default "tp-cp-ep-dp-pp"), all CP ranks within the same DP replica
already share the same pure DP rank. This ensures that CP ranks
processing different sequence portions of the same batch receive
identical data from the dataloader.
Using dp_cp would be INCORRECT here — it would assign each CP rank
a unique rank, causing them to read different data shards.
"""
if self.pg_collection is None or self.pg_collection.dp is None:
logger.info(
f"Multimodal {split} data loader pg_collection is not available, "
f"using default worker config with num_workers {num_workers}"
)
return WorkerConfig.default_worker_config(num_workers)

rank = self.pg_collection.dp.rank()
world_size = self.pg_collection.dp.size()
data_parallel_group = self.pg_collection.dp
cp_rank = self.pg_collection.cp.rank() if self.pg_collection.cp is not None else 0
cp_size = self.pg_collection.cp.size() if self.pg_collection.cp is not None else 1

logger.info(
f"Multimodal {split} dataloader initializing with "
f"dp_rank {rank} dp_world_size {world_size} "
f"cp_rank {cp_rank} cp_size {cp_size} "
f"data_parallel_group {data_parallel_group}"
)
return WorkerConfig(
rank=rank,
world_size=world_size,
num_workers=num_workers,
data_parallel_group=data_parallel_group,
worker_debug_path=None,
worker_log_level=0,
)

def datasets_provider(self, worker_config, split: Literal["train", "val"] = "val"):
"""
Provide the dataset for training or validation.
@@ -165,28 +209,7 @@ def train_dataloader(self) -> Any:
logger.info(f"Multimodal train dataloader initializing with init_global_step {self.init_global_step}")
if self.train_dataloader_object:
return self.train_dataloader_object
if not parallel_state.is_initialized():
logger.info(
f"Muiltimodal data loader parallel state is not initialized,"
f"using default worker config with no_workers {self.num_workers}"
)
worker_config = WorkerConfig.default_worker_config(self.num_workers)
else:
rank = parallel_state.get_data_parallel_rank()
world_size = parallel_state.get_data_parallel_world_size()
data_parallel_group = parallel_state.get_data_parallel_group()
logger.info(
f" Multimodal train dataloader initializing with"
f"rank {rank} world_size {world_size} data_parallel_group {data_parallel_group} ****** "
)
worker_config = WorkerConfig(
rank=rank,
world_size=world_size,
num_workers=self.num_workers,
data_parallel_group=data_parallel_group,
worker_debug_path=None,
worker_log_level=0,
)
worker_config = self._build_worker_config(self.num_workers, split="train")
train_dataset = self.datasets_provider(worker_config, split="train")
energon_dataloader = get_savable_loader(train_dataset, worker_config=worker_config)
self.train_dataloader_object = energon_dataloader
@@ -204,27 +227,7 @@ def val_dataloader(self):
"""
if self.val_dataloader_object:
return self.val_dataloader_object

if not parallel_state.is_initialized():
logger.info(
f"Muiltimodal val data loader parallel state is not initialized,"
f"using default worker config with no_workers {self.num_workers}"
)
worker_config = WorkerConfig.default_worker_config(self.num_val_workers)
else:
rank = parallel_state.get_data_parallel_rank()
world_size = parallel_state.get_data_parallel_world_size()
data_parallel_group = parallel_state.get_data_parallel_group()

logger.info(f"rank {rank} world_size {world_size} data_parallel_group {data_parallel_group}")
worker_config = WorkerConfig(
rank=rank,
world_size=world_size,
num_workers=self.num_workers,
data_parallel_group=data_parallel_group,
worker_debug_path=None,
worker_log_level=0,
)
worker_config = self._build_worker_config(self.num_val_workers, split="val")
val_dataset = self.datasets_provider(worker_config, split="val")
energon_loader = get_savable_loader(val_dataset, worker_config=worker_config)
self.val_dataloader_object = energon_loader
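The NOTE in `_build_worker_config` (pure DP rank vs. combined DP-CP rank) can be checked with a small rank-mapping sketch. This assumes the default "tp-cp-ep-dp-pp" ordering with tp fastest-varying; the parallelism sizes below are illustrative, not taken from the PR:

```python
# Under "tp-cp-ep-dp-pp" ordering (tp fastest-varying), a global rank
# decomposes into per-dimension indices by successive division.
TP, CP, EP, DP, PP = 1, 2, 1, 2, 1
WORLD = TP * CP * EP * DP * PP  # 4 GPUs total


def decompose(global_rank):
    """Map a global rank to its (tp, cp, ep, dp, pp) indices."""
    tp = global_rank % TP
    cp = (global_rank // TP) % CP
    ep = (global_rank // (TP * CP)) % EP
    dp = (global_rank // (TP * CP * EP)) % DP
    pp = (global_rank // (TP * CP * EP * DP)) % PP
    return {"tp": tp, "cp": cp, "ep": ep, "dp": dp, "pp": pp}


for r in range(WORLD):
    g = decompose(r)
    dp_cp = g["dp"] * CP + g["cp"]  # hypothetical combined DP-CP index
    print(f"rank {r}: dp={g['dp']} dp_cp={dp_cp}")

# Ranks 0 and 1 form one DP replica (cp=0 and cp=1): both have dp=0,
# so sharding the dataloader by dp gives them identical data, while
# sharding by dp_cp (0 vs 1) would hand them different data shards --
# exactly the failure mode the docstring warns about.
```

This matches the docstring's claim: CP ranks split one batch along the sequence dimension, so they must draw the same samples, which only the pure DP rank guarantees.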
3 changes: 3 additions & 0 deletions src/megatron/bridge/data/energon/energon_provider.py
@@ -33,6 +33,8 @@ class EnergonProvider(DatasetProvider):
num_workers: int_repr
dataloader_type: str = "external"
task_encoder: Optional[Any] = None
# Enable batch-level online sequence packing
pack_sequences_in_batch: bool = False

def build_datasets(self, context: DatasetBuildContext):
dataset = EnergonMultiModalDataModule(
@@ -44,6 +46,7 @@
micro_batch_size=self.micro_batch_size,
global_batch_size=self.global_batch_size,
num_workers=self.num_workers,
pg_collection=context.pg_collection,
)
return (
iter(dataset.train_dataloader()),
2 changes: 2 additions & 0 deletions src/megatron/bridge/data/utils.py
@@ -189,13 +189,15 @@ def protocol_adapter(
train_val_test_num_samples: list[int],
config: DatasetProvider,
tokenizer: Optional[MegatronTokenizer] = None,
pg_collection: Optional[ProcessGroupCollection] = None,
) -> tuple[Optional[Any], Optional[Any], Optional[Any]]:
"""Adapter function that bridges the protocol interface with the legacy interface."""
context = DatasetBuildContext(
train_samples=train_val_test_num_samples[0],
valid_samples=train_val_test_num_samples[1],
test_samples=train_val_test_num_samples[2],
tokenizer=tokenizer,
pg_collection=pg_collection,
)
return config.build_datasets(context)
