Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b121a9a
try add penguin stub module
bxyu-nvidia Oct 9, 2025
77484b2
try add test
bxyu-nvidia Oct 9, 2025
6846571
move into test penguin
bxyu-nvidia Oct 9, 2025
f1794b5
add to pyproject
bxyu-nvidia Oct 9, 2025
36a8b36
add emails
bxyu-nvidia Oct 9, 2025
66d98d9
no init
bxyu-nvidia Oct 9, 2025
b7f4124
add penguin extra
bxyu-nvidia Oct 9, 2025
fb84291
find packages
bxyu-nvidia Oct 9, 2025
78c3607
lint
bxyu-nvidia Oct 9, 2025
a4249ee
remove penguin submodule
bxyu-nvidia Oct 9, 2025
30b92fe
copy in
bxyu-nvidia Oct 9, 2025
b99601f
bump lock
bxyu-nvidia Oct 9, 2025
b093f92
dont skip fixture since its not a test
bxyu-nvidia Oct 9, 2025
3fff298
fix test
bxyu-nvidia Oct 9, 2025
3591e1c
import config types
bxyu-nvidia Oct 9, 2025
fb98d02
try search
bxyu-nvidia Oct 9, 2025
09b07d3
fix test print
bxyu-nvidia Oct 9, 2025
5a21806
Merge branch 'bxyu/add-penguin-stub' into bxyu/add-penguin-env
bxyu-nvidia Oct 9, 2025
318d62e
remove unused package name
bxyu-nvidia Oct 9, 2025
f82c4bb
try add deps
bxyu-nvidia Oct 10, 2025
f100f21
add test data
bxyu-nvidia Oct 10, 2025
f03b948
try match mbridge cached deps flow
bxyu-nvidia Oct 10, 2025
9af7e73
use set
bxyu-nvidia Oct 10, 2025
20a67fb
Merge branch 'main' of github.com:NVIDIA-NeMo/RL into bxyu/add-pengui…
bxyu-nvidia Oct 10, 2025
941a75d
update uv lock
bxyu-nvidia Oct 10, 2025
7a316ce
update uv lock
terrykong Oct 10, 2025
f6c53f5
try fix deps
bxyu-nvidia Oct 10, 2025
8695546
add back uv lock update
bxyu-nvidia Oct 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions 3rdparty/Penguin-workspace/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import tomllib
from pathlib import Path

Expand All @@ -21,19 +22,81 @@

# If the submodule is present, expose `penguin` package from the checkout
src_dir = Path("Penguin")
package_name = "penguin"


# Pinned copy of the submodule's [project.dependencies] from
# Penguin-workspace/Penguin/pyproject.toml.  Kept inline so that `setup.py`
# can declare install_requires even when the submodule checkout is absent.
# When the submodule IS present, the block below cross-checks this list
# against the submodule's pyproject and fails the build on any drift.
CACHED_DEPENDENCIES = [
    "openai<=1.97.1",
    "tqdm",
    "pydantic",
    "pydantic_core",
    "devtools",
    "fastapi",
    "uvicorn",
    "uvloop",
    "hydra-core",
    "omegaconf",
    "gradio",
    "mlflow",
    "tdigest>=0.5.2.2",
    "aiohttp",
    "yappi",
]

