Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions docs/design-docs/training-backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,19 @@ NeMo RL supports multiple training backends to accommodate different model sizes

## Available Backends

- **DTensor (FSDP2)** - PyTorch's next-generation distributed training with improved memory efficiency
- **Megatron** - NVIDIA's high-performance training framework for scaling to large models (>100B parameters)
- **DTensor (FSDP2)** - PyTorch's next-generation distributed training with improved memory efficiency.
- **Megatron** - NVIDIA's high-performance training framework for scaling to large models (>100B parameters).

## Supported Input Checkpoint Format

At this time, NeMo RL only supports Hugging Face checkpoints as inputs to the training scripts. This applies to both
the `DTensor` backend and the `Megatron` backend.

* `DTensor` uses the Hugging Face checkpoint both to initialize the training backend and to configure `vllm`, ensuring the model implementations match exactly. This is crucial for correctness.
* `Megatron` also uses the Hugging Face checkpoint to configure `vllm`, and performs a one-time conversion to a Megatron-format checkpoint to initialize the training backend.

If you would like to see direct support for Megatron checkpoints, please share your use case on
https://github.com/NVIDIA-NeMo/RL/issues/671.

## Backend Selection

Expand Down Expand Up @@ -33,3 +44,34 @@ To enable DTensor (FSDP2) training:
## Configuration Examples

