diff --git a/examples/nemo_gym/grpo_nanov3.yaml b/examples/nemo_gym/grpo_nanov3.yaml
new file mode 100644
index 0000000000..82c3f089dc
--- /dev/null
+++ b/examples/nemo_gym/grpo_nanov3.yaml
@@ -0,0 +1,289 @@
+# GRPO Algorithm Configuration for training Nano v3
+grpo:
+  num_prompts_per_step: 128
+  num_generations_per_prompt: 16
+  num_val_generations_per_prompt: 4
+  max_rollout_turns: 1 # for multi-turn rollouts; math environments have just 1 turn (answering the question)
+  max_num_epochs: 1
+  max_num_steps: 1000000
+  normalize_rewards: true
+  use_leave_one_out_baseline: true
+  val_period: 5
+  val_at_start: False
+  val_at_end: False
+  overlong_filtering: true
+  max_val_samples: null
+  val_batch_size: 256
+  seed: 42
+  async_grpo:
+    enabled: false # Set to true to enable async training mode
+    # Max age (in training steps) for trajectories used in training
+    max_trajectory_age_steps: 1
+
+  batch_multiplier: 1
+  use_dynamic_sampling: False
+  reward_shaping:
+    enabled: False
+  reward_scaling:
+    enabled: False
+
+  seq_logprob_error_threshold: 2
+
+loss_fn:
+  reference_policy_kl_penalty: 0
+  reference_policy_kl_type: k3
+  kl_input_clamp_value: null
+  kl_output_clamp_value: null
+  ratio_clip_min: 0.2
+  ratio_clip_max: 0.28
+  ratio_clip_c: null
+  # (default off) loss formulation improvements (docs/guides/grpo.md#loss)
+  use_on_policy_kl_approximation: True
+  use_importance_sampling_correction: True
+  sequence_level_importance_ratios: False
+  token_level_loss: True
+  truncated_importance_sampling_ratio: null
+
+checkpointing:
+  enabled: true
+  checkpoint_dir: "results/grpo"
+  metric_name: "val:total_reward/mean"
+  higher_is_better: true
+  keep_top_k: 1000000
+  save_period: 10
+  checkpoint_must_save_by: "00:03:40:00"
+
+policy:
+  model_name: "/path/to/hf_checkpoint"
+  tokenizer:
+    name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default
+  train_global_batch_size: 2048
+  train_micro_batch_size: 1
+  generation_batch_size: 64 # Only used when generating with the HF backend
+  logprob_batch_size: 1
+  max_total_sequence_length: 49152
+  precision: "bfloat16"
+  logprob_chunk_size: 2048
+
+  dtensor_cfg:
+    _v2: true
+    enabled: false
+    cpu_offload: False
+    sequence_parallel: false
+    activation_checkpointing: false
+    tensor_parallel_size: 1
+    context_parallel_size: 1
+    custom_parallel_plan: null
+
+  megatron_cfg:
+    enabled: true
+    empty_unused_memory_level: 1
+    activation_checkpointing: true
+    bias_activation_fusion: False
+    # converter_type: "Qwen2ForCausalLM"
+    tensor_model_parallel_size: 2
+    expert_tensor_parallel_size: 1
+    expert_model_parallel_size: 8
+    pipeline_model_parallel_size: 2
+    num_layers_in_first_pipeline_stage: null
+    num_layers_in_last_pipeline_stage: null
+    context_parallel_size: 4
+    pipeline_dtype: ${policy.precision}
+    sequence_parallel: true
+    freeze_moe_router: true
+    # moe_router_dtype: "fp64"
+    moe_router_dtype: "fp32"
+    moe_router_load_balancing_type: "none" # "seq_aux_loss" causes logprob error divergence for grpo
+    moe_router_bias_update_rate: 1e-3
+    moe_permute_fusion: true
+    moe_enable_deepep: false
+    moe_token_dispatcher_type: "alltoall"
+    moe_aux_loss_coeff: 0.0
+    moe_router_enable_expert_bias: true
+    # gives ~20% training perf speedup with sequence packing
+    apply_rope_fusion: True
+    defer_fp32_logits: True
+    track_moe_metrics: True
+    moe_per_layer_logging: True
+    moe_shared_expert_overlap: false
+
+    optimizer:
+      optimizer: "adam"
+      lr: 3e-6
+      min_lr: 3e-6
+      weight_decay: 0.0
+      bf16: true
+      fp16: false
+      params_dtype: "float32"
+
+      # Adam
+      adam_beta1: 0.9
+      adam_beta2: 0.999
+      adam_eps: 1e-8
+
+      # SGD
+      sgd_momentum: 0.9
+
+      clip_grad: ${policy.max_grad_norm}
+
+      # Distributed optimizer
+      use_distributed_optimizer: true
+      use_precision_aware_optimizer: true
+
+      optimizer_cpu_offload: False
+      optimizer_offload_fraction: 0
+
+    scheduler:
+      start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
+      end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay}
+      weight_decay_incr_style: "constant"
+      lr_decay_style: "constant"
+      lr_decay_iters: null
+      lr_warmup_iters: 10
+      lr_warmup_init: 3.0e-7
+
+    distributed_data_parallel_config:
+      grad_reduce_in_fp32: false
+      overlap_grad_reduce: true
+      overlap_param_gather: true
+      average_in_collective: false
+      use_custom_fsdp: false
+      data_parallel_sharding_strategy: "optim_grads_params"
+
+    env_vars: null
+
+  # See docs/design-docs/sequence-packing-and-dynamic-batching.md
+  # for more details on dynamic batching and sequence packing.
+  dynamic_batching:
+    enabled: False
+    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
+    logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}}
+    sequence_length_round: 64
+
+  sequence_packing:
+    enabled: True
+    train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}}
+    logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}}
+    algorithm: "modified_first_fit_decreasing"
+    sequence_length_round: 64
+
+  # makes the training sequence length divisible by the tensor parallel size;
+  # this is useful for sequence parallel training
+  make_sequence_length_divisible_by: ${policy.megatron_cfg.tensor_model_parallel_size}
+  max_grad_norm: 1.0
+
+  optimizer: null # remove default FSDP optimizer
+  scheduler: null
+
+  offload_optimizer_for_logprob: False
+
+  generation:
+    backend: "vllm"
+    max_new_tokens: ${policy.max_total_sequence_length}
+    temperature: 1.0
+    top_p: 1.0
+    top_k: null
+    stop_token_ids: null
+    stop_strings: null
+    vllm_cfg:
+      # NB: prefix caching can be re-enabled on vllm >= 0.11.2.
+      # enable_prefix_caching: false
+      async_engine: false
+      kv_cache_dtype: auto
+      precision: ${policy.precision}
+      tensor_parallel_size: 4
+      pipeline_parallel_size: 1
+      expert_parallel_size: 1 # When EP > 1, EP must be a multiple of TP since vLLM's EP = DP * TP
+      gpu_memory_utilization: 0.5
+      max_model_len: ${policy.max_total_sequence_length}
+      # When enforce_eager is False, you can optionally set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy.
+      # With that flag, vLLM uses its custom CUDA kernels instead of the Triton kernels generated by torch.compile.
+      # For more details, see the convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998
+      enforce_eager: False
+      use_deep_gemm: False
+      num_last_layers_in_bf16: 0
+      num_first_layers_in_bf16: 0
+      expose_http_server: true
+      http_server_serving_chat_kwargs:
+        enable_auto_tools: true
+        tool_parser: qwen3_coder
+        reasoning_parser: deepseek_r1
+
+    vllm_kwargs:
+      mamba_ssm_cache_dtype: "float32"
+      compilation_config:
+        # When enforce_eager is False, set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy.
+        # With that flag, vLLM uses its custom CUDA kernels instead of the Triton kernels generated by torch.compile.
+        # For more details, see the convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998
+        use_inductor: False
+    colocated:
+      # true: generation shares training GPUs
+      # false: uses dedicated generation resources
+      enabled: true
+      # only relevant when enabled is false
+      resources:
+        gpus_per_node: null # Number of GPUs dedicated to generation when the cluster has a single node, i.e. cluster.num_nodes == 1
+        num_nodes: null # Number of nodes dedicated to generation
+
+data:
+  max_input_seq_length: null
+  shuffle: False
+  num_workers: 1
+
+  train:
+    data_path: "/path/to/train.jsonl"
+  validation:
+    data_path: "/path/to/validation.jsonl"
+  default:
+    dataset_name: NemoGymDataset
+    env_name: "nemo_gym"
+    prompt_file: null
+    system_prompt_file: null
+    processor: "nemo_gym_data_processor"
+
+env:
+  should_use_nemo_gym: true
+  nemo_gym:
+    config_paths:
+      - responses_api_models/vllm_model/configs/vllm_model_for_training.yaml # Required! And it must be the *for_training variant.
+      - resources_servers/math_with_judge/configs/math_with_judge.yaml
+      - resources_servers/code_gen/configs/code_gen.yaml
+      - resources_servers/workplace_assistant/configs/workplace_assistant.yaml
+      - resources_servers/mcqa/configs/mcqa.yaml
+      - resources_servers/instruction_following/configs/instruction_following.yaml
+      - resources_servers/structured_outputs/configs/structured_outputs_json.yaml
+    math_with_judge:
+      resources_servers:
+        math_with_judge:
+          judge_model_server:
+            name: policy_model
+          should_use_judge: false
+    code_gen:
+      resources_servers:
+        code_gen:
+          num_processes: 1024
+          unit_test_timeout_secs: 10
+          debug: false
+
+logger:
+  log_dir: "logs" # Base directory for all logs
+  num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal
+  wandb_enabled: false
+  tensorboard_enabled: false
+  mlflow_enabled: false # Disable MLflow logging
+  monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard
+  swanlab_enabled: false # Disable SwanLab logging
+  wandb:
+    project: "grpo-dev"
+    name: "grpo-dev-logger"
+  tensorboard: {}
+  mlflow:
+    experiment_name: "grpo-dev"
+    run_name: "grpo-dev-logger"
+  gpu_monitoring:
+    collection_interval: 10 # How often to collect GPU usage metrics (in seconds)
+    flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds)
+
+cluster:
+  gpus_per_node: 8
+  num_nodes: 32
diff --git a/nemo_rl/algorithms/grpo.py b/nemo_rl/algorithms/grpo.py
index d349500516..6772739655 100644
--- a/nemo_rl/algorithms/grpo.py
+++ b/nemo_rl/algorithms/grpo.py
@@ -1801,7 +1801,6 @@ def grpo_train(
             # Get flat advantages and token mask for masked metrics computation
             flat_advantages = train_data["advantages"]
             flat_token_mask = flat_messages["token_loss_mask"]
-            del flat_messages
 
             # Filter advantages using token mask (only valid response tokens)
             response_advantages = torch.masked_select(
@@ -1962,21 +1961,29 @@ def grpo_train(
             # Log training data
             memory_tracker.snapshot_start_of_stage("Logging", dir())
             if not _should_log_nemo_gym_responses(master_config):
-                log_data = {"content": metrics_logging_data["content"]}
+                log_data = {}
+                if "agent_ref" in repeated_batch:
+                    log_data["agent_ref"] = repeated_batch["agent_ref"]
+                log_data["content"] = flat_messages["content"]
                 log_data["rewards"] = rewards.tolist()
                 if master_config["grpo"]["use_dynamic_sampling"]:
                     log_data["filtered_rewards"] = rewards.tolist()
                     log_data["rewards"] = repeated_batch["total_reward"].tolist()
-
+                log_data["input_lengths"] = input_lengths.tolist()
+                log_data["token_ids"] = train_data["input_ids"].tolist()
+                log_data["token_loss_mask"] = train_data["token_mask"].tolist()
+                log_data["sample_loss_mask"] = train_data["sample_mask"].tolist()
+                log_data["advantages"] = train_data["advantages"].tolist()
                 log_data["generation_logprobs"] = train_data[
                     "generation_logprobs"
                 ].tolist()
                 log_data["prev_logprobs"] = train_data["prev_logprobs"].tolist()
-                log_data["input_lengths"] = input_lengths.tolist()
+
                 logger.log_batched_dict_as_jsonl(
                     log_data, f"train_data_step{total_steps + 1}.jsonl"
                 )
                 del log_data
+            del flat_messages
 
             timing_metrics: dict[str, float] = timer.get_timing_metrics(
                 reduction_op="sum"
diff --git a/nemo_rl/experience/rollouts.py b/nemo_rl/experience/rollouts.py
index 231820fa10..603a972095 100644
--- a/nemo_rl/experience/rollouts.py
+++ b/nemo_rl/experience/rollouts.py
@@ -979,6 +979,14 @@ async def run_single_sample_with_error_handling(i, sample_state):
 
     return asyncio.run(_async_rollout_implementation())
 
+def _tensorize_by_key(message_logs: list, key: str):
+    if not message_logs or key not in message_logs[0]:
+        return
+
+    for m in message_logs:
+        m[key] = torch.tensor(m[key])
+
+
 @dataclass
 class AsyncNemoGymRolloutResult:
     input_ids: torch.Tensor
@@ -1058,6 +1066,15 @@ def run_async_nemo_gym_rollout(
         )
     )
 
+    # Tensorize all token ids
+    for r in results:
+        _tensorize_by_key(r["input_message_log"], "token_ids")
+        _tensorize_by_key(r["message_log"], "token_ids")
+        _tensorize_by_key(
+            [m for m in r["message_log"] if m["role"] == "assistant"],
+            "generation_logprobs",
+        )
+
     # Prepare for the rollout metrics calculation below. Not strictly necessary here, but good to have parity with `run_async_multi_turn_rollout`
     with timer.time(f"{timer_prefix}/prepare_for_metrics_calculation"):
         batch_size = len(nemo_gym_rows)
@@ -1120,8 +1137,10 @@ def run_async_nemo_gym_rollout(
     with timer.time(f"{timer_prefix}/per_agent_misc_metrics"):
         agent_to_results: dict[str, list[dict]] = defaultdict(list)
         for nemo_gym_row, result in zip(nemo_gym_rows, results):
-            agent_name = nemo_gym_row["agent_ref"]["name"]
+            agent_ref = nemo_gym_row["agent_ref"]
+            agent_name = agent_ref["name"]
             agent_to_results[agent_name].append(result["full_result"])
+            result["agent_ref"] = agent_ref
 
         per_agent_metrics = {}
         for agent_name, agent_results in agent_to_results.items():
@@ -1168,6 +1187,7 @@ def run_async_nemo_gym_rollout(
 
     final_batch = BatchedDataDict[DatumSpec](
         {
+            "agent_ref": [r["agent_ref"] for r in results],
            "message_log": [r["message_log"] for r in results],
             # length is used downstream for mean_prompt_length
             "length": torch.tensor(
@@ -1181,6 +1201,10 @@ def run_async_nemo_gym_rollout(
             # stop_strings: NotRequired[list[str]]  # Optional stop strings for generation
             # Extra information not in the DatumSpec used by the GRPO algorithm
             "total_reward": torch.tensor([r["full_result"]["reward"] for r in results]),
+            # Add truncated field to match other rollout paths (reusing hit_max_tokens logic)
+            "truncated": torch.tensor(
+                [m["hit_max_tokens"] for m in all_sample_metrics], dtype=torch.bool
+            ),
         }
     )
 
diff --git a/nemo_rl/models/generation/__init__.py b/nemo_rl/models/generation/__init__.py
index 25bb0596df..90575e77b1 100644
--- a/nemo_rl/models/generation/__init__.py
+++ b/nemo_rl/models/generation/__init__.py
@@ -57,7 +57,11 @@ def configure_generation_config(
     # Respect the skip_tokenizer_init setting from the config. VLMs for example, require this to be False.
     if "skip_tokenizer_init" not in config["vllm_cfg"]:
         # set skip_tokenizer_init
-        if is_eval or config["stop_strings"] is not None:
+        if (
+            is_eval
+            or config["stop_strings"] is not None
+            or config["vllm_cfg"].get("expose_http_server", None)
+        ):
             config["vllm_cfg"]["skip_tokenizer_init"] = False
         else:
             config["vllm_cfg"]["skip_tokenizer_init"] = True
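
Reviewer note: a minimal sketch of how the new _tensorize_by_key helper behaves on a toy message log. The helper body is copied from the diff above; the message structure and field values below are illustrative only, not the exact NeMo RL DatumSpec.

import torch


def _tensorize_by_key(message_logs: list, key: str):
    # Convert list-valued fields under `key` into torch tensors, in place.
    # No-op when the list is empty or the key is absent from the first message.
    if not message_logs or key not in message_logs[0]:
        return

    for m in message_logs:
        m[key] = torch.tensor(m[key])


# Toy message log (hypothetical contents).
message_log = [
    {"role": "user", "token_ids": [1, 2, 3]},
    {"role": "assistant", "token_ids": [4, 5], "generation_logprobs": [-0.1, -0.2]},
]

# Mirrors the calls added in run_async_nemo_gym_rollout: token_ids for every
# message, generation_logprobs only for assistant messages.
_tensorize_by_key(message_log, "token_ids")
_tensorize_by_key(
    [m for m in message_log if m["role"] == "assistant"], "generation_logprobs"
)

assert isinstance(message_log[0]["token_ids"], torch.Tensor)
assert isinstance(message_log[1]["generation_logprobs"], torch.Tensor)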