# If the Penguin submodule checkout is present, expose its packages and
# verify that CACHED_DEPENDENCIES has not drifted from the submodule's
# pyproject.  On any mismatch we print a detailed report and abort the build.
if src_dir.exists():
    pyproject_toml_path = src_dir / "pyproject.toml"
    # BUG FIX: the existence check must come BEFORE open(); previously the
    # file was opened first, so a missing pyproject raised a bare
    # FileNotFoundError from open() and this check was dead code.  The old
    # message also claimed we were "skipping" the check while actually raising.
    if not pyproject_toml_path.exists():
        raise FileNotFoundError(
            f"[Penguin][setup] {pyproject_toml_path} not found; "
            "cannot verify dependency consistency with CACHED_DEPENDENCIES."
        )
    with pyproject_toml_path.open("rb") as f:
        pyproject_toml = tomllib.load(f)

    # Expose every package the submodule's own setuptools config lists,
    # mapping each package name to its directory inside the checkout.
    packages = pyproject_toml["tool"]["setuptools"]["packages"]["find"]["include"]

    for package in packages:
        final_packages.append(package)
        final_package_dir[package] = src_dir / package

    actual_dependencies = pyproject_toml["project"]["dependencies"]

    ########################################
    # Compare cached dependencies with the submodule's pyproject
    ########################################

    # Exact-string set comparison: a version-specifier change (e.g. a bumped
    # pin) shows up as one entry missing and one extra, which is intended.
    missing_in_cached = set(actual_dependencies) - set(CACHED_DEPENDENCIES)
    extra_in_cached = set(CACHED_DEPENDENCIES) - set(actual_dependencies)

    if missing_in_cached or extra_in_cached:
        print(
            "[Penguin][setup] Dependency mismatch between Penguin-workspace/Penguin/pyproject.toml vs Penguin-workspace/setup.py::CACHED_DEPENDENCIES.",
            file=sys.stderr,
        )
        if missing_in_cached:
            print(
                "  - Present in Penguin-workspace/Penguin/pyproject.toml but missing from CACHED_DEPENDENCIES:",
                file=sys.stderr,
            )
            for dep in sorted(missing_in_cached):
                print(f"    * {dep}", file=sys.stderr)
        if extra_in_cached:
            print(
                "  - Present in CACHED_DEPENDENCIES but not in Penguin-workspace/Penguin/pyproject.toml:",
                file=sys.stderr,
            )
            for dep in sorted(extra_in_cached):
                print(f"    * {dep}", file=sys.stderr)
        print(
            "  Please update CACHED_DEPENDENCIES or the submodule pyproject to keep them in sync.",
            file=sys.stderr,
        )
        sys.exit(1)
    else:
        print(
            "[Penguin][setup] Dependency sets are consistent with the submodule pyproject.",
            file=sys.stderr,
        )


setuptools.setup(
name="penguin",
version="0.0.0",
Expand All @@ -43,5 +106,5 @@
packages=final_packages,
package_dir=final_package_dir,
py_modules=["is_penguin_installed"],
install_requires=[],
install_requires=CACHED_DEPENDENCIES,
)
1 change: 1 addition & 0 deletions nemo_rl/distributed/ray_actor_environment_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
# ReplayBuffer needs vLLM environment to handle trajectory data from VllmGenerationWorker
"nemo_rl.algorithms.async_utils.ReplayBuffer": PY_EXECUTABLES.VLLM,
"nemo_rl.environments.tools.retriever.RAGEnvironment": PY_EXECUTABLES.SYSTEM,
"nemo_rl.environments.penguin.Penguin": PY_EXECUTABLES.PENGUIN,
}


Expand Down
3 changes: 3 additions & 0 deletions nemo_rl/distributed/virtual_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class PY_EXECUTABLES:
# Use NeMo-RL direct dependencies and nemo-automodel.
AUTOMODEL = "uv run --locked --extra automodel"

# Use Penguin dependencies
PENGUIN = "uv run --locked --extra penguin"

# Megatron-core (and nemo dependencies)
# We always run with --reinstall to avoid issues where someone runs "uv run ... --extra mcore ..."
# but the submodules are not downloaded yet. This results in errors where it appears Megatron/Nemo
Expand Down
202 changes: 202 additions & 0 deletions nemo_rl/environments/penguin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pathlib import Path
from typing import Any, Dict, List, TypedDict

import ray
import torch

from nemo_rl.data.interfaces import DatumSpec
from nemo_rl.distributed.virtual_cluster import _get_free_port_local, _get_node_ip_local
from nemo_rl.environments.interfaces import EnvironmentInterface


class PenguinConfig(TypedDict):
    """Configuration consumed by the ``Penguin`` ray actor."""

    # Name of the policy model, forwarded to Penguin as `policy_model_name`.
    model_name: str
    # Base URLs of the data-parallel vLLM worker HTTP servers; also used to
    # scale the aiohttp connector limits in Penguin.__init__.
    base_urls: List[str]
    # Initial Penguin global config; mutated in place by Penguin.__init__
    # with policy and head-server settings before being handed to RunHelper.
    initial_global_config_dict: Dict[str, Any]