For comprehensive examples of each algorithm and backend, see the [examples/configs/recipes/llm](https://github.com/NVIDIA-NeMo/RL/tree/main/examples/configs/recipes/llm) folder. This directory contains ready-to-use configurations for various supported combinations.

## Megatron Configuration

The Megatron backend requires a checkpoint directory for storing converted Hugging Face model weights in Megatron format. This directory must be accessible from all nodes in your distributed training setup.

### Environment Variable Priority (Highest to Lowest) ###

1. **`NRL_MEGATRON_CHECKPOINT_DIR`** - The custom checkpoint directory path.
2. [RECOMMENDED] **`HF_HOME/nemo_rl`** - Uses the Hugging Face cache directory, if available.
3. **`~/.cache/huggingface/nemo_rl`** - The default fallback location.

### Configuration Examples ###

```bash
# Option 1: Set custom checkpoint directory
export NRL_MEGATRON_CHECKPOINT_DIR="/shared/nfs/checkpoints/megatron"

# Option 2: Use HuggingFace home directory (recommended for shared setups)
export HF_HOME="/shared/nfs/huggingface"
# This will use /shared/nfs/huggingface/nemo_rl

# Option 3: Use default (no environment variables needed)
# Uses ~/.cache/huggingface/nemo_rl
```

### Best Practices ###

- **Mount the checkpoint directory**: If you are using Docker, make sure the Megatron checkpoint path is covered by `-v`/`--mount`. Similarly, if you are using SLURM+pyxis, ensure `--container-mounts` includes this path.
- **Use shared storage**: Ensure the checkpoint directory is accessible from all nodes (e.g., NFS, shared filesystem).
- **Prefer HF_HOME**: If you already have `HF_HOME` mounted across nodes, this reduces the number of environment variables to manage.
- **Sufficient space**: Ensure adequate disk space for the converted model checkpoints.
12 changes: 7 additions & 5 deletions nemo_rl/models/policy/megatron_policy_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
from nemo_rl.models.policy.utils import (
configure_expandable_segments,
get_gpu_info,
get_megatron_checkpoint_dir,
get_runtime_env_for_policy_worker,
)

Expand Down Expand Up @@ -356,7 +357,6 @@ def __init__(
*,
worker_sharding_annotations: NamedSharding,
pre_init_communication_queue: Queue,
megatron_checkpoint_home: Optional[str] = None,
**kwargs: Any,
):
self.cfg = config
Expand All @@ -378,10 +378,7 @@ def __init__(
if os.path.exists(hf_model_name):
hf_model_subdir = f"model_{hf_model_subdir.replace('/', '_')}"

if megatron_checkpoint_home is not None:
pretrained_path = f"{megatron_checkpoint_home}/{hf_model_subdir}"
else:
pretrained_path = f"/opt/checkpoints/tron/{hf_model_subdir}"
pretrained_path = f"{get_megatron_checkpoint_dir()}/{hf_model_subdir}"
pt_checkpoint_exists = os.path.exists(pretrained_path) and os.path.exists(
os.path.join(pretrained_path, "iter_0000000")
)
Expand Down Expand Up @@ -435,6 +432,11 @@ def __init__(
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token

if not os.path.exists(pretrained_run_config):
raise FileNotFoundError(
f"Pretrained run config not found at {pretrained_run_config} on rank={get_rank_safe()}. This usually means that the one-time HF->mcore conversion on rank=0 saved to a directory not being mounted on this node. Please check "
)

cfg_from_pretrained = ConfigContainer.from_yaml(pretrained_run_config)
model_cfg = cfg_from_pretrained.model_config
cfg_from_pretrained.logger_config = LoggerConfig()
Expand Down
26 changes: 26 additions & 0 deletions nemo_rl/models/policy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,29 @@ def get_runtime_env_for_policy_worker(policy_worker_name: str) -> dict[str, Any]
}

return runtime_env


def get_megatron_checkpoint_dir() -> str:
    """Gets the default megatron checkpoint directory for initial HF -> Mcore conversion.

    The Megatron initial checkpoint should be saved to a path available on all
    nodes. The directory used takes this order of precedence:

    1. ``$NRL_MEGATRON_CHECKPOINT_DIR`` (if set to a non-empty, non-whitespace value)
    2. ``$HF_HOME/nemo_rl`` (if ``HF_HOME`` is set to a non-empty, non-whitespace value)
    3. ``~/.cache/huggingface/nemo_rl``

    ``HF_HOME`` is preferred since many users will also have that path mounted and
    it means one less directory to mount into your runtime environment.

    Returns:
        The path of the directory in which to store the converted Megatron checkpoint.
    """
    nrl_checkpoint_dir = os.environ.get("NRL_MEGATRON_CHECKPOINT_DIR")
    # An empty or whitespace-only value is treated the same as unset so that
    # e.g. NRL_MEGATRON_CHECKPOINT_DIR="" falls through to the next option.
    if nrl_checkpoint_dir is not None and nrl_checkpoint_dir.strip():
        checkpoint_dir = nrl_checkpoint_dir
    else:
        hf_home = os.environ.get("HF_HOME")
        if hf_home is not None and hf_home.strip():
            checkpoint_dir = os.path.join(hf_home, "nemo_rl")
        else:
            # Final fallback mirrors the Hugging Face default cache location.
            checkpoint_dir = os.path.join(
                os.path.expanduser("~"), ".cache", "huggingface", "nemo_rl"
            )
    print(f"Using default megatron checkpoint dir: {checkpoint_dir}")
    return checkpoint_dir
118 changes: 116 additions & 2 deletions tests/unit/models/policy/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
# limitations under the License.

import os
import unittest
import unittest.mock
from unittest.mock import MagicMock, patch

from nemo_rl.models.policy.utils import configure_expandable_segments
from nemo_rl.models.policy.utils import (
configure_expandable_segments,
get_megatron_checkpoint_dir,
)


class TestConfigureExpandableSegments(unittest.TestCase):
Expand Down Expand Up @@ -131,3 +134,114 @@ def test_ampere_gpu_no_existing_config(self, mock_get_device_properties):

# Verify the environment variable was not set
self.assertNotIn("PYTORCH_CUDA_ALLOC_CONF", os.environ)

@patch("torch.cuda.get_device_properties")
def test_ampere_gpu_with_expandable_segments_true_raises_error(
    self, mock_get_device_properties
):
    """Test Ampere GPU with expandable_segments:True in config raises RuntimeError."""
    # Mock GPU properties for the Ampere architecture: compute capability
    # major version 8 identifies Ampere-class devices.
    mock_device_properties = MagicMock()
    mock_device_properties.major = 8  # Ampere
    mock_get_device_properties.return_value = mock_device_properties

    # Pre-set the allocator config so configure_expandable_segments sees the
    # feature already enabled on an unsupported (pre-Hopper) architecture.
    # NOTE(review): this mutates os.environ without restoring it — presumably
    # a setUp/tearDown elsewhere in this class resets the variable; confirm.
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

    # Call the function and expect RuntimeError
    with self.assertRaises(RuntimeError) as context:
        configure_expandable_segments()

    # Verify the error message names both the offending setting and the
    # architecture restriction.
    self.assertIn("expandable_segments is enabled", str(context.exception))
    self.assertIn(
        "not supported on architectures older than Hopper", str(context.exception)
    )


class TestGetMegatronCheckpointDir:
    """Test cases for the get_megatron_checkpoint_dir function.

    The function resolves the Megatron conversion checkpoint directory with the
    precedence NRL_MEGATRON_CHECKPOINT_DIR > HF_HOME/nemo_rl > ~/.cache/huggingface/nemo_rl;
    each test pins one rung of that ladder by patching os.environ.
    """

    def test_nrl_megatron_checkpoint_dir_takes_precedence(self):
        """Test that NRL_MEGATRON_CHECKPOINT_DIR environment variable takes highest precedence."""
        expected_dir = "/custom/nrl/checkpoint/path"

        with unittest.mock.patch.dict(
            os.environ,
            {
                "NRL_MEGATRON_CHECKPOINT_DIR": expected_dir,
                "HF_HOME": "/some/hf/home",
                "HOME": "/some/home",
            },
        ):
            result = get_megatron_checkpoint_dir()
            assert result == expected_dir

    def test_hf_home_fallback_when_nrl_not_set(self):
        """Test that HF_HOME/nemo_rl is used when NRL_MEGATRON_CHECKPOINT_DIR is not set."""
        hf_home = "/path/to/hf/home"
        expected_dir = os.path.join(hf_home, "nemo_rl")

        # clear=True guarantees NRL_MEGATRON_CHECKPOINT_DIR is absent from the
        # patched environment, so only HF_HOME can influence the result.
        env_vars = {"HF_HOME": hf_home, "HOME": "/some/home"}

        with unittest.mock.patch.dict(os.environ, env_vars, clear=True):
            result = get_megatron_checkpoint_dir()
            assert result == expected_dir

    def test_default_fallback_when_no_env_vars_set(self):
        """Test that ~/.cache/huggingface/nemo_rl is used when no environment variables are set."""
        home_dir = "/home/testuser"
        expected_dir = os.path.join(home_dir, ".cache", "huggingface", "nemo_rl")

        with unittest.mock.patch.dict(os.environ, {"HOME": home_dir}, clear=True):
            # Patch expanduser so the test is independent of the real user's home.
            with unittest.mock.patch("os.path.expanduser") as mock_expanduser:
                mock_expanduser.return_value = home_dir
                result = get_megatron_checkpoint_dir()
                assert result == expected_dir
                mock_expanduser.assert_called_once_with("~")

    def test_nrl_checkpoint_dir_empty_string_treated_as_unset(self):
        """Test that an empty NRL_MEGATRON_CHECKPOINT_DIR is treated as unset."""
        hf_home = "/path/to/hf/home"
        expected_dir = os.path.join(hf_home, "nemo_rl")

        with unittest.mock.patch.dict(
            os.environ,
            {
                "NRL_MEGATRON_CHECKPOINT_DIR": "",
                "HF_HOME": hf_home,
                "HOME": "/some/home",
            },
        ):
            result = get_megatron_checkpoint_dir()
            assert result == expected_dir

    def test_hf_home_empty_string_treated_as_unset(self):
        """Test that an empty HF_HOME is treated as unset."""
        home_dir = "/home/testuser"
        expected_dir = os.path.join(home_dir, ".cache", "huggingface", "nemo_rl")

        with unittest.mock.patch.dict(
            os.environ, {"HF_HOME": "", "HOME": home_dir}, clear=True
        ):
            with unittest.mock.patch("os.path.expanduser") as mock_expanduser:
                mock_expanduser.return_value = home_dir
                result = get_megatron_checkpoint_dir()
                assert result == expected_dir

    def test_function_prints_selected_directory(self, capsys):
        """Test that the function prints the selected directory."""
        expected_dir = "/custom/checkpoint/dir"

        with unittest.mock.patch.dict(
            os.environ, {"NRL_MEGATRON_CHECKPOINT_DIR": expected_dir}
        ):
            result = get_megatron_checkpoint_dir()

        # capsys is a pytest fixture capturing stdout; the function logs the
        # chosen directory via print().
        captured = capsys.readouterr()
        assert (
            f"Using default megatron checkpoint dir: {expected_dir}" in captured.out
        )
        assert result == expected_dir
Loading