Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 289 additions & 0 deletions examples/nemo_gym/grpo_nanov3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
# GRPO Algorithm Configuration for training Nano v3
grpo:
  num_prompts_per_step: 128
  num_generations_per_prompt: 16
  num_val_generations_per_prompt: 4
  max_rollout_turns: 1  # for multi-turn rollouts. Math Environments just have 1 turn (answering the question)
  max_num_epochs: 1
  max_num_steps: 1000000
  normalize_rewards: true
  use_leave_one_out_baseline: true
  val_period: 5
  val_at_start: false
  val_at_end: false
  overlong_filtering: true
  max_val_samples: null
  val_batch_size: 256
  seed: 42
  async_grpo:
    enabled: false  # Set to true to enable async training mode
    # Max age (in training steps) for trajectories used in training
    max_trajectory_age_steps: 1

  batch_multiplier: 1
  use_dynamic_sampling: false
  reward_shaping:
    enabled: false
  reward_scaling:
    enabled: false

  seq_logprob_error_threshold: 2

loss_fn:
  reference_policy_kl_penalty: 0
  reference_policy_kl_type: k3
  kl_input_clamp_value: null
  kl_output_clamp_value: null
  ratio_clip_min: 0.2
  ratio_clip_max: 0.28
  ratio_clip_c: null
  # (default off) loss formulation improvements (docs/guides/grpo.md#loss)
  use_on_policy_kl_approximation: true
  use_importance_sampling_correction: true
  sequence_level_importance_ratios: false
  token_level_loss: true
  truncated_importance_sampling_ratio: null

checkpointing:
  enabled: true
  checkpoint_dir: "results/grpo"
  metric_name: "val:total_reward/mean"
  higher_is_better: true
  keep_top_k: 1000000
  save_period: 10
  # quoted deliberately: an unquoted 00:03:40:00 would hit YAML 1.1's
  # sexagesimal-integer parsing on some loaders
  checkpoint_must_save_by: "00:03:40:00"

policy:
  model_name: "/path/to/hf_checkpoint"
  tokenizer:
    name: ${policy.model_name}  # specify if you'd like to use a tokenizer different from the model's default
  train_global_batch_size: 2048
  train_micro_batch_size: 1
  generation_batch_size: 64  # Only used when generating using HF backend
  logprob_batch_size: 1
  max_total_sequence_length: 49152
  precision: "bfloat16"
  logprob_chunk_size: 2048

  dtensor_cfg:
    _v2: true
    enabled: false
    cpu_offload: false
    sequence_parallel: false
    activation_checkpointing: false
    tensor_parallel_size: 1
    context_parallel_size: 1
    custom_parallel_plan: null

  megatron_cfg:
    enabled: true
    empty_unused_memory_level: 1
    activation_checkpointing: true
    bias_activation_fusion: false
    # converter_type: "Qwen2ForCausalLM"
    tensor_model_parallel_size: 2
    expert_tensor_parallel_size: 1
    expert_model_parallel_size: 8
    pipeline_model_parallel_size: 2
    num_layers_in_first_pipeline_stage: null
    num_layers_in_last_pipeline_stage: null
    context_parallel_size: 4
    pipeline_dtype: ${policy.precision}
    sequence_parallel: true
    freeze_moe_router: true
    # moe_router_dtype: "fp64"
    moe_router_dtype: "fp32"
    moe_router_load_balancing_type: "none"  # "seq_aux_loss" causes logprob error divergence for grpo
    moe_router_bias_update_rate: 1.0e-3
    moe_permute_fusion: true
    moe_enable_deepep: false
    moe_token_dispatcher_type: "alltoall"
    moe_aux_loss_coeff: 0.0
    moe_router_enable_expert_bias: true
    # gives ~20% training perf speedup with sequence packing
    apply_rope_fusion: true
    defer_fp32_logits: true
    track_moe_metrics: true
    moe_per_layer_logging: true
    moe_shared_expert_overlap: false

    optimizer:
      optimizer: "adam"
      lr: 3.0e-6
      min_lr: 3.0e-6
      weight_decay: 0.0
      bf16: true
      fp16: false
      params_dtype: "float32"

      # adam
      adam_beta1: 0.9
      adam_beta2: 0.999
      adam_eps: 1.0e-8

      # sgd
      sgd_momentum: 0.9

      clip_grad: ${policy.max_grad_norm}

      # distributed optimizer
      use_distributed_optimizer: true
      use_precision_aware_optimizer: true

      optimizer_cpu_offload: false
      optimizer_offload_fraction: 0

    scheduler:
      start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
      end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
      weight_decay_incr_style: "constant"
      lr_decay_style: "constant"
      lr_decay_iters: null
      lr_warmup_iters: 10
      lr_warmup_init: 3.0e-7

    distributed_data_parallel_config:
      grad_reduce_in_fp32: false
      overlap_grad_reduce: true
      overlap_param_gather: true
      average_in_collective: false
      use_custom_fsdp: false
      data_parallel_sharding_strategy: "optim_grads_params"

    env_vars: null

  # See docs/design-docs/sequence-packing-and-dynamic-batching.md
  # for more details on dynamic batching and sequence packing.
  dynamic_batching:
    enabled: false
    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
    logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}}
    sequence_length_round: 64

  sequence_packing:
    enabled: true
    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
    logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}}
    algorithm: "modified_first_fit_decreasing"
    sequence_length_round: 64

  # makes the training sequence length divisible by the tensor parallel size
  # this is useful for sequence parallel training
  make_sequence_length_divisible_by: ${policy.megatron_cfg.tensor_model_parallel_size}
  max_grad_norm: 1.0

  optimizer: null  # remove default FSDP optimizer
  scheduler: null

  offload_optimizer_for_logprob: false

  generation:
    backend: "vllm"
    max_new_tokens: ${policy.max_total_sequence_length}
    temperature: 1.0
    top_p: 1.0
    top_k: null
    stop_token_ids: null
    stop_strings: null
    vllm_cfg:
      # NB: can re-enable prefix cache on vllm >= 0.11.2.
      # enable_prefix_caching: false
      async_engine: false
      kv_cache_dtype: auto
      precision: ${policy.precision}
      tensor_parallel_size: 4
      pipeline_parallel_size: 1
      expert_parallel_size: 1  # When EP > 1, EP must be a multiple of TP since vLLM's EP = DP * TP
      gpu_memory_utilization: 0.5
      max_model_len: ${policy.max_total_sequence_length}
      # when enforce_eager is False, it is optional to set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy,
      # with the flag, vllm will use the custom CUDA kernels instead of the Triton kernels generated by torch.compile
      # for more details, see convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998
      enforce_eager: false
      use_deep_gemm: false
      num_last_layers_in_bf16: 0
      num_first_layers_in_bf16: 0
      expose_http_server: true
      http_server_serving_chat_kwargs:
        enable_auto_tools: true
        tool_parser: qwen3_coder
        reasoning_parser: deepseek_r1

    # NOTE(review): nested under `generation` (sibling of vllm_cfg) to match the
    # ++policy.generation.vllm_kwargs.* override path referenced in the comments
    # above — confirm against the GenerationConfig schema
    vllm_kwargs:
      mamba_ssm_cache_dtype: "float32"
      compilation_config:
        # when enforce_eager is False, set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy,
        # with the flag, vllm will use the custom CUDA kernels instead of the Triton kernels generated by torch.compile
        # for more details, see convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998
        use_inductor: false
    colocated:
      # true: generation shares training GPUs
      # false: uses dedicated generation resources
      enabled: true
      # only relevant when enabled is false
      resources:
        gpus_per_node: null  # Decides num gpus to be dedicated to generation when there is one node in the cluster i.e cluster.num_nodes == 1
        num_nodes: null  # Decides number of nodes to be dedicated to generation