@ray.remote(max_restarts=-1, max_task_retries=-1)  # pragma: no cover
class Penguin(EnvironmentInterface):
    """This environment class isn't really used for training. It's really meant as an integration wrapper around Penguin that hooks into the existing NeMo RL resource management via ray. So there is still one source of truth for resource management in NeMo RL."""

    def __init__(self, cfg: PenguinConfig):
        """Start Penguin's head server on this node and prepare rollout collection.

        Args:
            cfg: See ``PenguinConfig``. ``cfg["initial_global_config_dict"]``
                is mutated in place with policy and head-server settings
                before being handed to Penguin's ``RunHelper``.
        """
        self.cfg = cfg

        # Bind the head server on this actor's node, on a currently-free port.
        self.node_ip = _get_node_ip_local()
        self.head_server_port = _get_free_port_local()

        # Imported lazily: only this actor's process (run under the `penguin`
        # uv extra) needs these packages, not importers of this module.
        from omegaconf import DictConfig
        from penguin.cli import GlobalConfigDictParserConfig, RunHelper
        from penguin.rollout_collection import RolloutCollectionHelper
        from penguin.server_utils import HEAD_SERVER_KEY_NAME, BaseServerConfig

        # Stripping this suffix off __file__ yields the repo root, where
        # penguin_env.yaml is expected to live (see rh.start below).
        RELATIVE_PATH = "nemo_rl/environments/penguin.py"
        assert __file__.endswith(RELATIVE_PATH)

        initial_global_config_dict = self.cfg["initial_global_config_dict"]
        # Policy information
        initial_global_config_dict["policy_model_name"] = self.cfg["model_name"]
        initial_global_config_dict["policy_api_key"] = (
            "dummy_key"  # No key necessary for training.
        )
        initial_global_config_dict["policy_base_url"] = self.cfg["base_urls"]

        # Per-host connection limit: keep the caller's value if set, else 1024.
        # The total limit scales with the number of vLLM worker base URLs.
        initial_global_config_dict["global_aiohttp_connector_limit_per_host"] = (
            initial_global_config_dict.get("global_aiohttp_connector_limit_per_host")
            or 1024
        )
        initial_global_config_dict["global_aiohttp_connector_limit"] = (
            initial_global_config_dict["global_aiohttp_connector_limit_per_host"]
            * len(self.cfg["base_urls"])
        )

        print(
            f"""Set `global_aiohttp_connector_limit_per_host` to a flat {initial_global_config_dict["global_aiohttp_connector_limit_per_host"]}.
Since there are {len(self.cfg["base_urls"])} data-parallel vLLM worker instances, the `global_aiohttp_connector_limit` has been set to {len(self.cfg["base_urls"])} * {initial_global_config_dict["global_aiohttp_connector_limit_per_host"]} = {initial_global_config_dict["global_aiohttp_connector_limit"]}."""
        )

        # Head server
        initial_global_config_dict[HEAD_SERVER_KEY_NAME] = {
            "host": "0.0.0.0",
            "port": self.head_server_port,
        }

        # Start Penguin; dotenv_path points at <repo root>/penguin_env.yaml.
        self.rh = RunHelper()
        self.rh.start(
            global_config_dict_parser_config=GlobalConfigDictParserConfig(
                dotenv_path=Path(__file__.removesuffix(RELATIVE_PATH)).absolute()
                / "penguin_env.yaml",
                initial_global_config_dict=DictConfig(initial_global_config_dict),
                skip_load_from_cli=True,
            )
        )

        # Setup for rollout collection
        self.head_server_config = BaseServerConfig(
            host=self.node_ip,
            port=self.head_server_port,
        )
        self.rch = RolloutCollectionHelper()

    def health_check(self) -> bool:
        """Liveness probe for the ray actor; trivially True once constructed."""
        return True

    async def run_rollouts(self, penguin_examples: list[dict]) -> list[dict]:
        """Run examples through Penguin and convert each result to NeMo RL form.

        Args:
            penguin_examples: Raw Penguin example dicts.

        Returns:
            One dict per example; see ``_postprocess_penguin_to_nemo_rl_result``
            for the keys.
        """
        penguin_results = await self.rch.run_examples(
            examples=penguin_examples, head_server_config=self.head_server_config
        )

        nemo_rl_results = list(
            map(self._postprocess_penguin_to_nemo_rl_result, penguin_results)
        )
        return nemo_rl_results

    def _postprocess_penguin_to_nemo_rl_result(self, penguin_result: dict) -> dict:
        """Convert one Penguin rollout result into a NeMo RL message log.

        Walks ``penguin_result["response"]["output"]``, keeping only items that
        carry ``generation_token_ids`` (trainable turns). Each kept item is
        split into a "user" message holding the new prompt tokens since the
        previous turn and an "assistant" message holding the generated tokens
        and logprobs. Asserts that each item's prompt tokens start with all
        tokens seen so far (i.e. the conversation is contiguous).

        Returns:
            Dict with keys ``message_log``, ``input_message_log`` (first
            message only), and ``full_result`` (the untouched Penguin result).
        """
        nemo_rl_message_log = []
        seen_token_ids: List[int] = []
        for output_item_dict in penguin_result["response"]["output"]:
            # Nemo RL really only has two types of messages: assistant and not assistant since that is all that it is concerned with (i.e. to train or not to train)
            # Here we map all the trainable messages to assistant and all the non-trainable messages to user.
            # Eventually we can maybe be smarter about this, but this is functional for now.

            # Note that Penguin will only return token ids on "assistant" messages and not other message types.
            if "generation_token_ids" not in output_item_dict:
                continue

            assert (
                seen_token_ids
                == output_item_dict["prompt_token_ids"][: len(seen_token_ids)]
            ), f"""Non-contiguous messages found! This may be a tokenization issue where certain tokens are combined when messages are concatenated, or it may be due to part of the chat history being truncated (like if super long history is truncated or if reasoning is stripped out).
Seen token IDs: {seen_token_ids}
Output prompt token IDs: {output_item_dict["prompt_token_ids"]}
"""

            # New prompt tokens since the last turn -> non-trainable "user" turn.
            nemo_rl_message_log.append(
                {
                    "role": "user",
                    "content": "",
                    "token_ids": output_item_dict["prompt_token_ids"][
                        len(seen_token_ids) :
                    ],
                }
            )
            # Generated tokens -> trainable "assistant" turn.
            nemo_rl_message_log.append(
                {
                    "role": "assistant",
                    "content": "",
                    "token_ids": output_item_dict["generation_token_ids"],
                    "generation_logprobs": output_item_dict["generation_log_probs"],
                }
            )

            seen_token_ids.extend(nemo_rl_message_log[-2]["token_ids"])
            seen_token_ids.extend(nemo_rl_message_log[-1]["token_ids"])

        return {
            "message_log": nemo_rl_message_log,
            "input_message_log": nemo_rl_message_log[:1],
            "full_result": penguin_result,
        }

    def shutdown(self) -> None:
        """Stop the Penguin RunHelper (and the head server it started)."""
        self.rh.shutdown()

    def step(self, message_log_batch, metadata):
        # This is not used since Penguin will handle the rollouts entirely.
        raise NotImplementedError

    def global_post_process_and_metrics(self, batch):
        # Similar to the step function, this is not used.
        raise NotImplementedError


