Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ loss_fn:

checkpointing:
enabled: true
checkpoint_dir: "results/grpo"
metric_name: "val:accuracy"
higher_is_better: true
keep_top_k: 3
Expand Down Expand Up @@ -229,13 +230,27 @@ policy:
num_nodes: null # Decides number of nodes to be dedicated to generation

data:
# Using the prepared train and validation datasets (downloaded from HuggingFace and split 90/10)
# Train: 1129 samples, Validation: 126 samples
train_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl
validation_jsonl_fpath: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl
# The complete trajectory is managed by gym, which will pass the entire thing to vllm.
# We just catch the vllm max seq len error and return an empty response with
# finish_reason=stop or incomplete_details=max_length that can be used to stop the trajectory,
# so max_input_seq_length is not used in this case.
max_input_seq_length: null # nemogym dataset doesn't use this parameter
shuffle: true
num_workers: 0

# Using the prepared train and validation datasets (downloaded from HuggingFace and split 90/10)
# Train: 1129 samples, Validation: 126 samples
train:
data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/train.jsonl
validation:
data_path: 3rdparty/Gym-workspace/Gym/data/workplace_assistant/validation.jsonl
default:
dataset_name: NemoGymDataset
env_name: "nemo_gym"
prompt_file: null # nemogym dataset doesn't use this parameter
system_prompt_file: null # nemogym dataset doesn't use this parameter
processor: "nemo_gym_data_processor"

env:
should_use_nemo_gym: true
should_log_nemo_gym_responses: true # If you have low logging storage, set this to false
Expand Down
80 changes: 16 additions & 64 deletions examples/nemo_gym/run_grpo_nemo_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
# limitations under the License.

import argparse
import json
import os
import pprint
from itertools import chain, repeat
from typing import Optional

# Increase the W&B single object size warning threshold. Initially 100_000 (100 KB) -> 10_000_000 (10 MB)
import wandb.util
Expand All @@ -42,18 +39,13 @@
setup,
)
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.datasets import AllTaskProcessedDataset
from nemo_rl.data.interfaces import DatumSpec
from nemo_rl.distributed.ray_actor_environment_registry import (
get_actor_python_env,
)
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.environments.nemo_gym import (
NemoGym,
NemoGymConfig,
nemo_gym_example_to_nemo_rl_datum_spec,
setup_nemo_gym_config,
)
from nemo_rl.environments.utils import create_env
from nemo_rl.experience.rollouts import run_async_nemo_gym_rollout
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand All @@ -75,40 +67,6 @@ def parse_args() -> tuple[argparse.Namespace, list[str]]:
return args, overrides


def setup_single_nemo_gym_dataset(
    jsonl_fpath: str, tokenizer, num_repeats: Optional[int] = None
):
    """Load a NeMo-Gym JSONL dataset and wrap it as an ``AllTaskProcessedDataset``.

    Args:
        jsonl_fpath: Path to a JSONL file with one NeMo-Gym example per line.
        tokenizer: Tokenizer forwarded to ``AllTaskProcessedDataset``.
        num_repeats: If truthy, repeat each example this many times, keeping
            the copies of each example adjacent (abc -> aabbcc) rather than
            cycling the whole list (abc -> abcabc).

    Returns:
        An ``AllTaskProcessedDataset`` whose entries are NeMo-RL ``DatumSpec``s.
    """
    with open(jsonl_fpath) as f:
        nemo_gym_examples = list(map(json.loads, f))

    print(f"Loaded data at {jsonl_fpath}. Found {len(nemo_gym_examples)} examples")

    if num_repeats:
        previous_length = len(nemo_gym_examples)
        # chain.from_iterable(repeat(x, k) for x in xs) keeps repeats adjacent.
        nemo_gym_examples = list(
            chain.from_iterable(
                repeat(nemo_gym_example, num_repeats)
                for nemo_gym_example in nemo_gym_examples
            )
        )
        print(
            f"Repeating examples (in a pattern of abc to aabbcc) for {jsonl_fpath} from {previous_length} to {len(nemo_gym_examples)}!"
        )

    nemo_rl_compatible_examples: list[DatumSpec] = [
        nemo_gym_example_to_nemo_rl_datum_spec(nemo_gym_example, idx)
        for idx, nemo_gym_example in enumerate(nemo_gym_examples)
    ]

    # Examples are already fully processed, so the task processor is an
    # identity function. Use `def` instead of a lambda assignment (PEP 8 E731).
    def passthrough_task_processor(datum_dict, *args, **kwargs):
        return datum_dict

    return AllTaskProcessedDataset(
        nemo_rl_compatible_examples,
        tokenizer,
        None,
        passthrough_task_processor,
    )


