Conversation
Signed-off-by: ashors1 <ashors@nvidia.com>
ℹ️ **File Consistency Check** (based on commit a828cd2, PR #1585)

✅ **DTensor Policy Worker Synchronization Check**: both DTensor policy worker files were modified in this PR. Please ensure that the changes are consistent between both files where applicable. This check ensures that related file implementations remain synchronized across the codebase. If you believe this warning is incorrect or the files should intentionally differ, please add a comment explaining the reasoning.
📝 **Walkthrough**: Policy worker files are reorganized into a `workers` subpackage.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks: ❌ Failed checks (1 warning), ✅ Passed checks (3 passed)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py (2)
**572-813: Dummy microbatch handling in `train()` can corrupt metrics and losses**

Inside the microbatch loop, dummy batches created for sequence packing are not cleanly excluded from metric aggregation: `loss_metrics` and `num_valid_samples` are only set inside `if mb_idx < iterator_len:`. For dummy batches (`mb_idx >= iterator_len`), you do `loss *= 0` but still later run:

```python
if num_valid_samples > 0:
    mb_losses.append(loss.item())
    all_mb_metrics.append(loss_metrics)
```

which reuses the previous real batch's `loss_metrics` and `num_valid_samples`, effectively duplicating metrics and adding extra zero-loss entries. This skews `all_mb_metrics`, per-GB loss accumulation, and the final `global_loss`, especially when there are many padding/dummy batches.

Here's a minimal, localized fix that keeps gradients zeroed for dummy batches but cleanly skips their metrics:

```diff
@@
 for mb_idx, mb in enumerate(
     itertools.chain(mb_iterator, dummy_iterator)
 ):
@@
     with torch.autocast(device_type="cuda", dtype=self.dtype):
@@
-    # skip the update for dummy batches
-    if mb_idx < iterator_len:
-        ## scale by the number of global batches so we get the correct
-        ## value when summing metrics across all microbatches
-        for k in loss_metrics.keys():
-            loss_metrics[k] /= num_global_batches
-        num_valid_samples = loss_metrics["num_valid_samples"]
-        loss_metrics["lr"] = self.optimizer.param_groups[0]["lr"]
-        loss_metrics["global_valid_seqs"] = global_valid_seqs.item()
-        loss_metrics["global_valid_toks"] = global_valid_toks.item()
-    else:
-        loss *= 0
-
-    # Backward pass
+    # skip the update for dummy batches
+    is_dummy_mb = mb_idx >= iterator_len
+    if not is_dummy_mb:
+        # scale by the number of global batches so we get the correct
+        # value when summing metrics across all microbatches
+        for k in loss_metrics.keys():
+            loss_metrics[k] /= num_global_batches
+        num_valid_samples = loss_metrics["num_valid_samples"]
+        loss_metrics["lr"] = self.optimizer.param_groups[0]["lr"]
+        loss_metrics["global_valid_seqs"] = global_valid_seqs.item()
+        loss_metrics["global_valid_toks"] = global_valid_toks.item()
+    else:
+        # Ensure dummy microbatches contribute neither gradients nor metrics
+        num_valid_samples = 0
+        loss *= 0
+
+    # Backward pass
@@
-    if num_valid_samples > 0:
+    if (mb_idx < iterator_len) and (num_valid_samples > 0):
         mb_losses.append(loss.item())
         all_mb_metrics.append(loss_metrics)
```

This keeps the original gradient semantics (dummy batches still backpropagate zero) while ensuring metrics and loss aggregation only use real microbatches.
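The duplication mechanism, and the effect of the fix, can be demonstrated with a framework-free sketch. The function below is a hypothetical distillation of the loop, not the worker's actual code; it keeps only the control flow that matters:

```python
def aggregate_microbatch_metrics(real_losses, num_dummy):
    """Simulate the train() microbatch loop: real microbatches produce
    metrics; dummy microbatches (appended for sequence packing) must not."""
    iterator_len = len(real_losses)
    mb_losses, all_mb_metrics = [], []
    num_valid_samples = 0
    loss_metrics = {}
    for mb_idx, loss in enumerate(real_losses + [0.0] * num_dummy):
        if mb_idx < iterator_len:
            loss_metrics = {"loss": loss, "num_valid_samples": 1}
            num_valid_samples = loss_metrics["num_valid_samples"]
        else:
            # The fix: zero out num_valid_samples for dummy batches so the
            # stale metrics from the previous real batch are not reused.
            num_valid_samples = 0
            loss = 0.0
        if num_valid_samples > 0:
            mb_losses.append(loss)
            all_mb_metrics.append(loss_metrics)
    return mb_losses, all_mb_metrics

# With the fix, two dummy batches contribute no losses or metrics:
losses, metrics = aggregate_microbatch_metrics([0.5, 0.7], num_dummy=2)
```

Removing the `num_valid_samples = 0` line reproduces the bug: each dummy batch re-appends the previous real batch's metrics and a zero loss.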
**1276-1289: `score()` drops temperature scaling for non-DTensor logits**

In `score()` you correctly apply temperature scaling:

```python
if not hasattr(outputs, "logits"):
    logits = self.model.lm_head(outputs.last_hidden_state)
else:
    logits = outputs.logits

# Apply temperature scaling
logits = self._apply_temperature_scaling(logits)
```

But immediately after you do:

```python
if isinstance(logits, DTensor):
    logits = logits.to(torch.float32)
else:
    logits = outputs.logits.to(torch.float32)
```

For the non-DTensor case this re-derives `logits` from `outputs.logits`, discarding the temperature scaling that was just applied. It also relies on `outputs` outside the autocast block unnecessarily.

You can fix this by always casting from the already-scaled `logits` tensor (the `isinstance` branches then collapse into one line):

```diff
-if isinstance(logits, DTensor):
-    logits = logits.to(torch.float32)
-else:
-    logits = outputs.logits.to(torch.float32)
+# Always cast from the already temperature-scaled logits
+logits = logits.to(torch.float32)
```

Optionally `del outputs` right after extracting `logits` to free memory earlier.
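The ordering bug is easy to demonstrate without torch. In this framework-free sketch, `Outputs` and `apply_temperature_scaling` are hypothetical stand-ins for the worker's real objects; only the control flow mirrors `score()`:

```python
class Outputs:
    """Stand-in for a model output object carrying raw logits."""
    def __init__(self, logits):
        self.logits = logits

def apply_temperature_scaling(logits, temperature=2.0):
    """Divide each logit by the temperature (stand-in for the real method)."""
    return [x / temperature for x in logits]

def score_buggy(outputs):
    logits = apply_temperature_scaling(outputs.logits)
    # Bug: re-deriving from outputs.logits discards the scaling above.
    return list(outputs.logits)

def score_fixed(outputs):
    logits = apply_temperature_scaling(outputs.logits)
    # Fix: keep operating on the already-scaled values.
    return list(logits)

out = Outputs([4.0, 8.0])
```

With temperature 2.0, the buggy path returns the unscaled `[4.0, 8.0]` while the fixed path returns the scaled `[2.0, 4.0]`.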
🧹 Nitpick comments (3)
tests/unit/environments/test_reward_model_environment.py (1)
**74-81: DTensor worker path selection looks consistent with registry and Policy**

Switching `reward_model_py_executable_class` to the new workers subpackage and gating between V2/non-V2 on `dtensor_cfg["_v2"]` matches the updated registry and `Policy` worker path selection. Consider adding/keeping a test that exercises the `_v2=False` branch as well so both worker classes stay covered.

nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py (1)
**465-476: `init_collective` duplicates BasePolicyWorker logic**

`DTensorPolicyWorkerV2.init_collective()` re-implements exactly the same NCCL communicator setup that already exists in `BasePolicyWorker.init_collective`. To reduce duplication and keep behavior consistent if the base implementation evolves, consider either:

- Removing this override entirely and using the base method, or
- Having this method delegate directly to `super().init_collective(...)` if you want to keep the docstring locally.

nemo_rl/models/policy/workers/base_policy_worker.py (1)
**155-161: Consider adding a return type hint for the context manager.**

The docstring indicates this is a context manager, but the signature lacks a return type hint. Based on the subclass implementations in the relevant snippets, this should return `Generator[None, None, None]`:

```diff
+from typing import Generator
+
 @abstractmethod
-def use_reference_model(self):
+def use_reference_model(self) -> Generator[None, None, None]:
     """Context manager that temporarily swaps the reference model and active model.

     On entry: Moves model to CPU, moves reference_model to CUDA. Swaps the references
     On exit: Restores original references and re-flips cuda/cpu
     """
     ...
```
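For illustration, here is a minimal self-contained sketch of how a concrete subclass can satisfy that annotated signature with `contextlib.contextmanager`. The class names and the string "models" are hypothetical stand-ins for the real model objects:

```python
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Generator

class BaseWorker(ABC):
    @abstractmethod
    def use_reference_model(self) -> Generator[None, None, None]:
        """Context manager that temporarily swaps reference and active models."""
        ...

class Worker(BaseWorker):
    def __init__(self):
        # Strings stand in for the actual policy/reference model objects.
        self.model, self.reference_model = "policy", "reference"

    @contextmanager
    def use_reference_model(self) -> Generator[None, None, None]:
        # On entry: swap references; on exit: restore them.
        self.model, self.reference_model = self.reference_model, self.model
        try:
            yield
        finally:
            self.model, self.reference_model = self.reference_model, self.model

w = Worker()
with w.use_reference_model():
    active = w.model  # the reference model is active inside the context
```

The `try/finally` around `yield` mirrors the "restore on exit" contract from the docstring, even if the body raises.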
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- .github/workflows/_automodel_integration_check.yml (1 hunks)
- docs/fp8.md (1 hunks)
- nemo_rl/distributed/ray_actor_environment_registry.py (1 hunks)
- nemo_rl/models/policy/lm_policy.py (2 hunks)
- nemo_rl/models/policy/workers/base_policy_worker.py (1 hunks)
- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py (3 hunks)
- nemo_rl/models/policy/workers/megatron_policy_worker.py (2 hunks)
- tests/unit/_plugins/remote_select.py (2 hunks)
- tests/unit/environments/test_reward_model_environment.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
**/*.py: Conform code to Python 3.12+
Indent code with 4 spaces. Do not use tabs
Use snake_case for file names
Use PascalCase for class names
Use snake_case for function and method names
Use snake_case for local variables
Prefix variable names that start with a number with 'k' (e.g., k_99th_percentile)
Use upper snake_case with 'G' prefix for global variables (e.g., G_MY_GLOBAL)
Use upper snake_case for constants
Avoid shadowing variables declared in an outer scope
Initialize all externally visible members of a class in the constructor
Prefer docstrings over comments for interfaces that may be used outside a file
Reserve comments for code within a function or interfaces that are local to a file
If a piece of code is commented out, include a comment describing its usage and why it's commented out. Remove debug comments before merging
Use Google style docstrings for classes and functions in Python, which can be parsed by Sphinx
Avoid using reflection when functionality can be easily achieved without reflection
When using try-except blocks, limit the except clause to the smallest set of specific errors possible
When using try-except blocks for duck-typing, keep the body of the try as small as possible and use the else block for logic
YAML is the single source of truth for configuration defaults. Do not set non-None defaults in code for configuration values
For required configuration attributes, access config directly and expect presence (e.g., policy_cfg['precision']) without hidden defaults
Use typing.NotRequired to mark optional attributes in TypedDict for configuration
When adding a new config key to a TypedDict subclass, document the key's purpose, valid values/types, and recommended default, and reflect the default in exemplar YAMLs under examples/configs/*.yaml
Follow the Google Python Style Guide for Python code
Files:
- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py
- nemo_rl/distributed/ray_actor_environment_registry.py
- tests/unit/environments/test_reward_model_environment.py
- nemo_rl/models/policy/workers/base_policy_worker.py
- nemo_rl/models/policy/workers/megatron_policy_worker.py
- tests/unit/_plugins/remote_select.py
- nemo_rl/models/policy/lm_policy.py
nemo_rl/**/*.py
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
For any source file under nemo_rl/*.py that defines a class or function decorated with @ray.remote, add a coverage pragma (# pragma: no cover) because these run in separate Ray processes
Files:
- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py
- nemo_rl/distributed/ray_actor_environment_registry.py
- nemo_rl/models/policy/workers/base_policy_worker.py
- nemo_rl/models/policy/workers/megatron_policy_worker.py
- nemo_rl/models/policy/lm_policy.py
!(**/tests/**|**/test_*.py|**/test_*.sh)
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
Add the NVIDIA copyright header to all Python files and shell scripts (excluding tests). The header should include the current year
Files:
- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py
- nemo_rl/distributed/ray_actor_environment_registry.py
- .github/workflows/_automodel_integration_check.yml
- tests/unit/environments/test_reward_model_environment.py
- docs/fp8.md
- nemo_rl/models/policy/workers/base_policy_worker.py
- nemo_rl/models/policy/workers/megatron_policy_worker.py
- tests/unit/_plugins/remote_select.py
- nemo_rl/models/policy/lm_policy.py
**/*.{py,sh}
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
The NVIDIA copyright header should appear at the top of all Python files and shell scripts (excluding tests)
Files:
- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py
- nemo_rl/distributed/ray_actor_environment_registry.py
- tests/unit/environments/test_reward_model_environment.py
- nemo_rl/models/policy/workers/base_policy_worker.py
- nemo_rl/models/policy/workers/megatron_policy_worker.py
- tests/unit/_plugins/remote_select.py
- nemo_rl/models/policy/lm_policy.py
docs/**/*.md
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
Update docs/index.md when a new markdown doc is added under docs/**/*.md or a markdown file is renamed, ensuring the document appears in the most appropriate section
Files:
docs/fp8.md
🧬 Code graph analysis (3)
nemo_rl/distributed/ray_actor_environment_registry.py (1)

- nemo_rl/distributed/virtual_cluster.py (1)
  - `PY_EXECUTABLES` (43-59)

nemo_rl/models/policy/workers/base_policy_worker.py (5)

- nemo_rl/distributed/batched_data_dict.py (1)
  - `BatchedDataDict` (75-860)
- nemo_rl/models/policy/interfaces.py (2)
  - `LogprobOutputSpec` (25-28)
  - `ReferenceLogprobOutputSpec` (31-34)
- nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py (3)
  - `init_collective` (465-475)
  - `use_reference_model` (1581-1607)
  - `train` (489-865)
- nemo_rl/utils/nvml.py (1)
  - `get_device_uuid` (55-77)
- nemo_rl/models/policy/workers/megatron_policy_worker.py (2)
  - `use_reference_model` (1358-1406)
  - `train` (900-1150)

nemo_rl/models/policy/workers/megatron_policy_worker.py (1)

- nemo_rl/models/policy/workers/base_policy_worker.py (1)
  - `BasePolicyWorker` (15-218)
🪛 GitHub Actions: Copyright check
nemo_rl/models/policy/workers/base_policy_worker.py
[error] 1-1: Copyright check failed: Found files with missing copyright notices.
🪛 Ruff (0.14.7)
nemo_rl/models/policy/workers/base_policy_worker.py
19-19: Unused method argument: train_world_size
(ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Lint check
- GitHub Check: Post submodule check comment / Comment on PR
🔇 Additional comments (12)
docs/fp8.md (1)
**73-80: Updated error trace path matches new Megatron worker module**

The FP8 training error example now points at `nemo_rl.models.policy.workers.megatron_policy_worker.MegatronPolicyWorker`, which matches the new workers subpackage layout and class name. No further changes needed.

tests/unit/_plugins/remote_select.py (1)
**63-74: Remote-select example updated to new workers layout**

The example nodeid and mapped file path now reference `models/policy/workers/dtensor_policy_worker.py`, which aligns with the new directory structure. No behavioral impact.

.github/workflows/_automodel_integration_check.yml (1)
**133-139: Automodel consistency check paths correctly point to workers subpackage**

The DTensor worker file variables now reference `nemo_rl/models/policy/workers/dtensor_policy_worker*.py`, matching the code move. The synchronization logic and messages remain valid.

nemo_rl/distributed/ray_actor_environment_registry.py (1)
**27-46: Actor registry updates align with new worker locations and backends**

The registry now maps:

- `workers.dtensor_policy_worker.DTensorPolicyWorker` → `VLLM_EXECUTABLE` (as per existing vLLM coupling comment)
- `workers.dtensor_policy_worker_v2.DTensorPolicyWorkerV2` → `PY_EXECUTABLES.AUTOMODEL`
- `workers.megatron_policy_worker.MegatronPolicyWorker` → `MCORE_EXECUTABLE`

These FQNs match the new module structure and the corresponding backends. The reward-model environment test and `Policy` also use the same strings.

nemo_rl/models/policy/workers/dtensor_policy_worker_v2.py (1)
**88-101: BasePolicyWorker integration for DTensorPolicyWorkerV2 looks structurally sound**

Switching `DTensorPolicyWorkerV2` to subclass `BasePolicyWorker` reuses shared logic for ZMQ setup, GPU info, free-memory queries, reference-policy logprobs, and profiling. The `@ray.remote(...)  # pragma: no cover` annotation also matches the coverage guideline for Ray actors under `nemo_rl/`.

nemo_rl/models/policy/lm_policy.py (1)
**82-125: Worker class paths and `_v2` switch match new workers layout**

The `Policy` constructor now selects:

- Megatron: `"nemo_rl.models.policy.workers.megatron_policy_worker.MegatronPolicyWorker"`
- DTensor v2: `"nemo_rl.models.policy.workers.dtensor_policy_worker_v2.DTensorPolicyWorkerV2"` when `dtensor_cfg["_v2"]` is true
- DTensor v1: `"nemo_rl.models.policy.workers.dtensor_policy_worker.DTensorPolicyWorker"` otherwise

These strings are consistent with the new module locations and the updated actor registry and tests.
If `_v2` and any new `dtensor_cfg` keys (e.g., those used in `DTensorPolicyWorkerV2` for cache clearing) were just introduced, ensure their TypedDict definitions and example YAML configs are updated to document purpose and valid values.

nemo_rl/models/policy/workers/megatron_policy_worker.py (1)
**120-127: MegatronPolicyWorker cleanly migrates to BasePolicyWorker**

Importing and subclassing `BasePolicyWorker` centralizes:

- collective initialization (`init_collective` and `model_update_group`),
- basic liveness/memory/profiling utilities,
- ZMQ socket setup and `get_free_memory_bytes`, and
- reference-policy logprobs via `get_reference_policy_logprobs` + `use_reference_model`.

Within this worker you already set `self.rank` before any potential `init_collective` use and rely on `maybe_init_zmq()`/`model_update_group` only in methods that are now implemented in the base, so the refactor looks consistent with the Policy- and registry-side changes.

nemo_rl/models/policy/workers/base_policy_worker.py (5)
**18-36: Unused `train_world_size` parameter.**

The `train_world_size` parameter is declared but never used in the method body. The docstring mentions it's "used in inference cluster" but that usage isn't implemented here.

If this parameter is reserved for future use or API consistency with subclasses, consider adding a comment or using `_ = train_world_size` to explicitly acknowledge it's intentionally unused. Otherwise, remove it.
**38-49: LGTM with a note on implicit attributes.**

These utility methods are straightforward. Note that `get_gpu_info` assumes `self.model` exists, which is an implicit contract subclasses must fulfill. This is acceptable for an ABC but could benefit from a class-level docstring documenting required attributes.
**67-79: LGTM - ZMQ initialization is reasonable.**

The lazy initialization pattern and timeout/linger settings are appropriate. Note that a REQ socket with `bind()` is an unusual pattern (typically REQ connects and REP binds), but this may be intentional for your IPC architecture.
**88-93: LGTM!**

The shutdown method correctly cleans up ZMQ resources with proper existence checks.
**129-218: LGTM - Abstract interface is well-defined.**

The abstract methods provide a clean contract for policy worker implementations. The signatures align with the existing subclass implementations shown in the relevant code snippets.
jgerh left a comment:
Completed tech pubs review and provided a few copyedits.
Signed-off-by: ashors1 <ashors@nvidia.com> Signed-off-by: yuanhangs <yuanhangs@nvidia.com>
What does this PR do?

Adds a `BasePolicyWorker` class to enforce common APIs between the DTensor and Megatron backends.

Issues
List issues that this PR closes (syntax):
Usage
```python
# Add a code snippet demonstrating how to use this
```

Before your PR is "Ready for review"
Pre checks:
Additional Information
Summary by CodeRabbit
Release Notes