Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions nemo_rl/algorithms/grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -1471,25 +1471,27 @@ def async_grpo_train(
train_results = policy.train(train_data, loss_fn)

print("🔄 Synchronizing policy weights to trajectory collector…")
with timer.time("weight_sync"):
if NEED_REFIT:
# Coordinate with trajectory collector before refit
print(
"🔄 Coordinating with trajectory collector before refit..."
)
if NEED_REFIT:
# Measure pending-generation wait as exposed_generation time
print(
"🔄 Coordinating with trajectory collector before refit..."
)
with timer.time("exposed_generation"):
ray.get(trajectory_collector.prepare_for_refit.remote())

print("🔄 Performing policy generation refit...")
# Only the actual refit/weight transfer should be counted as weight_sync
print("🔄 Performing policy generation refit...")
with timer.time("weight_sync"):
refit_policy_generation(
policy, policy_generation, colocated_inference
)
POLICY_GENERATION_STALE = False

trajectory_collector.resume_after_refit.remote()
trajectory_collector.resume_after_refit.remote()

# Notify collector about the new weight version (post-update)
weight_version += 1
trajectory_collector.set_weight_version.remote(weight_version)
# Notify collector about the new weight version (post-update)
weight_version += 1
trajectory_collector.set_weight_version.remote(weight_version)

# Validation
val_metrics, validation_timings = None, None
Expand Down
Loading