Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
dd953b0
Ray utils.
pjin-nvidia Nov 13, 2025
5717d7a
No cover.
pjin-nvidia Nov 13, 2025
44dcee1
Merge remote-tracking branch 'origin/main' into pjin/ray-utils
pjin-nvidia Nov 16, 2025
a85f4f0
WIP.
pjin-nvidia Nov 16, 2025
85a09fe
Ray GPU node-related global config keys. Simplified spinup (WIP).
pjin-nvidia Nov 16, 2025
5c1fe99
Querying ray state to find nodes with available and unused GPUs.
pjin-nvidia Nov 17, 2025
f32957e
Only use explicitly reserved ray GPU nodes if specified.
pjin-nvidia Nov 17, 2025
ef77c4c
Comment. Cleanup.
pjin-nvidia Nov 17, 2025
bbf4631
Cleanup.
pjin-nvidia Nov 17, 2025
531a61d
Type.
pjin-nvidia Nov 17, 2025
f88ec6a
No cover.
pjin-nvidia Nov 17, 2025
d819740
Type.
pjin-nvidia Nov 17, 2025
7640773
Rename reserved => allowed.
pjin-nvidia Nov 17, 2025
70670a2
Rename.
pjin-nvidia Nov 17, 2025
e61253c
VLLMModel local spinup (originally from PR #317).
pjin-nvidia Nov 17, 2025
56b9bfa
VLLM spinup in a Ray worker.
pjin-nvidia Nov 20, 2025
04a97dd
Import.
pjin-nvidia Nov 20, 2025
70ac196
Do not count resources of ray actors in 'DEAD' state (these resources…
pjin-nvidia Nov 20, 2025
3e5c924
Support for specifying non-anonymous Ray namespace.
pjin-nvidia Nov 26, 2025
8bdcec0
Fix for starting nested Ray actors.
pjin-nvidia Nov 26, 2025
17f640f
Merge remote-tracking branch 'origin/main' into pjin/ray-utils
pjin-nvidia Nov 26, 2025
d4b8074
Merge remote-tracking branch 'origin/main' into pjin/ray-utils
pjin-nvidia Dec 1, 2025
8fe389f
Matching the misc infra PR.
pjin-nvidia Dec 1, 2025
613efb4
No cover.
pjin-nvidia Dec 1, 2025
7575eb6
Global scheduling helper to track free GPUs of schedulable ray nodes.
pjin-nvidia Dec 2, 2025
d7e1683
Rename.
pjin-nvidia Dec 2, 2025
f7c1937
Print.
pjin-nvidia Dec 2, 2025
2d37d17
Avoid an unnecessary ray import.
pjin-nvidia Dec 2, 2025
a35f58d
Try to pass the linter.
pjin-nvidia Dec 2, 2025
1b53089
Test.
pjin-nvidia Dec 2, 2025
6327760
Tests.
pjin-nvidia Dec 2, 2025
f5466f9
Fix test.
pjin-nvidia Dec 2, 2025
7a7e952
Fix test.
pjin-nvidia Dec 2, 2025
eab68a0
Unfix test.
pjin-nvidia Dec 2, 2025
d62ab6c
VLLMModel refresh.
pjin-nvidia Dec 2, 2025
7809170
Add vllm_model pyproject.toml (depends on PR #317).
pjin-nvidia Dec 3, 2025
156f039
Unpin vllm version.
pjin-nvidia Dec 3, 2025
21ba79e
Consolidated ray actor env vars setup.
pjin-nvidia Dec 3, 2025
74acc72
Fix.
pjin-nvidia Dec 4, 2025
6c59909
Fix.
pjin-nvidia Dec 4, 2025
b595ce2
Format.
pjin-nvidia Dec 5, 2025
bf0ccfe
Use a scheduling coordination helper.
pjin-nvidia Dec 9, 2025
f655a8c
Merge remote-tracking branch 'origin/main' into pjin/ray-utils
pjin-nvidia Dec 9, 2025
de2dd33
Sync vllm_model pyproject.toml.
pjin-nvidia Dec 9, 2025
7d399f3
This is just a list of node IDs (as of RL commit: 07a71f7b1656adb99f6…
pjin-nvidia Dec 9, 2025
5a8bb0c
Minimum version of vllm >= 0.11.2.
pjin-nvidia Dec 9, 2025
9058505
Fix for recent VLLM.
pjin-nvidia Dec 10, 2025
a97fd1a
Merge remote-tracking branch 'origin/main' into pjin/ray-utils
pjin-nvidia Dec 10, 2025
413c1ff
Simplified node IP address lookup.
pjin-nvidia Dec 11, 2025
1a7d969
Merge remote-tracking branch 'origin/main' into pjin/ray-utils
pjin-nvidia Dec 11, 2025
38e730c
Using ray actor type hints.
pjin-nvidia Dec 11, 2025
472896a
Remove.
pjin-nvidia Dec 11, 2025
fb6091b
Add sglang_model.
pjin-nvidia Dec 12, 2025
e1b3bde
Cherrypick: fix args bugs.
pjin-nvidia Dec 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions nemo_gym/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
GlobalConfigDictParserConfig,
get_global_config_dict,
)
from nemo_gym.ray_utils import (
_NeMoGymRayGPUSchedulingHelperActorProxy,
_start_global_ray_gpu_scheduling_helper,
)
from nemo_gym.server_utils import (
HEAD_SERVER_KEY_NAME,
HeadServer,
Expand Down Expand Up @@ -162,6 +166,7 @@ class ServerInstanceDisplayConfig(BaseModel):
class RunHelper: # pragma: no cover
_head_server: uvicorn.Server
_head_server_thread: Thread
_head_ray_gpu_helper: _NeMoGymRayGPUSchedulingHelperActorProxy

_processes: Dict[str, Popen]
_server_instance_display_configs: List[ServerInstanceDisplayConfig]
Expand All @@ -174,6 +179,8 @@ def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig)
# Note: This function will modify the global config dict - update `ray_head_node_address`
initialize_ray()

self._head_ray_gpu_helper = _start_global_ray_gpu_scheduling_helper()

# Assume Nemo Gym Run is for a single agent.
escaped_config_dict_yaml_str = shlex.quote(OmegaConf.to_yaml(global_config_dict))

Expand Down
8 changes: 8 additions & 0 deletions nemo_gym/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
DISALLOWED_PORTS_KEY_NAME = "disallowed_ports"
HEAD_SERVER_DEPS_KEY_NAME = "head_server_deps"
PYTHON_VERSION_KEY_NAME = "python_version"
RAY_HEAD_NODE_ADDRESS_KEY_NAME = "ray_head_node_address"
RAY_NAMESPACE_KEY_NAME = "ray_namespace"
RAY_GPU_NODES_KEY_NAME = "ray_gpu_nodes"
RAY_NUM_GPUS_PER_NODE_KEY_NAME = "ray_num_gpus_per_node"
USE_ABSOLUTE_IP = "use_absolute_ip"
NEMO_GYM_RESERVED_TOP_LEVEL_KEYS = [
CONFIG_PATHS_KEY_NAME,
Expand All @@ -54,6 +58,10 @@
DISALLOWED_PORTS_KEY_NAME,
HEAD_SERVER_DEPS_KEY_NAME,
PYTHON_VERSION_KEY_NAME,
RAY_HEAD_NODE_ADDRESS_KEY_NAME,
RAY_NAMESPACE_KEY_NAME,
RAY_GPU_NODES_KEY_NAME,
RAY_NUM_GPUS_PER_NODE_KEY_NAME,
USE_ABSOLUTE_IP,
]

Expand Down
172 changes: 172 additions & 0 deletions nemo_gym/ray_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from collections import defaultdict
from time import sleep
from typing import Dict, Optional

import ray
import ray.util.state
from ray.actor import ActorClass, ActorProxy
from ray.util import get_node_ip_address
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

from nemo_gym.global_config import (
    RAY_GPU_NODES_KEY_NAME,
    RAY_HEAD_NODE_ADDRESS_KEY_NAME,
    RAY_NAMESPACE_KEY_NAME,
    RAY_NUM_GPUS_PER_NODE_KEY_NAME,
    get_global_config_dict,
)


class _NeMoGymRayGPUSchedulingHelper:  # pragma: no cover
    """Ray actor that tracks per-node GPU capacity and hands out node IDs.

    A single named instance of this class acts as the global coordination
    point for placing GPU workers onto Ray nodes: `_post_init` scans the
    cluster once to record each schedulable node's total GPUs, and
    `alloc_gpu_node` reserves GPUs from that table.
    """

    def __init__(self, cfg):
        # Global config dict; consulted in _post_init for the head address
        # and the optional allowed-node list.
        self.cfg = cfg
        # node_id -> total GPUs available on that node (filled by _post_init).
        self.avail_gpus_dict = defaultdict(int)
        # node_id -> GPUs already handed out via alloc_gpu_node.
        self.used_gpus_dict = defaultdict(int)

    @ray.method
    def _post_init(self) -> None:
        """Populate the per-node GPU availability table from Ray cluster state.

        If the value of RAY_GPU_NODES_KEY_NAME is None, Gym will use all Ray
        GPU nodes for scheduling GPU actors. Otherwise (a list), Gym will only
        use the listed Ray GPU nodes.
        """
        allowed_gpu_nodes = self.cfg.get(RAY_GPU_NODES_KEY_NAME, None)
        if allowed_gpu_nodes is not None:
            # Set membership is O(1) and tolerates duplicates in the config.
            allowed_gpu_nodes = set(allowed_gpu_nodes)

        # Use the shared config-key constant rather than a hard-coded string,
        # matching how the other keys in this module are referenced.
        head = self.cfg[RAY_HEAD_NODE_ADDRESS_KEY_NAME]
        node_states = ray.util.state.list_nodes(head, detail=True)
        for state in node_states:
            assert state.node_id is not None
            # Skip disallowed nodes before touching their resource totals.
            if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes:
                continue
            self.avail_gpus_dict[state.node_id] += state.resources_total.get("GPU", 0)

    def alloc_gpu_node(self, num_gpus: int) -> Optional[str]:
        """Reserve `num_gpus` GPUs on the first node with enough headroom.

        Returns the chosen node ID, or None if no tracked node has sufficient
        free GPUs. NOTE(review): reservations are never released — presumably
        workers live for the duration of the run; confirm with callers.
        """
        for node_id, avail_num_gpus in self.avail_gpus_dict.items():
            used_num_gpus = self.used_gpus_dict[node_id]
            if used_num_gpus + num_gpus <= avail_num_gpus:
                self.used_gpus_dict[node_id] += num_gpus
                return node_id
        return None


# Wrap the helper class as a Ray remote actor class; the generic parameter
# lets type checkers see the underlying class for remote-method hints.
_NeMoGymRayGPUSchedulingHelperActor: ActorClass[_NeMoGymRayGPUSchedulingHelper] = ray.remote(
    _NeMoGymRayGPUSchedulingHelper
)  # pragma: no cover
# Type alias for handles to the helper actor, used in function signatures below.
_NeMoGymRayGPUSchedulingHelperActorProxy = ActorProxy[_NeMoGymRayGPUSchedulingHelper]  # pragma: no cover


def _start_global_ray_gpu_scheduling_helper(
    node_id: Optional[str] = None,
) -> _NeMoGymRayGPUSchedulingHelperActorProxy:  # pragma: no cover
    """Create the named global GPU-scheduling helper actor and initialize it.

    When `node_id` is given, the actor is placed on that node with soft
    affinity (Ray may fall back to another node if it is unavailable).
    Blocks until the actor has finished scanning cluster GPU state, so the
    returned handle can serve allocations immediately.
    """
    global_cfg = get_global_config_dict()

    options = {
        "name": "_NeMoGymRayGPUSchedulingHelper",
        "num_cpus": 0,
    }
    if node_id is not None:
        options["scheduling_strategy"] = NodeAffinitySchedulingStrategy(node_id=node_id, soft=True)

    actor = _NeMoGymRayGPUSchedulingHelperActor.options(**options).remote(global_cfg)
    # Wait synchronously for the availability table to be populated.
    ray.get(actor._post_init.remote())
    return actor


def get_global_ray_gpu_scheduling_helper() -> _NeMoGymRayGPUSchedulingHelperActorProxy:  # pragma: no cover
    """Look up the named global GPU-scheduling helper actor, retrying until it exists.

    Blocks indefinitely (polling every 3 seconds) until the actor created by
    `_start_global_ray_gpu_scheduling_helper` becomes visible in the target
    Ray namespace.
    """
    cfg = get_global_config_dict()
    # Hoisted out of the retry loop: the namespace is loop-invariant, so there
    # is no reason to re-read the config and rebuild the arguments each poll.
    # Falls back to "nemo_gym", mirroring initialize_ray() in server_utils.
    ray_namespace = cfg.get(RAY_NAMESPACE_KEY_NAME, None)
    if ray_namespace is None:
        ray_namespace = "nemo_gym"
    while True:
        try:
            # ray.get_actor raises ValueError while the actor does not exist yet.
            return ray.get_actor(
                name="_NeMoGymRayGPUSchedulingHelper",
                namespace=ray_namespace,
            )
        except ValueError:
            sleep(3)


def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]:  # pragma: no cover
    """Return a mapping from Ray node ID to that node's IP address.

    Queries live cluster state via the head node address from the global
    config.
    """
    cfg = get_global_config_dict()
    # Use the shared config-key constant rather than a hard-coded string,
    # consistent with the other config lookups in this module.
    head = cfg[RAY_HEAD_NODE_ADDRESS_KEY_NAME]
    return {state.node_id: state.node_ip for state in ray.util.state.list_nodes(head)}


def lookup_current_ray_node_id() -> str:  # pragma: no cover
    """Return the Ray node ID of the node this process is running on."""
    return ray.get_runtime_context().get_node_id()


def lookup_current_ray_node_ip() -> str:  # pragma: no cover
    """Return the IP address of the node this process is running on."""
    return get_node_ip_address()


def _prepare_ray_worker_env_vars() -> Dict[str, str]: # pragma: no cover
worker_env_vars = {
**os.environ,
}
pop_env_vars = [
"CUDA_VISIBLE_DEVICES",
"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES",
"RAY_JOB_ID",
"RAY_RAYLET_PID",
]
for k in pop_env_vars:
worker_env_vars.pop(k, None)
return worker_env_vars


def spinup_single_ray_gpu_node_worker(
    worker_cls: ActorClass,
    num_gpus: int,
    *worker_args,
    **worker_kwargs,
) -> ActorProxy:  # pragma: no cover
    """Start a Ray GPU actor on a single node chosen by the global scheduling helper.

    Args:
        worker_cls: Ray actor class to instantiate.
        num_gpus: GPUs to reserve for the actor; must satisfy
            1 <= num_gpus <= the configured per-node GPU count.
        *worker_args: Positional args forwarded to the actor constructor.
        **worker_kwargs: Keyword args forwarded to the actor constructor.

    Returns:
        A handle to the newly created actor, pinned (hard affinity) to the
        allocated node.

    Raises:
        ValueError: If `num_gpus` is out of range.
        RuntimeError: If no schedulable node has enough free GPUs.
    """
    cfg = get_global_config_dict()

    num_gpus_per_node = cfg.get(RAY_NUM_GPUS_PER_NODE_KEY_NAME, 8)
    # Validate with explicit raises rather than `assert`: assertions are
    # stripped under `python -O`, and these are genuine input checks. Also
    # fixes the original messages, which said "GPU node(s)" where GPUs per
    # node were meant.
    if num_gpus < 1:
        raise ValueError(f"Must request at least 1 GPU for spinning up {worker_cls}")
    if num_gpus > num_gpus_per_node:
        raise ValueError(
            f"Requested {num_gpus} GPUs > {num_gpus_per_node} GPUs per node for spinning up {worker_cls}"
        )

    helper = get_global_ray_gpu_scheduling_helper()
    node_id = ray.get(helper.alloc_gpu_node.remote(num_gpus))
    if node_id is None:
        raise RuntimeError(f"Cannot find an available Ray node with {num_gpus} GPUs to spin up {worker_cls}")

    worker_options = {
        "num_gpus": num_gpus,
        # Hard affinity: alloc_gpu_node reserved GPUs on this exact node.
        "scheduling_strategy": NodeAffinitySchedulingStrategy(
            node_id=node_id,
            soft=False,
        ),
        "runtime_env": {
            # Run with the current interpreter and a scrubbed copy of our env.
            "py_executable": sys.executable,
            "env_vars": _prepare_ray_worker_env_vars(),
        },
    }
    return worker_cls.options(**worker_options).remote(*worker_args, **worker_kwargs)
6 changes: 6 additions & 0 deletions nemo_gym/server_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ def initialize_ray() -> None:

global_config_dict = get_global_config_dict()
ray_head_node_address = global_config_dict.get("ray_head_node_address")
ray_namespace = global_config_dict.get("ray_namespace", None)
ray_init_kwargs = dict(ignore_reinit_error=True)

if ray_head_node_address:
Expand All @@ -358,6 +359,11 @@ def initialize_ray() -> None:
else:
print("Starting Ray cluster...")

if ray_namespace is None:
ray_namespace = "nemo_gym"
print(f"Ray namespace: {ray_namespace}")
ray_init_kwargs["namespace"] = ray_namespace

ray.init(**ray_init_kwargs)

if not ray_head_node_address:
Expand Down
Empty file.
Loading