Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 295 additions & 0 deletions .github/workflows/_automodel_integration_check.yml

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions .github/workflows/_submodule_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ jobs:
target_commit=$(git ls-tree origin/${{ inputs.base_ref }} "$submodule_path" | awk '{print $3}')

if [[ -z "$target_commit" ]]; then
echo "❌ Could not find $submodule_name in ${{ inputs.base_ref }} branch"
failed=1
echo "✅ $submodule_name: New submodule being added (not present in ${{ inputs.base_ref }} branch)"
changed=1
success_body+="$submodule_name: ✅ New submodule being added"$'\n'
continue
fi

Expand Down
11 changes: 11 additions & 0 deletions .github/workflows/cicd-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ jobs:
pr_number: ${{ github.event.number }}
head_sha: ${{ github.event.pull_request.head.sha }}

automodel-integration-check:
name: Check if changes in nemo-automodel are in sync with nemo-rl and vice versa
needs: [pre-flight]
if: github.event_name == 'pull_request'
uses: ./.github/workflows/_automodel_integration_check.yml
with:
base_ref: ${{ github.base_ref }}
head_ref: ${{ github.head_ref }}
pr_number: ${{ github.event.number }}
head_sha: ${{ github.event.pull_request.head.sha }}

lint-check:
name: Lint check
needs: [pre-flight]
Expand Down
5 changes: 5 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@
url = https://github.com/terrykong/Megatron-LM.git
branch = sahilj/megatron-external-loss-norm
shallow = true
[submodule "3rdparty/Automodel-workspace/Automodel"]
path = 3rdparty/Automodel-workspace/Automodel
url = https://github.com/NVIDIA-NeMo/Automodel.git
branch = nemo-rl-submodule
shallow = true
1 change: 1 addition & 0 deletions 3rdparty/Automodel-workspace/Automodel
Submodule Automodel added at 09f872
1 change: 1 addition & 0 deletions examples/configs/grpo_math_1B.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ policy:
logprob_chunk_size: null

dtensor_cfg:
_v2: true
enabled: true
cpu_offload: False
sequence_parallel: false
Expand Down
4 changes: 2 additions & 2 deletions nemo_rl/algorithms/loss_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
masked_mean,
)
from nemo_rl.distributed.batched_data_dict import BatchedDataDict
from nemo_rl.distributed.model_utils import from_parallel_logits_to_logprobs
from nemo_rl.models.dtensor.parallelize import (
from nemo_rl.distributed.model_utils import (
from_parallel_logits_to_logprobs,
get_logprobs_from_vocab_parallel_logits,
)

Expand Down
50 changes: 50 additions & 0 deletions nemo_rl/distributed/model_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,53 @@ def backward(ctx, grad_output):
)

return grad_input, None, None # , None


def get_logprobs_from_vocab_parallel_logits(
    vocab_parallel_logits: DTensor,
    input_ids: torch.Tensor | DTensor,
    seq_index: Optional[torch.Tensor] = None,
    chunk_size: Optional[int] = None,
) -> torch.Tensor:
    """Computes log probabilities from vocabulary-parallel logits.

    This function takes logits that are sharded across the vocabulary dimension (tensor parallel)
    and computes the log probabilities for the given input IDs.

    Args:
        vocab_parallel_logits (DTensor): Logits distributed across tensor parallel workers,
            with shape [batch_size, seq_len, vocab_size/tp_size].
        input_ids (torch.Tensor | DTensor): Input token IDs for which to compute log probabilities,
            with shape [batch_size, seq_len].
        seq_index (Optional[torch.Tensor]): Sequence index for the input IDs,
            with shape [sequence_length]. Only valid when the logits are context-parallel
            ("cp") sharded.
        chunk_size (Optional[int]): Sequence dimension chunk size for computing log probabilities.

    Returns:
        torch.Tensor: Log probabilities for the given input IDs.
    """
    device_mesh = vocab_parallel_logits.device_mesh
    if seq_index is not None:
        # seq_index only makes sense when the sequence dim is context-parallel sharded;
        # guard against passing it for a mesh without a "cp" dimension.
        assert (
            device_mesh.mesh_dim_names is not None
            and "cp" in device_mesh.mesh_dim_names
        ), "seq_index was provided but the device mesh has no 'cp' dimension"

    # Derive this rank's vocab shard boundaries from the tensor-parallel group.
    # (Removed dead `tp_size = 1` initializer — it was unconditionally overwritten.)
    tp_group = device_mesh.get_group("tp")
    tp_rank = tp_group.rank()
    tp_size = tp_group.size()

    # Each TP rank holds a contiguous [start, end) slice of the vocabulary.
    vocab_interval_per_rank = vocab_parallel_logits.shape[-1] // tp_size

    return dtensor_from_parallel_logits_to_logprobs(
        vocab_parallel_logits.to_local(),
        input_ids,
        vocab_interval_per_rank * tp_rank,
        (tp_rank + 1) * vocab_interval_per_rank,
        tp_group,
        # Skip building the autograd graph when called under torch.no_grad().
        inference_only=not torch.is_grad_enabled(),
        seq_index=seq_index,
        chunk_size=chunk_size,
    )