# These types are directly imported from grpo_train since if something about the architecture changes we want to immediately fail.
def collect_trajectories(
policy: ColocatablePolicyInterface,
Expand Down Expand Up @@ -165,7 +123,7 @@ def main() -> None:
if not args.config:
args.config = os.path.join(
os.path.dirname(__file__),
"grpo_dapo17k_bytedtsinghua_qwen3_4binstruct_nf.yaml",
"grpo_workplace_assistant_nemotron_nano_v2_9b.yaml",
)

config = load_config(args.config)
Expand Down Expand Up @@ -201,14 +159,10 @@ def main() -> None:
# We assert here since this is right after the final config has been materialized.
assert _should_use_nemo_gym(config)

# NeMo-Gym environment needs to get dp_openai_server_base_urls from policy_generation, so we don't setup env here.
print("\n▶ Setting up data...")
train_dataset = setup_single_nemo_gym_dataset(
jsonl_fpath=config["data"]["train_jsonl_fpath"],
tokenizer=tokenizer,
)
val_dataset = setup_single_nemo_gym_dataset(
jsonl_fpath=config["data"]["validation_jsonl_fpath"],
tokenizer=tokenizer,
train_dataset, val_dataset = setup_response_data(
tokenizer, config["data"], env_configs=None
)

# Validation dataset config setup.
Expand All @@ -221,11 +175,12 @@ def main() -> None:
The validation set you pass in will directly be used for validation with no additional preprocessing. If you want to have some number of repetitions, please include that in your dataset, via ``num_repeats``, in your dataset config and `ng_prepare_data` will prepare it accordingly."""
)

print(
f"Setting `grpo.max_val_samples` and `grpo.val_batch_size` to the length of the validation dataset, which is {len(val_dataset)}"
)
config["grpo"]["max_val_samples"] = len(val_dataset)
config["grpo"]["val_batch_size"] = config["grpo"]["max_val_samples"]
if val_dataset is not None:
print(
f"Setting `grpo.max_val_samples` and `grpo.val_batch_size` to the length of the validation dataset, which is {len(val_dataset)}"
)
config["grpo"]["max_val_samples"] = len(val_dataset)
config["grpo"]["val_batch_size"] = config["grpo"]["max_val_samples"]

# Print config
print("Final config:")
Expand Down Expand Up @@ -254,15 +209,12 @@ def main() -> None:
base_urls=policy_generation.dp_openai_server_base_urls,
initial_global_config_dict=config["env"]["nemo_gym"],
)
nemo_gym = NemoGym.options(
runtime_env={
"py_executable": get_actor_python_env(
"nemo_rl.environments.nemo_gym.NemoGym"
),
}
).remote(nemo_gym_config)
nemo_gym = create_env(env_name="nemo_gym", env_config=nemo_gym_config)
# Blocking wait for NeMo-Gym to spin up
ray.get(nemo_gym.health_check.remote())

# Bind task_to_env and val_task_to_env for nemo_gym env
# Hardcode here to match `run_async_nemo_gym_rollout`
task_to_env = {"nemo_gym": nemo_gym}
val_task_to_env = task_to_env

Expand Down
4 changes: 2 additions & 2 deletions examples/run_distillation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from nemo_rl.algorithms.distillation import MasterConfig, distillation_train, setup
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.utils import setup_data_with_envs
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand Down Expand Up @@ -79,7 +79,7 @@ def main() -> None:
val_dataset,
task_to_env,
val_task_to_env,
) = setup_data_with_envs(tokenizer, config["data"], config["env"])
) = setup_response_data(tokenizer, config["data"], config["env"])

(
student_policy,
Expand Down
4 changes: 2 additions & 2 deletions examples/run_grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from nemo_rl.algorithms.grpo import MasterConfig, grpo_train, setup
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.utils import setup_data_with_envs
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand Down Expand Up @@ -91,7 +91,7 @@ def main() -> None:
val_dataset,
task_to_env,
val_task_to_env,
) = setup_data_with_envs(tokenizer, config["data"], config["env"])
) = setup_response_data(tokenizer, config["data"], config["env"])

(
policy,
Expand Down
4 changes: 2 additions & 2 deletions examples/run_vlm_grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from nemo_rl.algorithms.grpo import MasterConfig, grpo_train, setup
from nemo_rl.algorithms.utils import get_tokenizer
from nemo_rl.data.utils import setup_data_with_envs
from nemo_rl.data.utils import setup_response_data
from nemo_rl.distributed.virtual_cluster import init_ray
from nemo_rl.models.generation import configure_generation_config
from nemo_rl.utils.config import load_config, parse_hydra_overrides
Expand Down Expand Up @@ -97,7 +97,7 @@ def main() -> None:
val_dataset,
task_to_env,
val_task_to_env,
) = setup_data_with_envs(processor, config["data"], config["env"], is_vlm=True)
) = setup_response_data(processor, config["data"], config["env"], is_vlm=True)

(
policy,
Expand Down
2 changes: 1 addition & 1 deletion nemo_rl/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class PreferenceDatasetConfig(TypedDict):


class DataConfig(TypedDict):
max_input_seq_length: int
max_input_seq_length: int | None
add_bos: NotRequired[bool]
add_eos: NotRequired[bool]
add_generation_prompt: NotRequired[bool]
Expand Down
3 changes: 3 additions & 0 deletions nemo_rl/data/datasets/response_datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from nemo_rl.data.datasets.response_datasets.deepscaler import DeepScalerDataset
from nemo_rl.data.datasets.response_datasets.geometry3k import Geometry3KDataset
from nemo_rl.data.datasets.response_datasets.helpsteer3 import HelpSteer3Dataset
from nemo_rl.data.datasets.response_datasets.nemogym_dataset import NemoGymDataset
from nemo_rl.data.datasets.response_datasets.oai_format_dataset import (
OpenAIFormatDataset,
)
Expand Down Expand Up @@ -50,6 +51,7 @@
"tulu3_sft_mixture": Tulu3SftMixtureDataset,
# load from local JSONL file or HuggingFace
"openai_format": OpenAIFormatDataset,
"NemoGymDataset": NemoGymDataset,
"ResponseDataset": ResponseDataset,
}

Expand Down Expand Up @@ -87,6 +89,7 @@ def load_response_dataset(data_config: ResponseDatasetConfig):
"DeepScalerDataset",
"Geometry3KDataset",
"HelpSteer3Dataset",
"NemoGymDataset",
"OasstDataset",
"OpenAIFormatDataset",
"OpenMathInstruct2Dataset",
Expand Down
48 changes: 48 additions & 0 deletions nemo_rl/data/datasets/response_datasets/nemogym_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datasets import Dataset

from nemo_rl.data.datasets.raw_dataset import RawDataset


class NemoGymDataset(RawDataset):
    """Simple wrapper around the Nemo Gym dataset.

    Args:
        data_path: Path to the dataset JSONL file
        repeat: Number of times to repeat the dataset, default is 1
    """

    def __init__(self, data_path: str, repeat: int = 1, **kwargs) -> None:
        # Task name is "<parent_dir>-<file_stem>", e.g. "workplace_assistant-train"
        # for .../workplace_assistant/train.jsonl.
        # NOTE(review): splits on "/" — presumably POSIX-style paths only; verify
        # if Windows paths must be supported.
        self.task_name = "-".join(data_path.split("/")[-2:]).split(".")[0]
        # A bare filename has no parent component, leaving a leading "-".
        # removeprefix strips it and, unlike indexing task_name[0], does not
        # raise IndexError on an empty string.
        self.task_name = self.task_name.removeprefix("-")

        # Keep each raw JSONL line as a string; `json.loads` happens later in
        # `nemo_gym_data_processor` because `Dataset` cannot handle nested
        # structures well.
        with open(data_path) as f:
            raw_lines = list(f)

        # format the dataset
        self.dataset = Dataset.from_dict(
            {
                "extra_env_info": raw_lines,
                "task_name": [self.task_name] * len(raw_lines),
            }
        )

        # repeat the dataset
        if repeat > 1:
            self.dataset = self.dataset.repeat(repeat)
23 changes: 23 additions & 0 deletions nemo_rl/data/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Contains data processors for evaluation."""

import json
import logging
from typing import Any, Dict, cast

Expand Down Expand Up @@ -663,6 +664,27 @@ def multichoice_qa_processor(
return output


def nemo_gym_data_processor(
    datum_dict: dict[str, Any],
    task_data_spec: TaskDataSpec,
    tokenizer: TokenizerType,
    max_seq_length: int | None,
    idx: int,
) -> DatumSpec:
    """Process a datum dictionary (directly loaded from dataset) into a DatumSpec for Nemo Gym."""
    # `NemoGymDataset` stores each example as a raw JSON string because
    # `Dataset` cannot represent the nested structure well; decode it here.
    extra_env_info = json.loads(datum_dict["extra_env_info"])
    # Placeholder message log and length: the trajectory itself is driven by
    # NeMo-Gym, but the current GRPO implementation requires these keys.
    placeholder_turn = {"role": "user", "content": "", "token_ids": torch.tensor([])}
    result: DatumSpec = {
        "extra_env_info": extra_env_info,
        "task_name": datum_dict["task_name"],
        "idx": idx,
        "loss_multiplier": 1.0,
        "message_log": [placeholder_turn],
        "length": 0,
    }
    return result


# Processor registry. Key is the processor name, value is the processor function.
# Note: We cast the literal dict to Dict[str, TaskDataProcessFnCallable] because
# type checkers see each concrete function's signature as a distinct callable type.
Expand All @@ -679,6 +701,7 @@ def multichoice_qa_processor(
"multichoice_qa_processor": multichoice_qa_processor,
"sft_processor": sft_processor,
"vlm_hf_data_processor": vlm_hf_data_processor,
"nemo_gym_data_processor": nemo_gym_data_processor,
},
)

Expand Down
Loading
Loading