Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 3rdparty/Automodel-workspace/Automodel
Submodule Automodel updated 111 files
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ What you can expect:

Clone **NeMo RL**.
```sh
git clone git@github.com:NVIDIA-NeMo/RL.git nemo-rl
git clone git@github.com:NVIDIA-NeMo/RL.git nemo-rl --recursive
cd nemo-rl

# If you are using the Megatron backend, download the pinned versions of Megatron-LM and NeMo submodules
# by running (This is not necessary if you are using the pure Pytorch/DTensor path):
# If you have already cloned without the --recursive option, you can initialize the submodules recursively
git submodule update --init --recursive

# Different branches of the repo can have different pinned versions of these third-party submodules. Ensure
Expand Down Expand Up @@ -127,7 +126,7 @@ bash tools/build-flash-attn-in-uv-cache.sh
> The NeMo RL Dockerfile will warm the uv cache with flash-attn.
> See https://docs.nvidia.com/nemo/rl/latest/docker.html for instructions if you are looking for the NeMo RL container.

If sucessful, you should see `✅ flash-attn successfully added to uv cache`.
If successful, you should see `✅ flash-attn successfully added to uv cache`.

Use `uv run` to launch all commands. It handles pip installing implicitly and ensures your environment is up to date with our lock file.
> [!NOTE]
Expand Down
1 change: 1 addition & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ RUN <<"EOF" bash -exu
uv sync --link-mode symlink --locked --no-install-project
uv sync --link-mode symlink --locked --extra vllm --no-install-project
uv sync --link-mode symlink --locked --extra mcore --no-install-project
uv sync --link-mode symlink --locked --extra automodel --no-install-project
uv sync --link-mode symlink --locked --all-groups --no-install-project
EOF

Expand Down
2 changes: 2 additions & 0 deletions examples/configs/grpo_math_1B.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ checkpointing:
keep_top_k: 3
save_period: 10
checkpoint_must_save_by: null
model_save_format: "safetensors"
save_consolidated: false

policy:
model_name: "Qwen/Qwen2.5-1.5B"
Expand Down
1 change: 1 addition & 0 deletions nemo_rl/algorithms/dpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ def dpo_train(
tokenizer_path=os.path.join(
checkpoint_path, "policy", "tokenizer"
),
checkpointing_cfg=master_config["checkpointing"],
)
torch.save(
train_dataloader.state_dict(),
Expand Down
1 change: 1 addition & 0 deletions nemo_rl/algorithms/grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ def grpo_train(
tokenizer_path=os.path.join(
checkpoint_path, "policy", "tokenizer"
),
checkpointing_cfg=master_config["checkpointing"],
)
torch.save(
dataloader.state_dict(),
Expand Down
1 change: 1 addition & 0 deletions nemo_rl/algorithms/rm.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ def rm_train(
tokenizer_path=os.path.join(
checkpoint_path, "policy", "tokenizer"
),
checkpointing_cfg=master_config["checkpointing"],
)
torch.save(
train_dataloader.state_dict(),
Expand Down
1 change: 1 addition & 0 deletions nemo_rl/algorithms/sft.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ def sft_train(
tokenizer_path=os.path.join(
checkpoint_path, "policy", "tokenizer"
),
checkpointing_cfg=master_config["checkpointing"],
)
torch.save(
train_dataloader.state_dict(),
Expand Down
38 changes: 36 additions & 2 deletions nemo_rl/models/policy/dtensor_policy_worker_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@
import_class_from_path,
resolve_model_class,
)
from nemo_rl.utils.native_checkpoint import (
from nemo_rl.utils.automodel_checkpoint import (
load_checkpoint,
save_checkpoint,
)
from nemo_rl.utils.checkpoint import CheckpointingConfig
from nemo_rl.utils.nsys import wrap_with_nvtx_name


Expand Down Expand Up @@ -213,17 +214,21 @@ def __init__(
model_class = resolve_model_class(model_config.model_type)

full_state_dict = None
model_state_dict_keys = None
if self.rank == 0:
print(f"[Rank {self.rank}] Loading model {model_name} on CPU...")
model = model_class.from_pretrained(
model_name,
device_map="cpu", # load weights onto CPU initially
trust_remote_code=True,
config=model_config,
use_liger_kernel=False,
torch_dtype=str(model_config.torch_dtype),
)

full_state_dict = model.state_dict()
# Store the original model state dict keys before any parallelization
model_state_dict_keys = list(full_state_dict.keys())
del model

print(f"[Rank {self.rank}] Initializing empty model for FSDP...")
Expand All @@ -239,6 +244,7 @@ def __init__(
attn_implementation="flash_attention_2"
if self.enable_seq_packing
else None,
use_liger_kernel=False,
trust_remote_code=True,
torch_dtype=str(model_config.torch_dtype),
)
Expand Down Expand Up @@ -349,6 +355,11 @@ def __init__(
),
)

# Broadcast model state dict keys to all ranks and store as instance variable
keys_to_broadcast = [model_state_dict_keys]
torch.distributed.broadcast_object_list(keys_to_broadcast, src=0)
self.model_state_dict_keys = keys_to_broadcast[0]

# Handle tied word embeddings after loading the state dict
# We need to actually tie the parameters at the model level
is_tied_lm_head = hasattr(self.model, "lm_head") and getattr(
Expand Down Expand Up @@ -1433,11 +1444,30 @@ def save_checkpoint(
weights_path: str,
optimizer_path: Optional[str] = None,
tokenizer_path: Optional[str] = None,
checkpointing_cfg: Optional[CheckpointingConfig] = None,
) -> None:
"""Save a checkpoint of the model.

the optimizer states are saved only if `optimizer` and `optimizer_path` are provided.
"""
if checkpointing_cfg is None:
raise ValueError(
"checkpointing_cfg must be provided when saving checkpoint"
)

# Extract only the checkpointing configuration keys that exist
checkpoint_kwargs = {
key: value
for key, value in checkpointing_cfg.items()
if key
in {
"model_save_format",
"save_consolidated",
"is_peft",
"peft_config",
}
}

save_checkpoint(
model=self.model,
weights_path=weights_path,
Expand All @@ -1446,10 +1476,14 @@ def save_checkpoint(
optimizer_path=optimizer_path,
tokenizer=self.tokenizer if tokenizer_path else None,
tokenizer_path=tokenizer_path,
model_state_dict_keys=self.model_state_dict_keys,
**checkpoint_kwargs,
)

def load_checkpoint(
self, weights_path: str, optimizer_path: Optional[str] = None
self,
weights_path: str,
optimizer_path: Optional[str] = None,
) -> None:
"""Load a checkpoint into the model."""
load_checkpoint(
Expand Down
52 changes: 40 additions & 12 deletions nemo_rl/models/policy/lm_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
LogprobOutputSpec,
ReferenceLogprobOutputSpec,
)
from nemo_rl.utils.checkpoint import CheckpointingConfig
from nemo_rl.utils.flops_tracker import (
FLOPTracker,
get_default_hf_config,
Expand Down Expand Up @@ -75,7 +76,13 @@ def __init__(
pp_size = 1
cp_size = 1

megatron_enable = "megatron_cfg" in config and config["megatron_cfg"]["enabled"]
megatron_enable = bool(config.get("megatron_cfg", {}).get("enabled", False))
dtensor_enable = bool(config.get("dtensor_cfg", {}).get("enabled", False))
if megatron_enable and dtensor_enable:
raise ValueError(
"Configure either Megatron (policy.megatron_cfg.enabled=true) or "
"DTensor (policy.dtensor_cfg.enabled=true), not both."
)
if megatron_enable:
worker_builder_cls = (
"nemo_rl.models.policy.megatron_policy_worker.MegatronPolicyWorker"
Expand All @@ -86,13 +93,14 @@ def __init__(

env_vars = config["megatron_cfg"].get("env_vars", {})
else:
assert config["dtensor_cfg"]["enabled"], (
"Please either set policy.megatron_cfg.enabled=true to use Megatron training backend "
"or set policy.dtensor_cfg.enabled=true to use DTensor training backend."
)
if not dtensor_enable:
raise ValueError(
"Please either set policy.megatron_cfg.enabled=true to use Megatron training backend "
"or set policy.dtensor_cfg.enabled=true to use DTensor training backend."
)

# Check if _v2 is enabled in dtensor_cfg (defaults to False for backward compatibility)
use_v2 = config["dtensor_cfg"].get("_v2", False)
use_v2 = config.get("dtensor_cfg", {}).get("_v2", False)
if use_v2:
worker_builder_cls = "nemo_rl.models.policy.dtensor_policy_worker_v2.DTensorPolicyWorkerV2"
else:
Expand Down Expand Up @@ -588,14 +596,34 @@ def save_checkpoint(
weights_path: str,
optimizer_path: Optional[str] = None,
tokenizer_path: Optional[str] = None,
checkpointing_cfg: Optional[CheckpointingConfig] = None,
) -> None:
"""Save a checkpoint of the model."""
futures = self.worker_group.run_all_workers_single_data(
"save_checkpoint",
weights_path=weights_path,
optimizer_path=optimizer_path,
tokenizer_path=tokenizer_path,
)
# Only pass checkpointing_cfg for DTensor v2
use_v2 = self.cfg.get("dtensor_cfg", {}).get("_v2", False)

if use_v2:
futures = self.worker_group.run_all_workers_single_data(
"save_checkpoint",
weights_path=weights_path,
optimizer_path=optimizer_path,
tokenizer_path=tokenizer_path,
checkpointing_cfg=checkpointing_cfg,
)
else:
if (
checkpointing_cfg is not None
and checkpointing_cfg.get("model_save_format") == "safetensors"
):
raise ValueError(
"safetensors is only supported with DTensorPolicyWorkerV2 (_v2=true)."
)
futures = self.worker_group.run_all_workers_single_data(
"save_checkpoint",
weights_path=weights_path,
optimizer_path=optimizer_path,
tokenizer_path=tokenizer_path,
)
ray.get(futures)

def shutdown(self) -> bool:
Expand Down
Loading
Loading