data:
  max_input_seq_length: null
  shuffle: false
  num_workers: 1

  train:
    data_path: "/path/to/train.jsonl"
  validation:
    data_path: "/path/to/validation.jsonl"
  default:
    dataset_name: NemoGymDataset
    env_name: "nemo_gym"
    prompt_file: null
    system_prompt_file: null
    processor: "nemo_gym_data_processor"

env:
  should_use_nemo_gym: true
  nemo_gym:
    config_paths:
      - responses_api_models/vllm_model/configs/vllm_model_for_training.yaml  # Required! And it must be *for_training
      - resources_servers/math_with_judge/configs/math_with_judge.yaml
      - resources_servers/code_gen/configs/code_gen.yaml
      - resources_servers/workplace_assistant/configs/workplace_assistant.yaml
      - resources_servers/mcqa/configs/mcqa.yaml
      - resources_servers/instruction_following/configs/instruction_following.yaml
      - resources_servers/structured_outputs/configs/structured_outputs_json.yaml
    # Per-config overrides keyed by the server name they apply to.
    # NOTE(review): nesting below judge_model_server / should_use_judge inferred
    # from the override keys — confirm against the nemo_gym server schemas.
    math_with_judge:
      resources_servers:
        math_with_judge:
          judge_model_server:
            name: policy_model
          should_use_judge: false
    code_gen:
      resources_servers:
        code_gen:
          num_processes: 1024
          unit_test_timeout_secs: 10
    debug: false

logger:
  log_dir: "logs"  # Base directory for all logs
  num_val_samples_to_print: 0  # Number of validation samples to pretty print on terminal
  wandb_enabled: false
  tensorboard_enabled: false
  mlflow_enabled: false  # Disable MLflow logging
  monitor_gpus: true  # If true, will monitor GPU usage and log to wandb and/or tensorboard
  swanlab_enabled: false  # Disable SwanLab logging
  wandb:
    project: "grpo-dev"
    name: "grpo-dev-logger"
  tensorboard: {}
  mlflow:
    experiment_name: "grpo-dev"
    run_name: "grpo-dev-logger"
  gpu_monitoring:
    collection_interval: 10  # How often to collect GPU usage metrics (in seconds)
    flush_interval: 10  # How often to flush GPU usage metrics to the loggers (in seconds)