########################################
# Global config utils
########################################


def setup_penguin_config(config, tokenizer) -> None:
    """Mutate *config* in place so generation is compatible with Penguin.

    Turns on the vLLM async engine and its HTTP server (Penguin drives the
    policy over HTTP, which requires both flags), and clears stop strings /
    stop token ids, which are not supported in this integration.
    """
    gen_cfg = config["policy"]["generation"]
    vllm_cfg = gen_cfg["vllm_cfg"]

    # Enable the http server. Requires both async engine and the expose_http_server flag
    vllm_cfg["async_engine"] = True
    vllm_cfg["expose_http_server"] = True

    # Stop strings or token ids are not supported
    gen_cfg["stop_strings"] = None
    gen_cfg["stop_token_ids"] = None


########################################
# Data utils
########################################


# We do some light preprocessing here to make our data format compatible with nemo rl format
def penguin_example_to_nemo_rl_datum_spec(penguin_example: dict, idx: int) -> DatumSpec:
    """Wrap a raw Penguin example in a NeMo RL ``DatumSpec``.

    The real rollout happens entirely inside Penguin, so the datum only
    carries the example through NeMo RL's data pipeline: the message log is a
    single empty placeholder user turn and the payload rides in
    ``extra_env_info``.
    """
    # Placeholder turn; Penguin supplies the actual tokens at rollout time.
    placeholder_turn = {"role": "user", "content": "", "token_ids": torch.tensor([])}
    return DatumSpec(
        message_log=[placeholder_turn],
        length=0,
        extra_env_info=penguin_example,
        loss_multiplier=1.0,  # Fixed to 1.0 so every example is backpropped.
        idx=idx,
        task_name="penguin",
        stop_strings=None,
        # Extra vars
        token_ids=[],  # Empty key required by the current NeMo RL GRPO impl.
    )

Large diffs are not rendered by default.

Loading
Loading