1 change: 1 addition & 0 deletions nemo_rl/distributed/ray_actor_environment_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# Temporary workaround for the coupled implementation of DTensorPolicyWorker and vLLM.
# This will be reverted to PY_EXECUTABLES.BASE once https://github.com/NVIDIA-NeMo/RL/issues/501 is resolved.
"nemo_rl.models.policy.dtensor_policy_worker.DTensorPolicyWorker": VLLM_EXECUTABLE,
"nemo_rl.models.policy.dtensor_policy_worker_v2.DTensorPolicyWorkerV2": PY_EXECUTABLES.AUTOMODEL,
"nemo_rl.models.policy.megatron_policy_worker.MegatronPolicyWorker": MCORE_EXECUTABLE,
"nemo_rl.environments.math_environment.MathEnvironment": PY_EXECUTABLES.SYSTEM,
"nemo_rl.environments.vlm_environment.VLMEnvironment": PY_EXECUTABLES.SYSTEM,
Expand Down
6 changes: 3 additions & 3 deletions nemo_rl/distributed/virtual_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ class PY_EXECUTABLES:
# Use NeMo-RL direct dependencies.
BASE = "uv run --locked"

# Use NeMo-RL direct dependencies.
AUTOMODEL = "uv run --locked --extra automodel"

# Use NeMo-RL direct dependencies and vllm.
VLLM = "uv run --locked --extra vllm"

# Use NeMo-RL direct dependencies and nemo-automodel.
AUTOMODEL = "uv run --locked --extra automodel"

# Megatron-core (and nemo dependencies)
# We always run with --reinstall to avoid issues where someone runs "uv run ... --extra mcore ..."
# but the submodules are not downloaded yet. This results in errors where it appears Megatron/Nemo
Expand Down
51 changes: 0 additions & 51 deletions nemo_rl/models/dtensor/parallelize.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
from transformers.models.qwen3.modeling_qwen3 import Qwen3ForCausalLM
from transformers.models.smolvlm.modeling_smolvlm import SmolVLMForConditionalGeneration

from nemo_rl.distributed.model_utils import dtensor_from_parallel_logits_to_logprobs
from nemo_rl.models.policy.utils import import_class_from_path


Expand Down Expand Up @@ -805,53 +804,3 @@ def get_grad_norm(
total_norm = total_norm.item() ** (1.0 / norm_type) # type: ignore

return total_norm


def get_logprobs_from_vocab_parallel_logits(
    vocab_parallel_logits: DTensor,
    input_ids: torch.Tensor | DTensor,
    seq_index: Optional[torch.Tensor] = None,
    chunk_size: Optional[int] = None,
):
    """Compute log probabilities from vocabulary-parallel (TP-sharded) logits.

    The logits are sharded over the vocabulary dimension across tensor-parallel
    workers; this resolves each rank's vocab slice and delegates to
    ``dtensor_from_parallel_logits_to_logprobs``.

    Args:
        vocab_parallel_logits (DTensor): TP-sharded logits,
            shape [batch_size, seq_len, vocab_size/tp_size].
        input_ids (torch.Tensor | DTensor): Token IDs to score,
            shape [batch_size, seq_len].
        seq_index (Optional[torch.Tensor]): Sequence index, shape [sequence_length];
            requires a context-parallel ("cp") mesh dimension.
        chunk_size (Optional[int]): Chunk size along the sequence dimension.

    Returns:
        torch.Tensor: Log probabilities for the given input IDs.
    """
    mesh = vocab_parallel_logits.device_mesh
    if seq_index is not None:
        names = mesh.mesh_dim_names
        assert (
            names is not None and "cp" in names
        ), "seq_index must be provided for cp sharded logits"

    group = mesh.get_group("tp")
    rank = group.rank()
    world = group.size()

    # Contiguous vocab slice [start, end) owned by this TP rank.
    shard_width = vocab_parallel_logits.shape[-1] // world
    start = shard_width * rank
    end = start + shard_width

    return dtensor_from_parallel_logits_to_logprobs(
        vocab_parallel_logits.to_local(),
        input_ids,
        start,
        end,
        group,
        inference_only=not torch.is_grad_enabled(),
        seq_index=seq_index,
        chunk_size=chunk_size,
    )
1 change: 1 addition & 0 deletions nemo_rl/models/policy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

class DTensorConfig(TypedDict):
enabled: bool
_v2: NotRequired[bool]
cpu_offload: NotRequired[bool]
sequence_parallel: NotRequired[bool]
activation_checkpointing: NotRequired[bool]
Expand Down
10 changes: 9 additions & 1 deletion nemo_rl/models/policy/dtensor_policy_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@
from nemo_rl.algorithms.interfaces import LossFunction, LossType
from nemo_rl.algorithms.loss_functions import SequencePackingLossWrapper
from nemo_rl.distributed.batched_data_dict import BatchedDataDict
from nemo_rl.distributed.model_utils import get_logprobs_from_vocab_parallel_logits
from nemo_rl.models.dtensor.parallelize import (
_parallelize_model,
clip_grad_by_total_norm_,
get_grad_norm,
get_logprobs_from_vocab_parallel_logits,
to_local_if_dtensor,
)
from nemo_rl.models.huggingface.common import (
Expand Down Expand Up @@ -1242,6 +1242,14 @@ def _add_noise_to_weights(self) -> None:
def return_state_dict(self):
    """Return the wrapped model's state dict (delegates to ``self.model``)."""
    model_state = self.model.state_dict()
    return model_state

def return_model_config(self) -> dict[str, Any]:
    """Return the model configuration as a dictionary.

    NOTE(review): the body returns ``self.model.config`` as-is, which for a
    HuggingFace model is a config object rather than a plain ``dict`` — the
    annotation/docstring and the return value disagree. Consider
    ``self.model.config.to_dict()`` if callers expect a real dict; verify
    how remote callers consume this before changing it.

    Returns:
        dict: Model configuration dictionary
    """
    return self.model.config

def report_device_id(self) -> str:
"""Report the UUID of the current CUDA device using NVML.

Expand Down
Loading
Loading