cluster:
  gpus_per_node: 8
  num_nodes: 32
15 changes: 11 additions & 4 deletions nemo_rl/algorithms/grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -1801,7 +1801,6 @@ def grpo_train(
# Get flat advantages and token mask for masked metrics computation
flat_advantages = train_data["advantages"]
flat_token_mask = flat_messages["token_loss_mask"]
del flat_messages

# Filter advantages using token mask (only valid response tokens)
response_advantages = torch.masked_select(
Expand Down Expand Up @@ -1962,21 +1961,29 @@ def grpo_train(
# Log training data
memory_tracker.snapshot_start_of_stage("Logging", dir())
if not _should_log_nemo_gym_responses(master_config):
log_data = {"content": metrics_logging_data["content"]}
log_data = {}
if "agent_ref" in repeated_batch:
log_data["agent_ref"] = repeated_batch["agent_ref"]
log_data["content"] = flat_messages["content"]
log_data["rewards"] = rewards.tolist()
if master_config["grpo"]["use_dynamic_sampling"]:
log_data["filtered_rewards"] = rewards.tolist()
log_data["rewards"] = repeated_batch["total_reward"].tolist()

log_data["input_lengths"] = input_lengths.tolist()
log_data["token_ids"] = train_data["input_ids"].tolist()
log_data["token_loss_mask"] = train_data["token_mask"].tolist()
log_data["sample_loss_mask"] = train_data["sample_mask"].tolist()
log_data["advantages"] = train_data["advantages"].tolist()
log_data["generation_logprobs"] = train_data[
"generation_logprobs"
].tolist()
log_data["prev_logprobs"] = train_data["prev_logprobs"].tolist()
log_data["input_lengths"] = input_lengths.tolist()

logger.log_batched_dict_as_jsonl(
log_data, f"train_data_step{total_steps + 1}.jsonl"
)
del log_data
del flat_messages

timing_metrics: dict[str, float] = timer.get_timing_metrics(
reduction_op="sum"
Expand Down
26 changes: 25 additions & 1 deletion nemo_rl/experience/rollouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,14 @@ async def run_single_sample_with_error_handling(i, sample_state):
return asyncio.run(_async_rollout_implementation())


def _tensorize_by_key(message_logs: list, key: str):
if not message_logs or key not in message_logs[0]:
return

for m in message_logs:
m[key] = torch.tensor(m[key])


@dataclass
class AsyncNemoGymRolloutResult:
input_ids: torch.Tensor
Expand Down Expand Up @@ -1058,6 +1066,15 @@ def run_async_nemo_gym_rollout(
)
)

# Tensorize all token ids
for r in results:
_tensorize_by_key(r["input_message_log"], "token_ids")
_tensorize_by_key(r["message_log"], "token_ids")
_tensorize_by_key(
[m for m in r["message_log"] if m["role"] == "assistant"],
"generation_logprobs",
)

# Prepare for the rollout metrics calculation below. Not strictly necessary here, but good to have parity with `run_async_multi_turn_rollout`
with timer.time(f"{timer_prefix}/prepare_for_metrics_calculation"):
batch_size = len(nemo_gym_rows)
Expand Down Expand Up @@ -1120,8 +1137,10 @@ def run_async_nemo_gym_rollout(
with timer.time(f"{timer_prefix}/per_agent_misc_metrics"):
agent_to_results: dict[str, list[dict]] = defaultdict(list)
for nemo_gym_row, result in zip(nemo_gym_rows, results):
agent_name = nemo_gym_row["agent_ref"]["name"]
agent_ref = nemo_gym_row["agent_ref"]
agent_name = agent_ref["name"]
agent_to_results[agent_name].append(result["full_result"])
result["agent_ref"] = agent_ref

per_agent_metrics = {}
for agent_name, agent_results in agent_to_results.items():
Expand Down Expand Up @@ -1168,6 +1187,7 @@ def run_async_nemo_gym_rollout(

final_batch = BatchedDataDict[DatumSpec](
{
"agent_ref": [r["agent_ref"] for r in results],
"message_log": [r["message_log"] for r in results],
# length is used downstream for mean_prompt_length
"length": torch.tensor(
Expand All @@ -1181,6 +1201,10 @@ def run_async_nemo_gym_rollout(
# stop_strings: NotRequired[list[str]] # Optional stop strings for generation
# Extra information not in the DatumSpec used by the GRPO algorithm
"total_reward": torch.tensor([r["full_result"]["reward"] for r in results]),
# Add truncated field to match other rollout paths (reusing hit_max_tokens logic)
"truncated": torch.tensor(
[m["hit_max_tokens"] for m in all_sample_metrics], dtype=torch.bool
),
}
)

Expand Down
Loading
Loading