From 6ec3b95fade83e4254d0b793a4907c4c90ed6a4a Mon Sep 17 00:00:00 2001 From: ruit Date: Wed, 31 Dec 2025 02:26:46 -0800 Subject: [PATCH 01/13] support nemo gym config Signed-off-by: ruit --- ...17k_bytedtsinghua_qwen3_4binstruct_nf.yaml | 280 ++++++++++++++++++ 1 file changed, 280 insertions(+) create mode 100644 examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml diff --git a/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml b/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml new file mode 100644 index 0000000000..659b565711 --- /dev/null +++ b/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml @@ -0,0 +1,280 @@ +grpo: + max_num_epochs: 1 + num_prompts_per_step: 64 + num_generations_per_prompt: 16 + max_rollout_turns: 1 # for multi-turn rollouts. Math Environments just have 1 turn (answering the question) + max_num_steps: 1000000 + normalize_rewards: true + use_leave_one_out_baseline: true + val_period: 10 + val_at_start: true + overlong_filtering: false + max_val_samples: null # inferred from size of val dataset. for multi evals, repeat val ds via `num_repeats` in `ng_prepare_data`. 
+ val_batch_size: null + seed: 42 + use_dynamic_sampling: false + dynamic_sampling_max_gen_batches: 10 + batch_multiplier: 1 + reward_shaping: + enabled: false + overlong_buffer_length: 128 + overlong_buffer_penalty: 1 + max_response_length: ${policy.max_total_sequence_length} + reward_scaling: + enabled: false + source_min: 0.0 + source_max: 1.0 + target_min: 0.0 + target_max: 1.0 + skip_reference_policy_logprobs_calculation: true + +loss_fn: + reference_policy_kl_penalty: 0 + reference_policy_kl_type: "k3" + kl_input_clamp_value: 20.0 + kl_output_clamp_value: 10.0 + ratio_clip_min: 0.2 + ratio_clip_max: 0.2 + ratio_clip_c: null + # (default off) loss formulation improvements (docs/guides/grpo.md#loss) + use_on_policy_kl_approximation: false + truncated_importance_sampling_ratio: null + use_importance_sampling_correction: false + token_level_loss: true + force_on_policy_ratio: false # Set to true to force ratio=1.0 (requires train_global_batch_size == num_prompts_per_step * num_generations_per_prompt) + +checkpointing: + enabled: true + checkpoint_dir: "results/grpo" + metric_name: "val:accuracy" + higher_is_better: true + keep_top_k: 3 + save_period: 1 + checkpoint_must_save_by: null + +policy: + model_name: "Qwen/Qwen3-4B-Instruct-2507" + tokenizer: + name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default + chat_template_kwargs: null # can be used to pass kwargs to the chat template, e.g., enable_thinking=true + hf_config_overrides: {} + train_global_batch_size: ${mul:${grpo.num_prompts_per_step}, ${grpo.num_generations_per_prompt}} # Match the total rollouts per step + train_micro_batch_size: 1 + logprob_batch_size: 1 + generation_batch_size: 32 # Only used when generating using HF backend + max_total_sequence_length: 32768 + precision: "bfloat16" + logprob_chunk_size: 1024 + offload_optimizer_for_logprob: false + + dtensor_cfg: + _v2: false + enabled: true + cpu_offload: False + sequence_parallel: false + 
activation_checkpointing: true + tensor_parallel_size: 2 + context_parallel_size: 1 + custom_parallel_plan: null + clear_cache_every_n_steps: null + + megatron_cfg: + enabled: false + # We might want to consider setting this value higher (e.g. to 1) and raising the vllm generation max mem utilization + empty_unused_memory_level: 0 + activation_checkpointing: true + converter_type: "Qwen2ForCausalLM" # Apparently this is compatible with Qwen 3 dense models. + tensor_model_parallel_size: 1 + expert_tensor_parallel_size: 1 + expert_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + num_layers_in_first_pipeline_stage: null + num_layers_in_last_pipeline_stage: null + context_parallel_size: 1 + pipeline_dtype: ${policy.precision} + sequence_parallel: false + freeze_moe_router: true + moe_router_dtype: "fp64" + moe_router_load_balancing_type: "none" # "seq_aux_loss" causes logprob error divergence for grpo + moe_router_bias_update_rate: 0.0 # by default, disable bias updates for grpo + #gives ~20% training perf speedup with sequence packing + apply_rope_fusion: True + defer_fp32_logits: true + moe_permute_fusion: false + bias_activation_fusion: True + moe_per_layer_logging: False + + optimizer: + optimizer: "adam" + lr: 5.0e-6 + min_lr: 5.0e-7 + weight_decay: 0.01 + bf16: true + fp16: false + params_dtype: "float32" + + #adam + adam_beta1: 0.9 + adam_beta2: 0.999 + adam_eps: 1e-8 + + #sgd + sgd_momentum: 0.9 + + #distributed optimizer + use_distributed_optimizer: true + use_precision_aware_optimizer: true + + # optimizer cpu offload + optimizer_cpu_offload: false + optimizer_offload_fraction: 0.0 + + clip_grad: ${policy.max_grad_norm} + + scheduler: + start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} + end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} + weight_decay_incr_style: "constant" + lr_decay_style: "constant" + lr_decay_iters: null + lr_warmup_iters: 13 + lr_warmup_init: 5.0e-7 + + distributed_data_parallel_config: + 
grad_reduce_in_fp32: false + overlap_grad_reduce: true + overlap_param_gather: true + use_custom_fsdp: false + data_parallel_sharding_strategy: "optim_grads_params" + + env_vars: null + + # See docs/design-docs/sequence-packing-and-dynamic-batching.md + # for more details on dynamic batching and sequence packing. + dynamic_batching: + enabled: False + train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} + logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} + sequence_length_round: 64 + + sequence_packing: + enabled: false + train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} + logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} + algorithm: "modified_first_fit_decreasing" + sequence_length_round: 64 + + # makes the training sequence length divisible by the tensor parallel size + # this is useful for sequence parallel training + make_sequence_length_divisible_by: ${policy.dtensor_cfg.tensor_parallel_size} + max_grad_norm: 1.0 + + optimizer: + name: "torch.optim.AdamW" + kwargs: + lr: 1.0e-6 + weight_decay: 0.01 + betas: [0.9, 0.999] + eps: 1e-8 + # when using Dtensor, we need to set foreach + # and fused to False + foreach: False + fused: False + + scheduler: + - name: "torch.optim.lr_scheduler.ConstantLR" + kwargs: + factor: 1.0 + total_iters: 10000000000 + - milestones: [] + + generation: + backend: "vllm" + max_new_tokens: ${policy.max_total_sequence_length} + temperature: 1.0 + top_p: 1.0 + top_k: null + stop_token_ids: null + stop_strings: null + vllm_cfg: + async_engine: true + precision: ${policy.precision} + tensor_parallel_size: 1 + pipeline_parallel_size: 1 + enable_expert_parallel: false + expert_parallel_size: 1 + gpu_memory_utilization: 0.8 + max_model_len: ${policy.max_total_sequence_length} + enforce_eager: false + use_deep_gemm: False + num_last_layers_in_bf16: 0 + num_first_layers_in_bf16: 
0 + expose_http_server: true + skip_tokenizer_init: false + kv_cache_dtype: null + http_server_serving_chat_kwargs: + # This is the tool parser for Qwen 3 4B Instruct. This needs to be changed for other models. + enable_auto_tools: true + tool_parser: hermes + # Enable the appropriate reasoning parser here. Since this model is an instruct model, we comment it out. + # reasoning_parser: deepseek_r1 + vllm_kwargs: + compilation_config: + # when enforce_eager is False, set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy, + # with the flag, vllm will use the custom CUDA kernels instead of the Triton kernels generated by torch.compile + # for more details, see convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998 + use_inductor: False + colocated: + # true: generation shares training GPUs + # false: uses dedicated generation resources + enabled: true + # only relevant when enabled is false + resources: + gpus_per_node: null # Decides num gpus to be dedicated to generation when there is one node in the cluster i.e cluster.num_nodes == 1 + num_nodes: null # Decides number of nodes to be dedicated to generation + +data: + train_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/train.jsonl + validation_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/validation.jsonl + shuffle: true + num_workers: 0 + +env: + should_use_nemo_gym: true + should_log_nemo_gym_responses: true # If you have low logging storage, set this to false + nemo_gym: # This is passed into NeMo-Gym as the initial_global_config_dict + config_paths: + - responses_api_models/vllm_model/configs/vllm_model_for_training.yaml # Required! 
And it must be *for_training + - resources_servers/math_with_judge/configs/math_with_judge.yaml + math_with_judge: + resources_servers: + math_with_judge: + judge_model_server: + name: policy_model + should_use_judge: false + +logger: + log_dir: "logs" # Base directory for all logs + num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal + wandb_enabled: true + tensorboard_enabled: false + mlflow_enabled: false # Disable MLflow logging + swanlab_enabled: false + monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard + wandb: + project: "grpo-dev" + name: "grpo-dev-logger" + swanlab: + project: "grpo-dev" + name: "grpo-dev-logger" + tensorboard: {} + mlflow: + experiment_name: "grpo-dev" + run_name: "grpo-dev-logger" + gpu_monitoring: + collection_interval: 10 # How often to collect GPU usage metrics (in seconds) + flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds) + +cluster: + gpus_per_node: 8 + num_nodes: 8 From 31f270372683ff50c5c16733b020fea134c4b7e1 Mon Sep 17 00:00:00 2001 From: ruit Date: Thu, 1 Jan 2026 19:24:10 -0800 Subject: [PATCH 02/13] support run nemo-gym grpo Signed-off-by: ruit --- ...17k_bytedtsinghua_qwen3_4binstruct_nf.yaml | 17 ++- examples/nemo_gym/run_grpo_nemo_gym.py | 106 ++++++++++++++---- .../datasets/response_datasets/__init__.py | 4 + .../response_datasets/nemogym_dataset.py | 59 ++++++++++ nemo_rl/data/processors.py | 21 ++++ nemo_rl/environments/utils.py | 3 + 6 files changed, 188 insertions(+), 22 deletions(-) create mode 100644 nemo_rl/data/datasets/response_datasets/nemogym_dataset.py diff --git a/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml b/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml index 659b565711..88c56e4b42 100644 --- a/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml +++ b/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml @@ -211,7 
+211,7 @@ policy: num_first_layers_in_bf16: 0 expose_http_server: true skip_tokenizer_init: false - kv_cache_dtype: null + kv_cache_dtype: ${policy.precision} http_server_serving_chat_kwargs: # This is the tool parser for Qwen 3 4B Instruct. This needs to be changed for other models. enable_auto_tools: true @@ -234,10 +234,21 @@ policy: num_nodes: null # Decides number of nodes to be dedicated to generation data: - train_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/train.jsonl - validation_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/validation.jsonl + max_input_seq_length: ${policy.max_total_sequence_length} shuffle: true num_workers: 0 + train: + dataset_name: NemoGymDataset + data_path: 3rdparty/Gym-workspace/Gym/data/train.jsonl + repeat: 1 + validation: + dataset_name: NemoGymDataset + data_path: 3rdparty/Gym-workspace/Gym/data/validation.jsonl + default: + env_name: "nemo_gym" + prompt_file: null + system_prompt_file: null + processor: "nemo_gym_data_processor" env: should_use_nemo_gym: true diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index c8d2c911e2..77bdb4ea31 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -17,7 +17,7 @@ import os import pprint from itertools import chain, repeat -from typing import Optional +from typing import Dict, Optional # Increase the W&B single object size warning threshold. 
Initially 100_000 (100 KB) -> 10_000_000 (10 MB) import wandb.util @@ -25,6 +25,7 @@ wandb.util.VALUE_BYTES_LIMIT = 10_000_000 import ray +from datasets import concatenate_datasets from omegaconf import OmegaConf from wandb import Table @@ -42,18 +43,19 @@ setup, ) from nemo_rl.algorithms.utils import get_tokenizer -from nemo_rl.data.datasets import AllTaskProcessedDataset -from nemo_rl.data.interfaces import DatumSpec -from nemo_rl.distributed.ray_actor_environment_registry import ( - get_actor_python_env, +from nemo_rl.data.datasets import ( + AllTaskProcessedDataset, + load_response_dataset, + update_single_dataset_config, ) +from nemo_rl.data.interfaces import DatumSpec from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.environments.nemo_gym import ( - NemoGym, NemoGymConfig, nemo_gym_example_to_nemo_rl_datum_spec, setup_nemo_gym_config, ) +from nemo_rl.environments.utils import create_env from nemo_rl.experience.rollouts import run_async_nemo_gym_rollout from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides @@ -109,6 +111,80 @@ def setup_single_nemo_gym_dataset( ) +def setup_data( + tokenizer: TokenizerType, + data_config: Dict, + env_configs: Dict, + seed: int, +) -> tuple[ + AllTaskProcessedDataset, + Optional[AllTaskProcessedDataset], + dict[str, EnvironmentInterface], + dict[str, EnvironmentInterface], +]: + print("\n▶ Setting up data...") + # setup train dataset + data_list = [] + task_data_processors = {} + + if isinstance(data_config["train"], dict): + data_config["train"] = [data_config["train"]] + for cfg in data_config["train"]: + update_single_dataset_config(cfg, data_config["default"]) + data = load_response_dataset(cfg, seed) + data_list.append(data) + task_data_processors[data.task_name] = (data.task_spec, data.processor) + + merged_data = concatenate_datasets([data.dataset for data in data_list]) + dataset = AllTaskProcessedDataset( + merged_data, 
+ tokenizer, + None, + task_data_processors, + max_seq_length=data_config["max_input_seq_length"], + ) + print(f" ✓ Training dataset loaded with {len(dataset)} samples.") + + # setup validation dataset + val_task_data_processors = {} + val_data_list = [] + + for data in data_list: + if hasattr(data, "val_dataset") and data.val_dataset is not None: + val_data_list.append(data.val_dataset) + # bind task_name to task_data_processors + task_name = data.task_name + val_task_data_processors[task_name] = task_data_processors[task_name] + + if data_config["validation"] is not None: + if isinstance(data_config["validation"], dict): + data_config["validation"] = [data_config["validation"]] + + for cfg in data_config["validation"]: + update_single_dataset_config(cfg, data_config["default"]) + val_data = load_response_dataset(cfg, seed) + val_data_list.append(val_data.dataset) + # bind task_name to task_data_processors + val_task_data_processors[val_data.task_name] = ( + val_data.task_spec, + val_data.processor, + ) + + val_dataset = None + if len(val_data_list) > 0: + merged_val_data = concatenate_datasets(val_data_list) + val_dataset = AllTaskProcessedDataset( + merged_val_data, + tokenizer, + None, + val_task_data_processors, + max_seq_length=data_config["max_input_seq_length"], + ) + print(f" ✓ Validation dataset loaded with {len(val_dataset)} samples.") + + return dataset, val_dataset + + # These types are directly imported from grpo_train since if something about the architecture changes we want to immediately fail. 
def collect_trajectories( policy: ColocatablePolicyInterface, @@ -202,13 +278,11 @@ def main() -> None: assert _should_use_nemo_gym(config) print("\n▶ Setting up data...") - train_dataset = setup_single_nemo_gym_dataset( - jsonl_fpath=config["data"]["train_jsonl_fpath"], - tokenizer=tokenizer, - ) - val_dataset = setup_single_nemo_gym_dataset( - jsonl_fpath=config["data"]["validation_jsonl_fpath"], + train_dataset, val_dataset = setup_data( tokenizer=tokenizer, + data_config=config["data"], + env_configs=config["env"], + seed=config["grpo"]["seed"], ) # Validation dataset config setup. @@ -254,13 +328,7 @@ def main() -> None: base_urls=policy_generation.dp_openai_server_base_urls, initial_global_config_dict=config["env"]["nemo_gym"], ) - nemo_gym = NemoGym.options( - runtime_env={ - "py_executable": get_actor_python_env( - "nemo_rl.environments.nemo_gym.NemoGym" - ), - } - ).remote(nemo_gym_config) + nemo_gym = create_env(env_name="nemo_gym", env_config=nemo_gym_config) # Blocking wait for NeMo-Gym to spin up ray.get(nemo_gym.health_check.remote()) task_to_env = {"nemo_gym": nemo_gym} diff --git a/nemo_rl/data/datasets/response_datasets/__init__.py b/nemo_rl/data/datasets/response_datasets/__init__.py index 524b854c3a..83ce498fbc 100644 --- a/nemo_rl/data/datasets/response_datasets/__init__.py +++ b/nemo_rl/data/datasets/response_datasets/__init__.py @@ -22,6 +22,7 @@ from nemo_rl.data.datasets.response_datasets.deepscaler import DeepScalerDataset from nemo_rl.data.datasets.response_datasets.geometry3k import Geometry3KDataset from nemo_rl.data.datasets.response_datasets.helpsteer3 import HelpSteer3Dataset +from nemo_rl.data.datasets.response_datasets.nemogym_dataset import NemoGymDataset from nemo_rl.data.datasets.response_datasets.oai_format_dataset import ( OpenAIFormatDataset, ) @@ -64,6 +65,8 @@ def load_response_dataset(data_config: ResponseDatasetConfig): dataset = dataset_class( **data_config # pyrefly: ignore[missing-argument] `data_path` is required for 
some classes ) + elif dataset_name == "NemoGymDataset": + dataset = NemoGymDataset(**data_config) else: raise ValueError( f"Unsupported {dataset_name=}. " @@ -87,6 +90,7 @@ def load_response_dataset(data_config: ResponseDatasetConfig): "DeepScalerDataset", "Geometry3KDataset", "HelpSteer3Dataset", + "NemoGymDataset", "OasstDataset", "OpenAIFormatDataset", "OpenMathInstruct2Dataset", diff --git a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py new file mode 100644 index 0000000000..5277484786 --- /dev/null +++ b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py @@ -0,0 +1,59 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Optional + +import torch + +from nemo_rl.data.datasets.raw_dataset import RawDataset +from nemo_rl.data.datasets.utils import load_dataset_from_path + + +class NemoGymDataset(RawDataset): + """Simple wrapper around the Nemo Gym dataset.""" + + def __init__(self, data_path: Optional[str] = None, **kwargs) -> None: + self.task_name = "NemoGymDataset" + + # load from jsonl + if data_path is None: + # Allow optional at type level for config validation; enforce at runtime for clarity + raise ValueError( + "NemoGymDataset requires `data_path` in data_config to load examples." 
+ ) + self.dataset = load_dataset_from_path(data_path) + + # format the dataset + # HuggingFace Dataset 在 map/写入 Arrow 时不会持久化 torch.Tensor,会把它序列化成 Python 列表。因此下游在取样时读到的是 [](list),触发断言 + self.dataset = self.dataset.map( + self.format_data, + with_indices=True, + ) + if "repeat" in kwargs: + self.dataset = self.dataset.repeat(kwargs["repeat"]) + + def format_data(self, data: dict[str, Any], idx: int) -> dict[str, Any]: + return { + "message_log": [ + {"role": "user", "content": "", "token_ids": torch.tensor([])} + ], + "task_name": self.task_name, + "length": 0, + "extra_env_info": data, + "loss_multiplier": 1.0, # Fix to 1.0 to backprop on all examples + "idx": idx, + "stop_strings": None, + # Extra vars + "token_ids": [], # Just need this empty key to be compatible with the current NeMo RL GRPO impl + } diff --git a/nemo_rl/data/processors.py b/nemo_rl/data/processors.py index 8958bef259..0cde1dd6c3 100644 --- a/nemo_rl/data/processors.py +++ b/nemo_rl/data/processors.py @@ -663,6 +663,26 @@ def multichoice_qa_processor( return output +def nemo_gym_data_processor( + datum_dict: dict[str, Any], + *args, + **kwargs, +) -> DatumSpec: + """Process a datum dictionary (directly loaded from dataset) into a DatumSpec for Nemo Gym.""" + # Ensure message_log exists and contains tensor token_ids so downstream padding works + if "message_log" not in datum_dict or not datum_dict["message_log"]: + datum_dict["message_log"] = [ + {"role": "user", "content": "", "token_ids": torch.tensor([])} + ] + else: + for msg in datum_dict["message_log"]: + if "token_ids" not in msg: + msg["token_ids"] = torch.tensor([]) + elif not isinstance(msg["token_ids"], torch.Tensor): + msg["token_ids"] = torch.tensor(msg["token_ids"]) + return cast(DatumSpec, datum_dict) + + # Processor registry. Key is the processor name, value is the processor function. 
# Note: We cast the literal dict to Dict[str, TaskDataProcessFnCallable] because # type checkers see each concrete function's signature as a distinct callable type. @@ -679,6 +699,7 @@ def multichoice_qa_processor( "multichoice_qa_processor": multichoice_qa_processor, "sft_processor": sft_processor, "vlm_hf_data_processor": vlm_hf_data_processor, + "nemo_gym_data_processor": nemo_gym_data_processor, }, ) diff --git a/nemo_rl/environments/utils.py b/nemo_rl/environments/utils.py index 99fe9eda1a..9b4f4d6279 100644 --- a/nemo_rl/environments/utils.py +++ b/nemo_rl/environments/utils.py @@ -46,6 +46,9 @@ class EnvRegistryEntry(TypedDict, total=False): "vlm": { "actor_class_fqn": "nemo_rl.environments.vlm_environment.VLMEnvironment", }, + "nemo_gym": { + "actor_class_fqn": "nemo_rl.environments.nemo_gym.NemoGym", + }, } From 32a63e335f1a2e3b6e375d616b4a1fa65c0d7750 Mon Sep 17 00:00:00 2001 From: ruit Date: Thu, 1 Jan 2026 21:43:13 -0800 Subject: [PATCH 03/13] unify nemo gym interface Signed-off-by: ruit --- examples/nemo_gym/run_grpo_nemo_gym.py | 73 ++++++++++---------------- 1 file changed, 28 insertions(+), 45 deletions(-) diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index 77bdb4ea31..c2f47c13a8 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -13,10 +13,8 @@ # limitations under the License. import argparse -import json import os import pprint -from itertools import chain, repeat from typing import Dict, Optional # Increase the W&B single object size warning threshold. 
Initially 100_000 (100 KB) -> 10_000_000 (10 MB) @@ -45,14 +43,13 @@ from nemo_rl.algorithms.utils import get_tokenizer from nemo_rl.data.datasets import ( AllTaskProcessedDataset, + extract_necessary_env_names, load_response_dataset, update_single_dataset_config, ) -from nemo_rl.data.interfaces import DatumSpec from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.environments.nemo_gym import ( NemoGymConfig, - nemo_gym_example_to_nemo_rl_datum_spec, setup_nemo_gym_config, ) from nemo_rl.environments.utils import create_env @@ -77,40 +74,6 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]: return args, overrides -def setup_single_nemo_gym_dataset( - jsonl_fpath: str, tokenizer, num_repeats: Optional[int] = None -): - with open(jsonl_fpath) as f: - nemo_gym_examples = list(map(json.loads, f)) - - print(f"Loaded data at {jsonl_fpath}. Found {len(nemo_gym_examples)} examples") - - if num_repeats: - previous_length = len(nemo_gym_examples) - nemo_gym_examples = list( - chain.from_iterable( - repeat(nemo_gym_example, num_repeats) - for nemo_gym_example in nemo_gym_examples - ) - ) - print( - f"Repeating examples (in a pattern of abc to aabbcc) for {jsonl_fpath} from {previous_length} to {len(nemo_gym_examples)}!" 
- ) - - nemo_rl_compatible_examples: list[DatumSpec] = [ - nemo_gym_example_to_nemo_rl_datum_spec(nemo_gym_example, idx) - for idx, nemo_gym_example in enumerate(nemo_gym_examples) - ] - - passthrough_task_processor = lambda datum_dict, *args, **kwargs: datum_dict - return AllTaskProcessedDataset( - nemo_rl_compatible_examples, - tokenizer, - None, - passthrough_task_processor, - ) - - def setup_data( tokenizer: TokenizerType, data_config: Dict, @@ -122,10 +85,18 @@ def setup_data( dict[str, EnvironmentInterface], dict[str, EnvironmentInterface], ]: + print("\n▶ Setting up envs...") + env_name_list = extract_necessary_env_names(data_config) + envs = { + env_name: create_env(env_name=env_name, env_config=env_configs[env_name]) + for env_name in env_name_list + if env_name != "nemo_gym" + } print("\n▶ Setting up data...") # setup train dataset - data_list = [] task_data_processors = {} + task_to_env = {} + data_list = [] if isinstance(data_config["train"], dict): data_config["train"] = [data_config["train"]] @@ -133,7 +104,12 @@ def setup_data( update_single_dataset_config(cfg, data_config["default"]) data = load_response_dataset(cfg, seed) data_list.append(data) - task_data_processors[data.task_name] = (data.task_spec, data.processor) + # bind task_name to task_data_processors and task_to_env + task_name = data.task_name + task_data_processors[task_name] = (data.task_spec, data.processor) + # Skip binding nemo_gym env to task_to_env, nemo_gym env need to initialize policy first + if cfg["env_name"] != "nemo_gym": + task_to_env[task_name] = envs[cfg["env_name"]] merged_data = concatenate_datasets([data.dataset for data in data_list]) dataset = AllTaskProcessedDataset( @@ -147,6 +123,7 @@ def setup_data( # setup validation dataset val_task_data_processors = {} + val_task_to_env = {} val_data_list = [] for data in data_list: @@ -155,6 +132,8 @@ def setup_data( # bind task_name to task_data_processors task_name = data.task_name val_task_data_processors[task_name] = 
task_data_processors[task_name] + if task_name in task_to_env: + val_task_to_env[task_name] = task_to_env[task_name] if data_config["validation"] is not None: if isinstance(data_config["validation"], dict): @@ -165,10 +144,13 @@ def setup_data( val_data = load_response_dataset(cfg, seed) val_data_list.append(val_data.dataset) # bind task_name to task_data_processors - val_task_data_processors[val_data.task_name] = ( + task_name = val_data.task_name + val_task_data_processors[task_name] = ( val_data.task_spec, val_data.processor, ) + if cfg["env_name"] != "nemo_gym": + val_task_to_env[task_name] = envs[cfg["env_name"]] val_dataset = None if len(val_data_list) > 0: @@ -182,7 +164,7 @@ def setup_data( ) print(f" ✓ Validation dataset loaded with {len(val_dataset)} samples.") - return dataset, val_dataset + return dataset, val_dataset, task_to_env, val_task_to_env # These types are directly imported from grpo_train since if something about the architecture changes we want to immediately fail. 
@@ -278,7 +260,7 @@ def main() -> None: assert _should_use_nemo_gym(config) print("\n▶ Setting up data...") - train_dataset, val_dataset = setup_data( + train_dataset, val_dataset, task_to_env, val_task_to_env = setup_data( tokenizer=tokenizer, data_config=config["data"], env_configs=config["env"], @@ -328,11 +310,12 @@ def main() -> None: base_urls=policy_generation.dp_openai_server_base_urls, initial_global_config_dict=config["env"]["nemo_gym"], ) + # Default nemo_gym env is used for trajectory collection nemo_gym = create_env(env_name="nemo_gym", env_config=nemo_gym_config) # Blocking wait for NeMo-Gym to spin up ray.get(nemo_gym.health_check.remote()) - task_to_env = {"nemo_gym": nemo_gym} - val_task_to_env = task_to_env + task_to_env["nemo_gym"] = nemo_gym + val_task_to_env["nemo_gym"] = nemo_gym if is_trajectory_collection: collect_trajectories( From 7a2f009de99c6fcfb64aa3d6e36270a1a76d9bba Mon Sep 17 00:00:00 2001 From: ruit Date: Thu, 29 Jan 2026 19:37:06 -0800 Subject: [PATCH 04/13] update nemogym config Signed-off-by: ruit --- ...orkplace_assistant_nemotron_nano_v2_9b.yaml | 18 +++++++++++++++--- examples/nemo_gym/run_grpo_nemo_gym.py | 4 ++-- .../response_datasets/nemogym_dataset.py | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml index f3e3dcccc8..3c2453590b 100644 --- a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml +++ b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml @@ -61,7 +61,7 @@ policy: train_micro_batch_size: 1 logprob_batch_size: 1 generation_batch_size: 32 # Only used when generating using HF backend - max_total_sequence_length: 8192 + max_total_sequence_length: 12288 precision: "bfloat16" logprob_chunk_size: null # Disabled to allow defer_fp32_logits: false @@ -231,10 +231,22 @@ policy: data: # Using the prepared train and validation datasets 
(downloaded from HuggingFace and split 90/10) # Train: 1129 samples, Validation: 126 samples - train_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl - validation_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl + + max_input_seq_length: ${policy.max_total_sequence_length} shuffle: true num_workers: 0 + train: + dataset_name: NemoGymDataset + data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl + repeat: 1 + validation: + dataset_name: NemoGymDataset + data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl + default: + env_name: "nemo_gym" + prompt_file: null + system_prompt_file: null + processor: "nemo_gym_data_processor" env: should_use_nemo_gym: true diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index c2f47c13a8..03ba43a1ee 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -102,7 +102,7 @@ def setup_data( data_config["train"] = [data_config["train"]] for cfg in data_config["train"]: update_single_dataset_config(cfg, data_config["default"]) - data = load_response_dataset(cfg, seed) + data = load_response_dataset(cfg) data_list.append(data) # bind task_name to task_data_processors and task_to_env task_name = data.task_name @@ -141,7 +141,7 @@ def setup_data( for cfg in data_config["validation"]: update_single_dataset_config(cfg, data_config["default"]) - val_data = load_response_dataset(cfg, seed) + val_data = load_response_dataset(cfg) val_data_list.append(val_data.dataset) # bind task_name to task_data_processors task_name = val_data.task_name diff --git a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py index 5277484786..c3d110ba98 100644 --- a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py +++ b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py @@ -35,7 +35,7 @@ def 
__init__(self, data_path: Optional[str] = None, **kwargs) -> None: self.dataset = load_dataset_from_path(data_path) # format the dataset - # HuggingFace Dataset 在 map/写入 Arrow 时不会持久化 torch.Tensor,会把它序列化成 Python 列表。因此下游在取样时读到的是 [](list),触发断言 + # HuggingFace Dataset does not persist torch.Tensor during map/Arrow writes; it serializes to Python lists. self.dataset = self.dataset.map( self.format_data, with_indices=True, From 2a08dd24a5a71752b28f947e083f07c8ec8ac897 Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Mon, 2 Feb 2026 03:27:07 -0800 Subject: [PATCH 05/13] update config Signed-off-by: Yuki Huang --- ...17k_bytedtsinghua_qwen3_4binstruct_nf.yaml | 291 ------------------ ...rkplace_assistant_nemotron_nano_v2_9b.yaml | 10 +- 2 files changed, 5 insertions(+), 296 deletions(-) delete mode 100644 examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml diff --git a/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml b/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml deleted file mode 100644 index 88c56e4b42..0000000000 --- a/examples/nemo_gym/grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml +++ /dev/null @@ -1,291 +0,0 @@ -grpo: - max_num_epochs: 1 - num_prompts_per_step: 64 - num_generations_per_prompt: 16 - max_rollout_turns: 1 # for multi-turn rollouts. Math Environments just have 1 turn (answering the question) - max_num_steps: 1000000 - normalize_rewards: true - use_leave_one_out_baseline: true - val_period: 10 - val_at_start: true - overlong_filtering: false - max_val_samples: null # inferred from size of val dataset. for multi evals, repeat val ds via `num_repeats` in `ng_prepare_data`. 
- val_batch_size: null - seed: 42 - use_dynamic_sampling: false - dynamic_sampling_max_gen_batches: 10 - batch_multiplier: 1 - reward_shaping: - enabled: false - overlong_buffer_length: 128 - overlong_buffer_penalty: 1 - max_response_length: ${policy.max_total_sequence_length} - reward_scaling: - enabled: false - source_min: 0.0 - source_max: 1.0 - target_min: 0.0 - target_max: 1.0 - skip_reference_policy_logprobs_calculation: true - -loss_fn: - reference_policy_kl_penalty: 0 - reference_policy_kl_type: "k3" - kl_input_clamp_value: 20.0 - kl_output_clamp_value: 10.0 - ratio_clip_min: 0.2 - ratio_clip_max: 0.2 - ratio_clip_c: null - # (default off) loss formulation improvements (docs/guides/grpo.md#loss) - use_on_policy_kl_approximation: false - truncated_importance_sampling_ratio: null - use_importance_sampling_correction: false - token_level_loss: true - force_on_policy_ratio: false # Set to true to force ratio=1.0 (requires train_global_batch_size == num_prompts_per_step * num_generations_per_prompt) - -checkpointing: - enabled: true - checkpoint_dir: "results/grpo" - metric_name: "val:accuracy" - higher_is_better: true - keep_top_k: 3 - save_period: 1 - checkpoint_must_save_by: null - -policy: - model_name: "Qwen/Qwen3-4B-Instruct-2507" - tokenizer: - name: ${policy.model_name} ## specify if you'd like to use a tokenizer different from the model's default - chat_template_kwargs: null # can be used to pass kwargs to the chat template, e.g., enable_thinking=true - hf_config_overrides: {} - train_global_batch_size: ${mul:${grpo.num_prompts_per_step}, ${grpo.num_generations_per_prompt}} # Match the total rollouts per step - train_micro_batch_size: 1 - logprob_batch_size: 1 - generation_batch_size: 32 # Only used when generating using HF backend - max_total_sequence_length: 32768 - precision: "bfloat16" - logprob_chunk_size: 1024 - offload_optimizer_for_logprob: false - - dtensor_cfg: - _v2: false - enabled: true - cpu_offload: False - sequence_parallel: false - 
activation_checkpointing: true - tensor_parallel_size: 2 - context_parallel_size: 1 - custom_parallel_plan: null - clear_cache_every_n_steps: null - - megatron_cfg: - enabled: false - # We might want to consider setting this value higher (e.g. to 1) and raising the vllm generation max mem utilization - empty_unused_memory_level: 0 - activation_checkpointing: true - converter_type: "Qwen2ForCausalLM" # Apparently this is comptible with Qwen 3 dense models. - tensor_model_parallel_size: 1 - expert_tensor_parallel_size: 1 - expert_model_parallel_size: 1 - pipeline_model_parallel_size: 1 - num_layers_in_first_pipeline_stage: null - num_layers_in_last_pipeline_stage: null - context_parallel_size: 1 - pipeline_dtype: ${policy.precision} - sequence_parallel: false - freeze_moe_router: true - moe_router_dtype: "fp64" - moe_router_load_balancing_type: "none" # "seq_aux_loss" causes logprob error divergence for grpo - moe_router_bias_update_rate: 0.0 # by default, disable bias updates for grpo - #gives ~20% training perf speedup with sequence packing - apply_rope_fusion: True - defer_fp32_logits: true - moe_permute_fusion: false - bias_activation_fusion: True - moe_per_layer_logging: False - - optimizer: - optimizer: "adam" - lr: 5.0e-6 - min_lr: 5.0e-7 - weight_decay: 0.01 - bf16: true - fp16: false - params_dtype: "float32" - - #adam - adam_beta1: 0.9 - adam_beta2: 0.999 - adam_eps: 1e-8 - - #sgd - sgd_momentum: 0.9 - - #distributed optimizer - use_distributed_optimizer: true - use_precision_aware_optimizer: true - - # optimizer cpu offload - optimizer_cpu_offload: false - optimizer_offload_fraction: 0.0 - - clip_grad: ${policy.max_grad_norm} - - scheduler: - start_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} - end_weight_decay: ${policy.megatron_cfg.optimizer.weight_decay} - weight_decay_incr_style: "constant" - lr_decay_style: "constant" - lr_decay_iters: null - lr_warmup_iters: 13 - lr_warmup_init: 5.0e-7 - - distributed_data_parallel_config: - 
grad_reduce_in_fp32: false - overlap_grad_reduce: true - overlap_param_gather: true - use_custom_fsdp: false - data_parallel_sharding_strategy: "optim_grads_params" - - env_vars: null - - # See docs/design-docs/sequence-packing-and-dynamic-batching.md - # for more details on dynamic batching and sequence packing. - dynamic_batching: - enabled: False - train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} - logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} - sequence_length_round: 64 - - sequence_packing: - enabled: false - train_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.train_micro_batch_size}} - logprob_mb_tokens: ${mul:${policy.max_total_sequence_length}, ${policy.logprob_batch_size}} - algorithm: "modified_first_fit_decreasing" - sequence_length_round: 64 - - # makes the training sequence length divisible by the tensor parallel size - # this is useful for sequence parallel training - make_sequence_length_divisible_by: ${policy.dtensor_cfg.tensor_parallel_size} - max_grad_norm: 1.0 - - optimizer: - name: "torch.optim.AdamW" - kwargs: - lr: 1.0e-6 - weight_decay: 0.01 - betas: [0.9, 0.999] - eps: 1e-8 - # when using Dtensor, we need to set foreach - # and fused to False - foreach: False - fused: False - - scheduler: - - name: "torch.optim.lr_scheduler.ConstantLR" - kwargs: - factor: 1.0 - total_iters: 10000000000 - - milestones: [] - - generation: - backend: "vllm" - max_new_tokens: ${policy.max_total_sequence_length} - temperature: 1.0 - top_p: 1.0 - top_k: null - stop_token_ids: null - stop_strings: null - vllm_cfg: - async_engine: true - precision: ${policy.precision} - tensor_parallel_size: 1 - pipeline_parallel_size: 1 - enable_expert_parallel: false - expert_parallel_size: 1 - gpu_memory_utilization: 0.8 - max_model_len: ${policy.max_total_sequence_length} - enforce_eager: false - use_deep_gemm: False - num_last_layers_in_bf16: 0 - num_first_layers_in_bf16: 
0 - expose_http_server: true - skip_tokenizer_init: false - kv_cache_dtype: ${policy.precision} - http_server_serving_chat_kwargs: - # This is the tool parser for Qwen 3 4B Instruct. This needs to be changed for other models. - enable_auto_tools: true - tool_parser: hermes - # Enable the appropriate reasoning parser here. Since this model is an instruct model, we comment it out. - # reasoning_parser: deepseek_r1 - vllm_kwargs: - compilation_config: - # when enforce_eager is False, set ++policy.generation.vllm_kwargs.compilation_config.use_inductor=False for better accuracy, - # with the flag, vllm will use the custom CUDA kernels instead of the Triton kernels generated by torch.compile - # for more details, see convergence issue https://github.com/NVIDIA-NeMo/RL/issues/998 - use_inductor: False - colocated: - # true: generation shares training GPUs - # false: uses dedicated generation resources - enabled: true - # only relevant when enabled is false - resources: - gpus_per_node: null # Decides num gpus to be dedicated to generation when there is one node in the cluster i.e cluster.num_nodes == 1 - num_nodes: null # Decides number of nodes to be dedicated to generation - -data: - max_input_seq_length: ${policy.max_total_sequence_length} - shuffle: true - num_workers: 0 - train: - dataset_name: NemoGymDataset - data_path: 3rdparty/Gym-workspace/Gym/data/train.jsonl - repeat: 1 - validation: - dataset_name: NemoGymDataset - data_path: 3rdparty/Gym-workspace/Gym/data/validation.jsonl - default: - env_name: "nemo_gym" - prompt_file: null - system_prompt_file: null - processor: "nemo_gym_data_processor" - -env: - should_use_nemo_gym: true - should_log_nemo_gym_responses: true # If you have low logging storage, set this to false - nemo_gym: # This is passed into NeMo-Gym as the initial_global_config_dict - config_paths: - - responses_api_models/vllm_model/configs/vllm_model_for_training.yaml # Required! 
And it must be *for_training - - resources_servers/math_with_judge/configs/math_with_judge.yaml - math_with_judge: - resources_servers: - math_with_judge: - judge_model_server: - name: policy_model - should_use_judge: false - -logger: - log_dir: "logs" # Base directory for all logs - num_val_samples_to_print: 0 # Number of validation samples to pretty print on terminal - wandb_enabled: true - tensorboard_enabled: false - mlflow_enabled: false # Disable MLflow logging - swanlab_enabled: false - monitor_gpus: true # If true, will monitor GPU usage and log to wandb and/or tensorboard - wandb: - project: "grpo-dev" - name: "grpo-dev-logger" - swanlab: - project: "grpo-dev" - name: "grpo-dev-logger" - tensorboard: {} - mlflow: - experiment_name: "grpo-dev" - run_name: "grpo-dev-logger" - gpu_monitoring: - collection_interval: 10 # How often to collect GPU usage metrics (in seconds) - flush_interval: 10 # How often to flush GPU usage metrics to the loggers (in seconds) - -cluster: - gpus_per_node: 8 - num_nodes: 8 diff --git a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml index 3c2453590b..fcf4c88fee 100644 --- a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml +++ b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml @@ -45,6 +45,7 @@ loss_fn: checkpointing: enabled: true + checkpoint_dir: "results/grpo" metric_name: "val:accuracy" higher_is_better: true keep_top_k: 3 @@ -61,7 +62,7 @@ policy: train_micro_batch_size: 1 logprob_batch_size: 1 generation_batch_size: 32 # Only used when generating using HF backend - max_total_sequence_length: 12288 + max_total_sequence_length: 8192 precision: "bfloat16" logprob_chunk_size: null # Disabled to allow defer_fp32_logits: false @@ -229,16 +230,15 @@ policy: num_nodes: null # Decides number of nodes to be dedicated to generation data: - # Using the prepared train and validation datasets (downloaded from 
HuggingFace and split 90/10) - # Train: 1129 samples, Validation: 126 samples - max_input_seq_length: ${policy.max_total_sequence_length} shuffle: true num_workers: 0 + + # Using the prepared train and validation datasets (downloaded from HuggingFace and split 90/10) + # Train: 1129 samples, Validation: 126 samples train: dataset_name: NemoGymDataset data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl - repeat: 1 validation: dataset_name: NemoGymDataset data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl From d9fd70d953f82aa97213cfca2de89110b33b0f7e Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Tue, 3 Feb 2026 18:45:32 -0800 Subject: [PATCH 06/13] update nemogym dataset and data processor Signed-off-by: Yuki Huang --- ...rkplace_assistant_nemotron_nano_v2_9b.yaml | 2 +- nemo_rl/data/__init__.py | 2 +- .../datasets/response_datasets/__init__.py | 3 +- .../response_datasets/nemogym_dataset.py | 52 +++++++------------ nemo_rl/data/processors.py | 28 +++++----- nemo_rl/environments/nemo_gym.py | 24 --------- 6 files changed, 37 insertions(+), 74 deletions(-) diff --git a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml index fcf4c88fee..ed5c245abd 100644 --- a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml +++ b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml @@ -230,7 +230,7 @@ policy: num_nodes: null # Decides number of nodes to be dedicated to generation data: - max_input_seq_length: ${policy.max_total_sequence_length} + max_input_seq_length: null # nemogym processor doesn't use this parameter shuffle: true num_workers: 0 diff --git a/nemo_rl/data/__init__.py b/nemo_rl/data/__init__.py index 2fb26ebd90..ad70f95c75 100644 --- a/nemo_rl/data/__init__.py +++ b/nemo_rl/data/__init__.py @@ -44,7 +44,7 @@ class PreferenceDatasetConfig(TypedDict): class DataConfig(TypedDict): - 
max_input_seq_length: int + max_input_seq_length: int | None add_bos: NotRequired[bool] add_eos: NotRequired[bool] add_generation_prompt: NotRequired[bool] diff --git a/nemo_rl/data/datasets/response_datasets/__init__.py b/nemo_rl/data/datasets/response_datasets/__init__.py index 83ce498fbc..961b7b9ba8 100644 --- a/nemo_rl/data/datasets/response_datasets/__init__.py +++ b/nemo_rl/data/datasets/response_datasets/__init__.py @@ -51,6 +51,7 @@ "tulu3_sft_mixture": Tulu3SftMixtureDataset, # load from local JSONL file or HuggingFace "openai_format": OpenAIFormatDataset, + "NemoGymDataset": NemoGymDataset, "ResponseDataset": ResponseDataset, } @@ -65,8 +66,6 @@ def load_response_dataset(data_config: ResponseDatasetConfig): dataset = dataset_class( **data_config # pyrefly: ignore[missing-argument] `data_path` is required for some classes ) - elif dataset_name == "NemoGymDataset": - dataset = NemoGymDataset(**data_config) else: raise ValueError( f"Unsupported {dataset_name=}. " diff --git a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py index c3d110ba98..d8d6d30a85 100644 --- a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py +++ b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py @@ -12,48 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Any, Optional +import json +from typing import Optional -import torch +from datasets import Dataset from nemo_rl.data.datasets.raw_dataset import RawDataset -from nemo_rl.data.datasets.utils import load_dataset_from_path class NemoGymDataset(RawDataset): """Simple wrapper around the Nemo Gym dataset.""" - def __init__(self, data_path: Optional[str] = None, **kwargs) -> None: - self.task_name = "NemoGymDataset" + def __init__( + self, data_path: Optional[str] = None, repeat: int = 1, **kwargs + ) -> None: + self.task_name = "-".join(data_path.split("/")[-2:]).split(".")[0] + if self.task_name[0] == "-": + self.task_name = self.task_name[1:] # load from jsonl - if data_path is None: - # Allow optional at type level for config validation; enforce at runtime for clarity - raise ValueError( - "NemoGymDataset requires `data_path` in data_config to load examples." - ) - self.dataset = load_dataset_from_path(data_path) + with open(data_path) as f: + self.dataset = list(map(json.loads, f)) # format the dataset - # HuggingFace Dataset does not persist torch.Tensor during map/Arrow writes; it serializes to Python lists. 
- self.dataset = self.dataset.map( - self.format_data, - with_indices=True, + self.dataset = Dataset.from_dict( + { + "extra_env_info": self.dataset, + "task_name": [self.task_name] * len(self.dataset), + } ) - if "repeat" in kwargs: - self.dataset = self.dataset.repeat(kwargs["repeat"]) - - def format_data(self, data: dict[str, Any], idx: int) -> dict[str, Any]: - return { - "message_log": [ - {"role": "user", "content": "", "token_ids": torch.tensor([])} - ], - "task_name": self.task_name, - "length": 0, - "extra_env_info": data, - "loss_multiplier": 1.0, # Fix to 1.0 to backprop on all examples - "idx": idx, - "stop_strings": None, - # Extra vars - "token_ids": [], # Just need this empty key to be compatible with the current NeMo RL GRPO impl - } + + # repeat the dataset + if repeat > 1: + self.dataset = self.dataset.repeat(repeat) diff --git a/nemo_rl/data/processors.py b/nemo_rl/data/processors.py index 0cde1dd6c3..168b11aa5b 100644 --- a/nemo_rl/data/processors.py +++ b/nemo_rl/data/processors.py @@ -665,22 +665,22 @@ def multichoice_qa_processor( def nemo_gym_data_processor( datum_dict: dict[str, Any], - *args, - **kwargs, + task_data_spec: TaskDataSpec, + tokenizer: TokenizerType, + max_seq_length: int, + idx: int, ) -> DatumSpec: """Process a datum dictionary (directly loaded from dataset) into a DatumSpec for Nemo Gym.""" - # Ensure message_log exists and contains tensor token_ids so downstream padding works - if "message_log" not in datum_dict or not datum_dict["message_log"]: - datum_dict["message_log"] = [ - {"role": "user", "content": "", "token_ids": torch.tensor([])} - ] - else: - for msg in datum_dict["message_log"]: - if "token_ids" not in msg: - msg["token_ids"] = torch.tensor([]) - elif not isinstance(msg["token_ids"], torch.Tensor): - msg["token_ids"] = torch.tensor(msg["token_ids"]) - return cast(DatumSpec, datum_dict) + output: DatumSpec = { + "extra_env_info": datum_dict["extra_env_info"], + "loss_multiplier": 1.0, + "idx": idx, + 
"task_name": datum_dict["task_name"], + # fake keys for compatibility with the current GRPO implementation + "message_log": [{"role": "user", "content": "", "token_ids": torch.tensor([])}], + "length": 0, + } + return output # Processor registry. Key is the processor name, value is the processor function. diff --git a/nemo_rl/environments/nemo_gym.py b/nemo_rl/environments/nemo_gym.py index 5ec15c3cef..b59b8b46fa 100644 --- a/nemo_rl/environments/nemo_gym.py +++ b/nemo_rl/environments/nemo_gym.py @@ -236,27 +236,3 @@ def setup_nemo_gym_config(config, tokenizer) -> None: # Stop strings or token ids are not supported generation_config["stop_strings"] = None generation_config["stop_token_ids"] = None - - -######################################## -# Data utils -######################################## - - -# We do some light preprocessing here to make our data format compatible with nemo rl format -def nemo_gym_example_to_nemo_rl_datum_spec( - nemo_gym_example: dict, idx: int -) -> DatumSpec: - return DatumSpec( - message_log=[ - {"role": "user", "content": "", "token_ids": torch.tensor([])} - ], # Fake message - length=0, - extra_env_info=nemo_gym_example, - loss_multiplier=1.0, # Fix to 1.0 to backprop on all examples - idx=idx, - task_name="nemo_gym", - stop_strings=None, - # Extra vars - token_ids=[], # Just need this empty key to be compatible with the current NeMo RL GRPO impl - ) From 98b67441d9bba91b3f36af7c97910bac9bd8c4eb Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Tue, 3 Feb 2026 20:19:05 -0800 Subject: [PATCH 07/13] fix Dataset issue Signed-off-by: Yuki Huang --- nemo_rl/data/datasets/response_datasets/nemogym_dataset.py | 6 +++--- nemo_rl/data/processors.py | 4 +++- nemo_rl/environments/nemo_gym.py | 1 - 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py index d8d6d30a85..47bcd59eab 100644 --- 
a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py +++ b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json from typing import Optional from datasets import Dataset @@ -30,9 +29,10 @@ def __init__( if self.task_name[0] == "-": self.task_name = self.task_name[1:] - # load from jsonl + # load raw line from jsonl + # will use `json.loads` to load to dict format at `nemo_gym_data_processor` later since `Dataset` cannot handle nested structure well with open(data_path) as f: - self.dataset = list(map(json.loads, f)) + self.dataset = [raw_line for raw_line in f] # format the dataset self.dataset = Dataset.from_dict( diff --git a/nemo_rl/data/processors.py b/nemo_rl/data/processors.py index 168b11aa5b..0ae5feef64 100644 --- a/nemo_rl/data/processors.py +++ b/nemo_rl/data/processors.py @@ -14,6 +14,7 @@ """Contains data processors for evaluation.""" +import json import logging from typing import Any, Dict, cast @@ -672,7 +673,8 @@ def nemo_gym_data_processor( ) -> DatumSpec: """Process a datum dictionary (directly loaded from dataset) into a DatumSpec for Nemo Gym.""" output: DatumSpec = { - "extra_env_info": datum_dict["extra_env_info"], + # load to dict format here since `Dataset` cannot handle nested structure well in `NemoGymDataset` + "extra_env_info": json.loads(datum_dict["extra_env_info"]), "loss_multiplier": 1.0, "idx": idx, "task_name": datum_dict["task_name"], diff --git a/nemo_rl/environments/nemo_gym.py b/nemo_rl/environments/nemo_gym.py index b59b8b46fa..6694d4f4d1 100644 --- a/nemo_rl/environments/nemo_gym.py +++ b/nemo_rl/environments/nemo_gym.py @@ -18,7 +18,6 @@ import torch from transformers import PreTrainedTokenizerBase -from nemo_rl.data.interfaces import DatumSpec from nemo_rl.distributed.virtual_cluster import _get_free_port_local, _get_node_ip_local from nemo_rl.environments.interfaces import 
EnvironmentInterface from nemo_rl.utils.timer import Timer From e52569f94a304c165cad6630fc5631368fbeeb17 Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Wed, 4 Feb 2026 01:11:09 -0800 Subject: [PATCH 08/13] use setup_response_data Signed-off-by: Yuki Huang --- examples/nemo_gym/run_grpo_nemo_gym.py | 117 ++----------------------- examples/run_distillation.py | 4 +- examples/run_grpo.py | 4 +- examples/run_vlm_grpo.py | 4 +- nemo_rl/data/utils.py | 59 ++++++++----- 5 files changed, 52 insertions(+), 136 deletions(-) diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index 03ba43a1ee..1ded0694c5 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -15,7 +15,6 @@ import argparse import os import pprint -from typing import Dict, Optional # Increase the W&B single object size warning threshold. Initially 100_000 (100 KB) -> 10_000_000 (10 MB) import wandb.util @@ -23,7 +22,6 @@ wandb.util.VALUE_BYTES_LIMIT = 10_000_000 import ray -from datasets import concatenate_datasets from omegaconf import OmegaConf from wandb import Table @@ -41,12 +39,7 @@ setup, ) from nemo_rl.algorithms.utils import get_tokenizer -from nemo_rl.data.datasets import ( - AllTaskProcessedDataset, - extract_necessary_env_names, - load_response_dataset, - update_single_dataset_config, -) +from nemo_rl.data.utils import setup_response_data from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.environments.nemo_gym import ( NemoGymConfig, @@ -74,99 +67,6 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]: return args, overrides -def setup_data( - tokenizer: TokenizerType, - data_config: Dict, - env_configs: Dict, - seed: int, -) -> tuple[ - AllTaskProcessedDataset, - Optional[AllTaskProcessedDataset], - dict[str, EnvironmentInterface], - dict[str, EnvironmentInterface], -]: - print("\n▶ Setting up envs...") - env_name_list = extract_necessary_env_names(data_config) - envs = { - env_name: 
create_env(env_name=env_name, env_config=env_configs[env_name]) - for env_name in env_name_list - if env_name != "nemo_gym" - } - print("\n▶ Setting up data...") - # setup train dataset - task_data_processors = {} - task_to_env = {} - data_list = [] - - if isinstance(data_config["train"], dict): - data_config["train"] = [data_config["train"]] - for cfg in data_config["train"]: - update_single_dataset_config(cfg, data_config["default"]) - data = load_response_dataset(cfg) - data_list.append(data) - # bind task_name to task_data_processors and task_to_env - task_name = data.task_name - task_data_processors[task_name] = (data.task_spec, data.processor) - # Skip binding nemo_gym env to task_to_env, nemo_gym env need to initialize policy first - if cfg["env_name"] != "nemo_gym": - task_to_env[task_name] = envs[cfg["env_name"]] - - merged_data = concatenate_datasets([data.dataset for data in data_list]) - dataset = AllTaskProcessedDataset( - merged_data, - tokenizer, - None, - task_data_processors, - max_seq_length=data_config["max_input_seq_length"], - ) - print(f" ✓ Training dataset loaded with {len(dataset)} samples.") - - # setup validation dataset - val_task_data_processors = {} - val_task_to_env = {} - val_data_list = [] - - for data in data_list: - if hasattr(data, "val_dataset") and data.val_dataset is not None: - val_data_list.append(data.val_dataset) - # bind task_name to task_data_processors - task_name = data.task_name - val_task_data_processors[task_name] = task_data_processors[task_name] - if task_name in task_to_env: - val_task_to_env[task_name] = task_to_env[task_name] - - if data_config["validation"] is not None: - if isinstance(data_config["validation"], dict): - data_config["validation"] = [data_config["validation"]] - - for cfg in data_config["validation"]: - update_single_dataset_config(cfg, data_config["default"]) - val_data = load_response_dataset(cfg) - val_data_list.append(val_data.dataset) - # bind task_name to task_data_processors - task_name = 
val_data.task_name - val_task_data_processors[task_name] = ( - val_data.task_spec, - val_data.processor, - ) - if cfg["env_name"] != "nemo_gym": - val_task_to_env[task_name] = envs[cfg["env_name"]] - - val_dataset = None - if len(val_data_list) > 0: - merged_val_data = concatenate_datasets(val_data_list) - val_dataset = AllTaskProcessedDataset( - merged_val_data, - tokenizer, - None, - val_task_data_processors, - max_seq_length=data_config["max_input_seq_length"], - ) - print(f" ✓ Validation dataset loaded with {len(val_dataset)} samples.") - - return dataset, val_dataset, task_to_env, val_task_to_env - - # These types are directly imported from grpo_train since if something about the architecture changes we want to immediately fail. def collect_trajectories( policy: ColocatablePolicyInterface, @@ -259,12 +159,10 @@ def main() -> None: # We assert here since this is right after the final config has been materialized. assert _should_use_nemo_gym(config) + # NeMo-Gym environment needs to get dp_openai_server_base_urls from policy_generation, so we don't setup env here. print("\n▶ Setting up data...") - train_dataset, val_dataset, task_to_env, val_task_to_env = setup_data( - tokenizer=tokenizer, - data_config=config["data"], - env_configs=config["env"], - seed=config["grpo"]["seed"], + train_dataset, val_dataset = setup_response_data( + tokenizer, config["data"], env_configs=None ) # Validation dataset config setup. 
@@ -314,8 +212,11 @@ def main() -> None: nemo_gym = create_env(env_name="nemo_gym", env_config=nemo_gym_config) # Blocking wait for NeMo-Gym to spin up ray.get(nemo_gym.health_check.remote()) - task_to_env["nemo_gym"] = nemo_gym - val_task_to_env["nemo_gym"] = nemo_gym + + # Bind task_to_env and val_task_to_env for nemo_gym env + # Hardcode here to match `run_async_nemo_gym_rollout` + task_to_env = {"nemo_gym": nemo_gym} + val_task_to_env = {"nemo_gym": nemo_gym} if is_trajectory_collection: collect_trajectories( diff --git a/examples/run_distillation.py b/examples/run_distillation.py index 60cf58909d..a57a0d2fed 100644 --- a/examples/run_distillation.py +++ b/examples/run_distillation.py @@ -19,7 +19,7 @@ from nemo_rl.algorithms.distillation import MasterConfig, distillation_train, setup from nemo_rl.algorithms.utils import get_tokenizer -from nemo_rl.data.utils import setup_data_with_envs +from nemo_rl.data.utils import setup_response_data from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides @@ -79,7 +79,7 @@ def main() -> None: val_dataset, task_to_env, val_task_to_env, - ) = setup_data_with_envs(tokenizer, config["data"], config["env"]) + ) = setup_response_data(tokenizer, config["data"], config["env"]) ( student_policy, diff --git a/examples/run_grpo.py b/examples/run_grpo.py index 83fd9f1d97..b05721d6a1 100644 --- a/examples/run_grpo.py +++ b/examples/run_grpo.py @@ -20,7 +20,7 @@ from nemo_rl.algorithms.grpo import MasterConfig, grpo_train, setup from nemo_rl.algorithms.utils import get_tokenizer -from nemo_rl.data.utils import setup_data_with_envs +from nemo_rl.data.utils import setup_response_data from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides @@ -91,7 +91,7 @@ def main() -> 
None: val_dataset, task_to_env, val_task_to_env, - ) = setup_data_with_envs(tokenizer, config["data"], config["env"]) + ) = setup_response_data(tokenizer, config["data"], config["env"]) ( policy, diff --git a/examples/run_vlm_grpo.py b/examples/run_vlm_grpo.py index 65613ddee6..d75d23a343 100644 --- a/examples/run_vlm_grpo.py +++ b/examples/run_vlm_grpo.py @@ -20,7 +20,7 @@ from nemo_rl.algorithms.grpo import MasterConfig, grpo_train, setup from nemo_rl.algorithms.utils import get_tokenizer -from nemo_rl.data.utils import setup_data_with_envs +from nemo_rl.data.utils import setup_response_data from nemo_rl.distributed.virtual_cluster import init_ray from nemo_rl.models.generation import configure_generation_config from nemo_rl.utils.config import load_config, parse_hydra_overrides @@ -97,7 +97,7 @@ def main() -> None: val_dataset, task_to_env, val_task_to_env, - ) = setup_data_with_envs(processor, config["data"], config["env"], is_vlm=True) + ) = setup_response_data(processor, config["data"], config["env"], is_vlm=True) ( policy, diff --git a/nemo_rl/data/utils.py b/nemo_rl/data/utils.py index c7f97b0592..398c69b111 100644 --- a/nemo_rl/data/utils.py +++ b/nemo_rl/data/utils.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Any, Optional +from typing import Any, Optional, Union from datasets import concatenate_datasets from transformers import AutoProcessor, AutoTokenizer @@ -31,16 +31,19 @@ # TODO: @yukih: unify to setup_data after dataset refactored -def setup_data_with_envs( +def setup_response_data( tokenizer: AutoProcessor | AutoTokenizer, data_config: DataConfig, - env_configs: dict[str, Any], + env_configs: Optional[dict[str, Any]] = None, is_vlm: bool = False, -) -> tuple[ - AllTaskProcessedDataset, - Optional[AllTaskProcessedDataset], - dict[str, EnvironmentInterface], - dict[str, EnvironmentInterface], +) -> Union[ + tuple[AllTaskProcessedDataset, Optional[AllTaskProcessedDataset]], + tuple[ + AllTaskProcessedDataset, + Optional[AllTaskProcessedDataset], + dict[str, EnvironmentInterface], + dict[str, EnvironmentInterface], + ], ]: """Setup data with environments. @@ -49,25 +52,31 @@ def setup_data_with_envs( Args: tokenizer: Tokenizer or processor. data_config: Data config. - env_configs: Environment configs. + env_configs: Environment configs. If None, will not create environments. is_vlm: Whether to use VLM training or not. Returns: - A tuple of (train dataset, validation dataset, task to environment, task to validation environment). + If env_configs is not None: + A tuple of (train dataset, validation dataset, task to environment, task to validation environment). + If env_configs is None: + A tuple of (train dataset, validation dataset). """ assert "train" in data_config, ( "The dataset config structure is updated. Please refer to https://github.com/NVIDIA-NeMo/RL/blob/main/docs/guides/grpo.md#dataset " "and the Migrate Guide in https://github.com/NVIDIA-NeMo/RL/pull/1649 to update the dataset config." 
) - print("\n▶ Setting up envs...") - env_name_list = extract_necessary_env_names(data_config) - envs = {} - for env_name in env_name_list: - registered_env_name = "vlm" if is_vlm else env_name - envs[env_name] = create_env( - env_name=registered_env_name, env_config=env_configs[env_name] - ) + # setup environments if needed + has_envs = env_configs is not None + if has_envs: + print("\n▶ Setting up envs...") + env_name_list = extract_necessary_env_names(data_config) + envs = {} + for env_name in env_name_list: + registered_env_name = "vlm" if is_vlm else env_name + envs[env_name] = create_env( + env_name=registered_env_name, env_config=env_configs[env_name] + ) print("\n▶ Setting up data...") # setup train dataset @@ -87,7 +96,8 @@ def setup_data_with_envs( # bind task_name to task_data_processors and task_to_env task_name = data.task_name task_data_processors[task_name] = (data.task_spec, data.processor) - task_to_env[task_name] = envs[cfg["env_name"]] + if has_envs: + task_to_env[task_name] = envs[cfg["env_name"]] merged_data = concatenate_datasets([data.dataset for data in data_list]) dataset = AllTaskProcessedDataset( @@ -111,7 +121,8 @@ def setup_data_with_envs( # bind task_name to task_data_processors and task_to_env task_name = data.task_name val_task_data_processors[task_name] = task_data_processors[task_name] - val_task_to_env[task_name] = task_to_env[task_name] + if has_envs: + val_task_to_env[task_name] = task_to_env[task_name] # validation dataset from config if "validation" in data_config and data_config["validation"] is not None: @@ -130,7 +141,8 @@ def setup_data_with_envs( val_data.task_spec, val_data.processor, ) - val_task_to_env[task_name] = envs[cfg["env_name"]] + if has_envs: + val_task_to_env[task_name] = envs[cfg["env_name"]] val_dataset = None if len(val_data_list) > 0: @@ -144,7 +156,10 @@ def setup_data_with_envs( ) print(f" ✓ Validation dataset loaded with {len(val_dataset)} samples.") - return dataset, val_dataset, task_to_env, 
val_task_to_env + if has_envs: + return dataset, val_dataset, task_to_env, val_task_to_env + else: + return dataset, val_dataset # TODO: @yukih: unify to setup_data after dataset refactored From b181c7820856359aed06682564fcc4ce8ac2d364 Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Wed, 4 Feb 2026 01:17:10 -0800 Subject: [PATCH 09/13] fix unit test and minor update Signed-off-by: Yuki Huang --- ...rkplace_assistant_nemotron_nano_v2_9b.yaml | 6 +++--- examples/nemo_gym/run_grpo_nemo_gym.py | 4 ++-- .../response_datasets/nemogym_dataset.py | 6 +----- tests/unit/experience/test_rollouts.py | 20 ++++++++++++++++--- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml index ed5c245abd..c360f82fc0 100644 --- a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml +++ b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml @@ -230,7 +230,7 @@ policy: num_nodes: null # Decides number of nodes to be dedicated to generation data: - max_input_seq_length: null # nemogym processor doesn't use this parameter + max_input_seq_length: null # nemogym dataset doesn't use this parameter shuffle: true num_workers: 0 @@ -244,8 +244,8 @@ data: data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl default: env_name: "nemo_gym" - prompt_file: null - system_prompt_file: null + prompt_file: null # nemogym dataset doesn't use this parameter + system_prompt_file: null # nemogym dataset doesn't use this parameter processor: "nemo_gym_data_processor" env: diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index 1ded0694c5..d74cbab1b8 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -123,7 +123,7 @@ def main() -> None: if not args.config: args.config = os.path.join( os.path.dirname(__file__), - 
"grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml", + "grpo_workplace_assistant_nemotron_nano_v2_9b.yaml", ) config = load_config(args.config) @@ -216,7 +216,7 @@ def main() -> None: # Bind task_to_env and val_task_to_env for nemo_gym env # Hardcode here to match `run_async_nemo_gym_rollout` task_to_env = {"nemo_gym": nemo_gym} - val_task_to_env = {"nemo_gym": nemo_gym} + val_task_to_env = task_to_env if is_trajectory_collection: collect_trajectories( diff --git a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py index 47bcd59eab..8e4de026c0 100644 --- a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py +++ b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional - from datasets import Dataset from nemo_rl.data.datasets.raw_dataset import RawDataset @@ -22,9 +20,7 @@ class NemoGymDataset(RawDataset): """Simple wrapper around the Nemo Gym dataset.""" - def __init__( - self, data_path: Optional[str] = None, repeat: int = 1, **kwargs - ) -> None: + def __init__(self, data_path: str, repeat: int = 1, **kwargs) -> None: self.task_name = "-".join(data_path.split("/")[-2:]).split(".")[0] if self.task_name[0] == "-": self.task_name = self.task_name[1:] diff --git a/tests/unit/experience/test_rollouts.py b/tests/unit/experience/test_rollouts.py index 117fdca936..f3486de21e 100644 --- a/tests/unit/experience/test_rollouts.py +++ b/tests/unit/experience/test_rollouts.py @@ -13,6 +13,8 @@ # limitations under the License. 
import gc +import json +import tempfile from copy import deepcopy from dataclasses import asdict @@ -22,8 +24,10 @@ from transformers import AutoTokenizer from nemo_rl.data.collate_fn import rl_collate_fn +from nemo_rl.data.datasets.response_datasets import NemoGymDataset from nemo_rl.data.interfaces import DatumSpec from nemo_rl.data.llm_message_utils import batched_message_log_to_flat_message +from nemo_rl.data.processors import nemo_gym_data_processor from nemo_rl.distributed.batched_data_dict import BatchedDataDict from nemo_rl.distributed.virtual_cluster import RayVirtualCluster from nemo_rl.environments.games.sliding_puzzle import ( @@ -32,7 +36,6 @@ SlidingPuzzleGameLogic, SlidingPuzzleMetadata, ) -from nemo_rl.environments.nemo_gym import nemo_gym_example_to_nemo_rl_datum_spec from nemo_rl.experience.rollouts import ( _calculate_single_metric, run_async_multi_turn_rollout, @@ -794,10 +797,21 @@ def test_run_async_nemo_gym_rollout( nemo_gym_sanity_test_data, # noqa: F811 nemo_gym_tokenizer, # noqa: F811 ): + # only keep the input part of the data for the test + with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f: + for data in nemo_gym_sanity_test_data["input"]: + f.write(json.dumps(data) + "\n") + data_path = f.name + + # load the dataset and convert to compatible format for Nemo RL + nemo_gym_sanity_test_data = NemoGymDataset(data_path) nemo_rl_compatible_examples: list[DatumSpec] = [ - nemo_gym_example_to_nemo_rl_datum_spec(nemo_gym_example, idx) - for idx, nemo_gym_example in enumerate(nemo_gym_sanity_test_data["input"]) + nemo_gym_data_processor( + nemo_gym_sanity_test_data.dataset[idx], None, None, None, idx + ) + for idx in range(len(nemo_gym_sanity_test_data.dataset)) ] + input_batch: BatchedDataDict[DatumSpec] = rl_collate_fn(nemo_rl_compatible_examples) actual_result = run_async_nemo_gym_rollout( policy_generation=nemo_gym_vllm_generation, From e0c20e1ad0ecb3cac9f88f9b646c6b9e56351aab Mon Sep 17 00:00:00 2001 From: Yuki 
Huang Date: Wed, 4 Feb 2026 05:18:03 -0800 Subject: [PATCH 10/13] pyrefly Signed-off-by: Yuki Huang --- .../grpo_workplace_assistant_nemotron_nano_v2_9b.yaml | 3 +-- pyrefly.toml | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml index c360f82fc0..a28caddfc3 100644 --- a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml +++ b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml @@ -237,12 +237,11 @@ data: # Using the prepared train and validation datasets (downloaded from HuggingFace and split 90/10) # Train: 1129 samples, Validation: 126 samples train: - dataset_name: NemoGymDataset data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl validation: - dataset_name: NemoGymDataset data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl default: + dataset_name: NemoGymDataset env_name: "nemo_gym" prompt_file: null # nemogym dataset doesn't use this parameter system_prompt_file: null # nemogym dataset doesn't use this parameter diff --git a/pyrefly.toml b/pyrefly.toml index 32e67b658a..93adf3fe88 100644 --- a/pyrefly.toml +++ b/pyrefly.toml @@ -65,6 +65,7 @@ project-includes = [ "nemo_rl/data/datasets/response_datasets/deepscaler.py", "nemo_rl/data/datasets/response_datasets/geometry3k.py", "nemo_rl/data/datasets/response_datasets/helpsteer3.py", + "nemo_rl/data/datasets/response_datasets/nemogym_dataset.py", "nemo_rl/data/datasets/response_datasets/oai_format_dataset.py", "nemo_rl/data/datasets/response_datasets/oasst.py", "nemo_rl/data/datasets/response_datasets/openmathinstruct2.py", @@ -102,16 +103,15 @@ project-includes = [ "nemo_rl/models/dtensor/parallelize.py", "nemo_rl/models/generation/__init__.py", "nemo_rl/models/generation/interfaces.py", + "nemo_rl/models/generation/sglang/__init__.py", + 
"nemo_rl/models/generation/sglang/config.py", "nemo_rl/models/generation/vllm/__init__.py", "nemo_rl/models/generation/vllm/config.py", "nemo_rl/models/generation/vllm/quantization/fp8_train_utils.py", "nemo_rl/models/generation/vllm/utils.py", "nemo_rl/models/generation/vllm/vllm_backend.py", - "nemo_rl/models/generation/sglang/__init__.py", - "nemo_rl/models/generation/sglang/config.py", "nemo_rl/models/huggingface/__init__.py", "nemo_rl/models/megatron/__init__.py", - "nemo_rl/models/megatron/community_import.py", "nemo_rl/models/policy/__init__.py", "nemo_rl/models/policy/interfaces.py", "nemo_rl/models/policy/utils.py", From a28d13bfa9710162341610e8974bc94e54437830 Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Thu, 5 Feb 2026 05:51:25 -0800 Subject: [PATCH 11/13] coderabbit Signed-off-by: Yuki Huang --- examples/nemo_gym/run_grpo_nemo_gym.py | 1 - nemo_rl/data/datasets/response_datasets/nemogym_dataset.py | 7 ++++++- nemo_rl/data/processors.py | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index d74cbab1b8..9755b37fa0 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -208,7 +208,6 @@ def main() -> None: base_urls=policy_generation.dp_openai_server_base_urls, initial_global_config_dict=config["env"]["nemo_gym"], ) - # Default nemo_gym env is used for trajectory collection nemo_gym = create_env(env_name="nemo_gym", env_config=nemo_gym_config) # Blocking wait for NeMo-Gym to spin up ray.get(nemo_gym.health_check.remote()) diff --git a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py index 8e4de026c0..a1f97e1a5e 100644 --- a/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py +++ b/nemo_rl/data/datasets/response_datasets/nemogym_dataset.py @@ -18,7 +18,12 @@ class NemoGymDataset(RawDataset): - """Simple wrapper around the Nemo Gym 
dataset.""" + """Simple wrapper around the Nemo Gym dataset. + + Args: + data_path: Path to the dataset JSONL file + repeat: Number of times to repeat the dataset, default is 1 + """ def __init__(self, data_path: str, repeat: int = 1, **kwargs) -> None: self.task_name = "-".join(data_path.split("/")[-2:]).split(".")[0] diff --git a/nemo_rl/data/processors.py b/nemo_rl/data/processors.py index 0ae5feef64..52ac9bf67d 100644 --- a/nemo_rl/data/processors.py +++ b/nemo_rl/data/processors.py @@ -668,7 +668,7 @@ def nemo_gym_data_processor( datum_dict: dict[str, Any], task_data_spec: TaskDataSpec, tokenizer: TokenizerType, - max_seq_length: int, + max_seq_length: int | None, idx: int, ) -> DatumSpec: """Process a datum dictionary (directly loaded from dataset) into a DatumSpec for Nemo Gym.""" From 0e03bffd9a946725eeea3443ab497d58223c0f7a Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Thu, 5 Feb 2026 06:18:06 -0800 Subject: [PATCH 12/13] address comments Signed-off-by: Yuki Huang --- examples/nemo_gym/run_grpo_nemo_gym.py | 11 ++++++----- nemo_rl/data/utils.py | 5 ++++- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/examples/nemo_gym/run_grpo_nemo_gym.py b/examples/nemo_gym/run_grpo_nemo_gym.py index 9755b37fa0..dbef2c5791 100644 --- a/examples/nemo_gym/run_grpo_nemo_gym.py +++ b/examples/nemo_gym/run_grpo_nemo_gym.py @@ -175,11 +175,12 @@ def main() -> None: The validation set you pass in will directly be used for validation with no additional preprocessing. 
If you want to have some number of repetitions, please include that in your dataset, via ``num_repeats``, in your dataset config and `ng_prepare_data` will prepare it accordingly.""" ) - print( - f"Setting `grpo.max_val_samples` and `grpo.val_batch_size` to the length of the validation dataset, which is {len(val_dataset)}" - ) - config["grpo"]["max_val_samples"] = len(val_dataset) - config["grpo"]["val_batch_size"] = config["grpo"]["max_val_samples"] + if val_dataset is not None: + print( + f"Setting `grpo.max_val_samples` and `grpo.val_batch_size` to the length of the validation dataset, which is {len(val_dataset)}" + ) + config["grpo"]["max_val_samples"] = len(val_dataset) + config["grpo"]["val_batch_size"] = config["grpo"]["max_val_samples"] # Print config print("Final config:") diff --git a/nemo_rl/data/utils.py b/nemo_rl/data/utils.py index 398c69b111..4b11e80e7f 100644 --- a/nemo_rl/data/utils.py +++ b/nemo_rl/data/utils.py @@ -52,7 +52,10 @@ def setup_response_data( Args: tokenizer: Tokenizer or processor. data_config: Data config. - env_configs: Environment configs. If None, will not create environments. + env_configs: Environment configs. + If None, no environments will be created. This is used for: + - Algorithms like SFT which do not need environments. + - Environments like NeMo-Gym which need to handle the environment creation outside of this function. is_vlm: Whether to use VLM training or not. 
Returns: From 66cb82c1cb0c6bc5d2c6194aeca19fbaf2f0e4c5 Mon Sep 17 00:00:00 2001 From: Yuki Huang Date: Fri, 6 Feb 2026 19:49:08 -0800 Subject: [PATCH 13/13] add comment Signed-off-by: Yuki Huang --- .../grpo_workplace_assistant_nemotron_nano_v2_9b.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml index a28caddfc3..07f5949288 100644 --- a/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml +++ b/examples/nemo_gym/grpo_workplace_assistant_nemotron_nano_v2_9b.yaml @@ -230,6 +230,10 @@ policy: num_nodes: null # Decides number of nodes to be dedicated to generation data: + # The complete trajectory is managed by gym, which will pass the entire thing to vllm. + # We just catch the vllm max seq len error and return an empty response with + # finish_reason=stop or incomplete_details=max_length that can be used to stop the trajectory, + # so max_input_seq_length is not used in this case. max_input_seq_length: null # nemogym dataset doesn't use this parameter shuffle: true num_workers: 0