From dd953b0d6268be6d4b86017ff2e87b16c3bdf44d Mon Sep 17 00:00:00 2001
From: Peter Jin
Date: Thu, 13 Nov 2025 15:25:15 -0800
Subject: [PATCH 01/48] Ray utils.

Signed-off-by: Peter Jin
---
 nemo_gym/ray_utils.py | 49 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)
 create mode 100644 nemo_gym/ray_utils.py

diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py
new file mode 100644
index 000000000..e264b3a5a
--- /dev/null
+++ b/nemo_gym/ray_utils.py
@@ -0,0 +1,49 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import sys
+from typing import Optional
+
+from nemo_gym.server_utils import (
+    get_global_config_dict,
+)
+
+
+def spinup_single_ray_gpu_node_worker(worker_cls, num_gpus: Optional[int] = None):
+    from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
+
+    cfg = get_global_config_dict()
+    nodes = cfg.get("ray_gpu_nodes", [])
+    num_gpus_per_node = cfg.get("ray_num_gpus_per_node", 1)
+    if num_gpus is None:
+        num_gpus = num_gpus_per_node
+    for node in nodes:
+        worker_options = {}
+        worker_options["num_gpus"] = num_gpus
+        worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
+            node_id=node["node_id"],
+            soft=False,
+        )
+        py_exec = sys.executable
+        worker_runtime_env = {
+            "py_executable": py_exec,
+            "env_vars": {
+                **os.environ,
+            },
+        }
+        worker_options["runtime_env"] = worker_runtime_env
+        worker = worker_cls.options(**worker_options).remote()
+        return worker
+    raise RuntimeError(f"No available Ray GPU nodes for spinning up {worker_cls}")

From 5717d7ae7290bfaa8f7448818c8468707db73ed5 Mon Sep 17 00:00:00 2001
From: Peter Jin
Date: Thu, 13 Nov 2025 15:28:24 -0800
Subject: [PATCH 02/48] No cover.

Signed-off-by: Peter Jin
---
 nemo_gym/ray_utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py
index e264b3a5a..11e8e0cae 100644
--- a/nemo_gym/ray_utils.py
+++ b/nemo_gym/ray_utils.py
@@ -21,7 +21,7 @@
 )
 
 
-def spinup_single_ray_gpu_node_worker(worker_cls, num_gpus: Optional[int] = None):
+def spinup_single_ray_gpu_node_worker(worker_cls, num_gpus: Optional[int] = None): # pragma: no cover
     from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
 
     cfg = get_global_config_dict()

From a85f4f0c4bb1078d8fb71c41e4dc8c68efeba094 Mon Sep 17 00:00:00 2001
From: Peter Jin
Date: Sat, 15 Nov 2025 16:50:13 -0800
Subject: [PATCH 03/48] WIP.

Signed-off-by: Peter Jin
---
 nemo_gym/ray_utils.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py
index 11e8e0cae..c1981d23a 100644
--- a/nemo_gym/ray_utils.py
+++ b/nemo_gym/ray_utils.py
@@ -14,16 +14,21 @@
 # limitations under the License.
import os import sys -from typing import Optional + +from ray.actor import ActorClass +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from nemo_gym.server_utils import ( get_global_config_dict, ) -def spinup_single_ray_gpu_node_worker(worker_cls, num_gpus: Optional[int] = None): # pragma: no cover - from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy - +def spinup_single_ray_gpu_node_worker( + worker_cls: ActorClass, + num_gpus: int, + *worker_args, + **worker_kwargs, +): # pragma: no cover cfg = get_global_config_dict() nodes = cfg.get("ray_gpu_nodes", []) num_gpus_per_node = cfg.get("ray_num_gpus_per_node", 1) @@ -44,6 +49,6 @@ def spinup_single_ray_gpu_node_worker(worker_cls, num_gpus: Optional[int] = None }, } worker_options["runtime_env"] = worker_runtime_env - worker = worker_cls.options(**worker_options).remote() + worker = worker_cls.options(**worker_options).remote(*worker_args, **worker_kwargs) return worker raise RuntimeError(f"No available Ray GPU nodes for spinning up {worker_cls}") From 85a09fe13506bf942503ec4776a8e0007123e3c3 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 13:17:13 -0800 Subject: [PATCH 04/48] Ray GPU node-related global config keys. Simplified spinup (WIP). Signed-off-by: Peter Jin --- nemo_gym/global_config.py | 4 +++ nemo_gym/ray_utils.py | 55 ++++++++++++++++++++------------------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/nemo_gym/global_config.py b/nemo_gym/global_config.py index 3528f3dde..1765a0188 100644 --- a/nemo_gym/global_config.py +++ b/nemo_gym/global_config.py @@ -45,6 +45,8 @@ DISALLOWED_PORTS_KEY_NAME = "disallowed_ports" HEAD_SERVER_DEPS_KEY_NAME = "head_server_deps" PYTHON_VERSION_KEY_NAME = "python_version" +RAY_GPU_NODES_KEY_NAME = "ray_gpu_nodes" +RAY_NUM_GPUS_PER_NODE_KEY_NAME = "ray_num_gpus_per_node" NEMO_GYM_RESERVED_TOP_LEVEL_KEYS = [ CONFIG_PATHS_KEY_NAME, ENTRYPOINT_KEY_NAME, @@ -53,6 +55,8 @@ DISALLOWED_PORTS_KEY_NAME, HEAD_SERVER_DEPS_KEY_NAME, PYTHON_VERSION_KEY_NAME, + RAY_GPU_NODES_KEY_NAME, + RAY_NUM_GPUS_PER_NODE_KEY_NAME, ] POLICY_BASE_URL_KEY_NAME = "policy_base_url" diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index c1981d23a..0ddad0c89 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -16,11 +16,15 @@ import sys from ray.actor import ActorClass -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy -from nemo_gym.server_utils import ( - get_global_config_dict, -) + +# from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy + +# from nemo_gym.global_config import ( +# RAY_GPU_NODES_KEY_NAME, +# RAY_NUM_GPUS_PER_NODE_KEY_NAME, +# get_global_config_dict, +# ) def spinup_single_ray_gpu_node_worker( @@ -29,26 +33,23 @@ def spinup_single_ray_gpu_node_worker( *worker_args, **worker_kwargs, ): # pragma: no cover - cfg = get_global_config_dict() - nodes = cfg.get("ray_gpu_nodes", []) - num_gpus_per_node = cfg.get("ray_num_gpus_per_node", 1) - if num_gpus is None: - num_gpus = num_gpus_per_node - for node in nodes: - worker_options = {} - worker_options["num_gpus"] = num_gpus - worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( - node_id=node["node_id"], - soft=False, - ) - py_exec = sys.executable - worker_runtime_env = { - "py_executable": py_exec, - "env_vars": { - **os.environ, - }, - } - worker_options["runtime_env"] = worker_runtime_env - worker = worker_cls.options(**worker_options).remote(*worker_args, **worker_kwargs) - return worker - raise 
RuntimeError(f"No available Ray GPU nodes for spinning up {worker_cls}") + # cfg = get_global_config_dict() + # nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, []) + # num_gpus_per_node = cfg.get(RAY_NUM_GPUS_PER_NODE_KEY_NAME, 1) + worker_options = {} + worker_options["num_gpus"] = num_gpus + # worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( + # node_id=node["node_id"], + # soft=False, + # ) + py_exec = sys.executable + worker_runtime_env = { + "py_executable": py_exec, + "env_vars": { + **os.environ, + }, + } + worker_options["runtime_env"] = worker_runtime_env + worker = worker_cls.options(**worker_options).remote(*worker_args, **worker_kwargs) + return worker + # raise RuntimeError(f"No available Ray GPU nodes for spinning up {worker_cls}") From 5c1fe99bd6ded84f6003cd50175a021c0456c8fd Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:26:23 -0800 Subject: [PATCH 05/48] Querying ray state to find nodes with available and unused GPUs. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 68 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 0ddad0c89..d7c4a3e6a 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -14,17 +14,51 @@ # limitations under the License. import os import sys +from collections import defaultdict +from time import sleep +from typing import Any, Dict, List, Optional +import ray.util.state from ray.actor import ActorClass +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +from nemo_gym.global_config import ( + RAY_NUM_GPUS_PER_NODE_KEY_NAME, + get_global_config_dict, +) -# from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy -# from nemo_gym.global_config import ( -# RAY_GPU_NODES_KEY_NAME, -# RAY_NUM_GPUS_PER_NODE_KEY_NAME, -# get_global_config_dict, -# ) +def _lookup_node_id_with_free_gpus(num_gpus: int, node_list: Optional[List[Dict[str, Any]]] = None) -> Optional[str]: + cfg = get_global_config_dict() + node_avail_gpu_dict = defaultdict(int) + node_states = ray.util.state.list_nodes( + cfg["ray_head_node_address"], + detail=True, + ) + for state in node_states: + assert state.node_id is not None + node_avail_gpu_dict[state.node_id] += state.resources_total.get("GPU", 0) + while True: + retry = False + node_used_gpu_dict = defaultdict(int) + actor_states = ray.util.state.list_actors( + cfg["ray_head_node_address"], + detail=True, + ) + for state in actor_states: + if state.state == "PENDING_CREATION" or state.node_id is None: + retry = True + break + node_used_gpu_dict[state.node_id] += state.required_resources.get("GPU", 0) + if retry: + sleep(2) + continue + break + for node_id, avail_num_gpus in node_avail_gpu_dict.items(): + used_num_gpus = node_used_gpu_dict[node_id] + if used_num_gpus + num_gpus <= avail_num_gpus: + return node_id + return None def spinup_single_ray_gpu_node_worker( @@ -33,15 +67,22 @@ def spinup_single_ray_gpu_node_worker( *worker_args, **worker_kwargs, ): # pragma: no cover - # cfg = get_global_config_dict() - # nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, []) - # num_gpus_per_node = cfg.get(RAY_NUM_GPUS_PER_NODE_KEY_NAME, 1) + cfg = get_global_config_dict() + # nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, None) + num_gpus_per_node = cfg.get(RAY_NUM_GPUS_PER_NODE_KEY_NAME, 8) + assert num_gpus >= 1, f"Must request at least 1 GPU node for spinning up {worker_cls}" + assert num_gpus <= num_gpus_per_node, ( + f"Requested {num_gpus} > {num_gpus_per_node} GPU 
nodes for spinning up {worker_cls}" + ) + node_id = _lookup_node_id_with_free_gpus(num_gpus) + if node_id is None: + raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") worker_options = {} worker_options["num_gpus"] = num_gpus - # worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( - # node_id=node["node_id"], - # soft=False, - # ) + worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( + node_id=node_id, + soft=False, + ) py_exec = sys.executable worker_runtime_env = { "py_executable": py_exec, @@ -52,4 +93,3 @@ def spinup_single_ray_gpu_node_worker( worker_options["runtime_env"] = worker_runtime_env worker = worker_cls.options(**worker_options).remote(*worker_args, **worker_kwargs) return worker - # raise RuntimeError(f"No available Ray GPU nodes for spinning up {worker_cls}") From f32957e111af3d270729889ba1150568a17530fa Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:40:16 -0800 Subject: [PATCH 06/48] Only use explicitly reserved ray GPU nodes if specified. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index d7c4a3e6a..7f330a7e6 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -16,19 +16,20 @@ import sys from collections import defaultdict from time import sleep -from typing import Any, Dict, List, Optional +from typing import Optional, Set import ray.util.state from ray.actor import ActorClass from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from nemo_gym.global_config import ( + RAY_GPU_NODES_KEY_NAME, RAY_NUM_GPUS_PER_NODE_KEY_NAME, get_global_config_dict, ) -def _lookup_node_id_with_free_gpus(num_gpus: int, node_list: Optional[List[Dict[str, Any]]] = None) -> Optional[str]: +def _lookup_node_id_with_free_gpus(num_gpus: int, reserved_gpu_nodes: Set[str] = None) -> Optional[str]: cfg = get_global_config_dict() node_avail_gpu_dict = defaultdict(int) node_states = ray.util.state.list_nodes( @@ -37,6 +38,8 @@ def _lookup_node_id_with_free_gpus(num_gpus: int, node_list: Optional[List[Dict[ ) for state in node_states: assert state.node_id is not None + if reserved_gpu_nodes is not None and state.node_id in reserved_gpu_nodes: + continue node_avail_gpu_dict[state.node_id] += state.resources_total.get("GPU", 0) while True: retry = False @@ -68,13 +71,15 @@ def spinup_single_ray_gpu_node_worker( **worker_kwargs, ): # pragma: no cover cfg = get_global_config_dict() - # nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, None) + gpu_nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, None) + if gpu_nodes is not None: + gpu_nodes = set([node["node_id"] for node in gpu_nodes]) num_gpus_per_node = cfg.get(RAY_NUM_GPUS_PER_NODE_KEY_NAME, 8) assert num_gpus >= 1, f"Must request at least 1 GPU node for spinning up {worker_cls}" assert num_gpus <= num_gpus_per_node, ( f"Requested {num_gpus} > {num_gpus_per_node} GPU nodes for spinning up {worker_cls}" ) - node_id = _lookup_node_id_with_free_gpus(num_gpus) + node_id = _lookup_node_id_with_free_gpus(num_gpus, reserved_gpu_nodes=gpu_nodes) if node_id is None: raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") worker_options = {} From ef77c4c25af29c7b9d7c7e78e777e67e3f5d81c4 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:44:02 -0800 Subject: [PATCH 07/48] Comment. Cleanup. 
Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 7f330a7e6..50572eea8 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -71,26 +71,32 @@ def spinup_single_ray_gpu_node_worker( **worker_kwargs, ): # pragma: no cover cfg = get_global_config_dict() + + # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes. + # Otherwise if value of RAY_GPU_NODES_KEY_NAME is a list, then Gym will only use + # the listed Ray GPU nodes. gpu_nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, None) if gpu_nodes is not None: gpu_nodes = set([node["node_id"] for node in gpu_nodes]) + num_gpus_per_node = cfg.get(RAY_NUM_GPUS_PER_NODE_KEY_NAME, 8) assert num_gpus >= 1, f"Must request at least 1 GPU node for spinning up {worker_cls}" assert num_gpus <= num_gpus_per_node, ( f"Requested {num_gpus} > {num_gpus_per_node} GPU nodes for spinning up {worker_cls}" ) + node_id = _lookup_node_id_with_free_gpus(num_gpus, reserved_gpu_nodes=gpu_nodes) if node_id is None: raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") + worker_options = {} worker_options["num_gpus"] = num_gpus worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( node_id=node_id, soft=False, ) - py_exec = sys.executable worker_runtime_env = { - "py_executable": py_exec, + "py_executable": sys.executable, "env_vars": { **os.environ, }, From bbf4631311aa907760e54219881adf541358ae0a Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:45:47 -0800 Subject: [PATCH 08/48] Cleanup. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 50572eea8..b2d8daac7 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -31,6 +31,7 @@ def _lookup_node_id_with_free_gpus(num_gpus: int, reserved_gpu_nodes: Set[str] = None) -> Optional[str]: cfg = get_global_config_dict() + node_avail_gpu_dict = defaultdict(int) node_states = ray.util.state.list_nodes( cfg["ray_head_node_address"], @@ -41,6 +42,7 @@ def _lookup_node_id_with_free_gpus(num_gpus: int, reserved_gpu_nodes: Set[str] = if reserved_gpu_nodes is not None and state.node_id in reserved_gpu_nodes: continue node_avail_gpu_dict[state.node_id] += state.resources_total.get("GPU", 0) + while True: retry = False node_used_gpu_dict = defaultdict(int) @@ -57,6 +59,7 @@ def _lookup_node_id_with_free_gpus(num_gpus: int, reserved_gpu_nodes: Set[str] = sleep(2) continue break + for node_id, avail_num_gpus in node_avail_gpu_dict.items(): used_num_gpus = node_used_gpu_dict[node_id] if used_num_gpus + num_gpus <= avail_num_gpus: From 531a61da9b4b57a4629753ab5a80794a6f480653 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:47:27 -0800 Subject: [PATCH 09/48] Type. 
Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index b2d8daac7..36e48502e 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -19,7 +19,7 @@ from typing import Optional, Set import ray.util.state -from ray.actor import ActorClass +from ray.actor import ActorClass, ActorProxy from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from nemo_gym.global_config import ( @@ -72,7 +72,7 @@ def spinup_single_ray_gpu_node_worker( num_gpus: int, *worker_args, **worker_kwargs, -): # pragma: no cover +) -> ActorProxy: # pragma: no cover cfg = get_global_config_dict() # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes. From f88ec6a79c05fe2bc761bf45fb98f00983265507 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:49:25 -0800 Subject: [PATCH 10/48] No cover. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 36e48502e..78716807c 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -29,7 +29,9 @@ ) -def _lookup_node_id_with_free_gpus(num_gpus: int, reserved_gpu_nodes: Set[str] = None) -> Optional[str]: +def _lookup_node_id_with_free_gpus( + num_gpus: int, reserved_gpu_nodes: Set[str] = None +) -> Optional[str]: # pragma: no cover cfg = get_global_config_dict() node_avail_gpu_dict = defaultdict(int) From d81974065c269a691992a0814c16ba120959bce0 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:50:08 -0800 Subject: [PATCH 11/48] Type. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 78716807c..8d9ffc011 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -30,7 +30,7 @@ def _lookup_node_id_with_free_gpus( - num_gpus: int, reserved_gpu_nodes: Set[str] = None + num_gpus: int, reserved_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover cfg = get_global_config_dict() From 76407739366c2350b52a69089a3e4d890348599a Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sun, 16 Nov 2025 19:55:08 -0800 Subject: [PATCH 12/48] Rename reserved => allowed. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 8d9ffc011..fa1d81a45 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -30,7 +30,7 @@ def _lookup_node_id_with_free_gpus( - num_gpus: int, reserved_gpu_nodes: Optional[Set[str]] = None + num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover cfg = get_global_config_dict() @@ -41,7 +41,7 @@ def _lookup_node_id_with_free_gpus( ) for state in node_states: assert state.node_id is not None - if reserved_gpu_nodes is not None and state.node_id in reserved_gpu_nodes: + if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes: continue node_avail_gpu_dict[state.node_id] += state.resources_total.get("GPU", 0) @@ -77,9 +77,10 @@ def spinup_single_ray_gpu_node_worker( ) -> ActorProxy: # pragma: no cover cfg = get_global_config_dict() - # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes. + # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes + # for scheduling GPU actors. 
# Otherwise if value of RAY_GPU_NODES_KEY_NAME is a list, then Gym will only use - # the listed Ray GPU nodes. + # the listed Ray GPU nodes for scheduling GPU actors. gpu_nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, None) if gpu_nodes is not None: gpu_nodes = set([node["node_id"] for node in gpu_nodes]) @@ -90,7 +91,7 @@ def spinup_single_ray_gpu_node_worker( f"Requested {num_gpus} > {num_gpus_per_node} GPU nodes for spinning up {worker_cls}" ) - node_id = _lookup_node_id_with_free_gpus(num_gpus, reserved_gpu_nodes=gpu_nodes) + node_id = _lookup_node_id_with_free_gpus(num_gpus, allowed_gpu_nodes=gpu_nodes) if node_id is None: raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") From 70670a2c6e1a8dd68d9c87f3a35985fc37cef566 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Mon, 17 Nov 2025 10:43:14 -0800 Subject: [PATCH 13/48] Rename. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index fa1d81a45..251c60c78 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -29,7 +29,7 @@ ) -def _lookup_node_id_with_free_gpus( +def _lookup_ray_node_with_free_gpus( num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover cfg = get_global_config_dict() @@ -91,7 +91,7 @@ def spinup_single_ray_gpu_node_worker( f"Requested {num_gpus} > {num_gpus_per_node} GPU nodes for spinning up {worker_cls}" ) - node_id = _lookup_node_id_with_free_gpus(num_gpus, allowed_gpu_nodes=gpu_nodes) + node_id = _lookup_ray_node_with_free_gpus(num_gpus, allowed_gpu_nodes=gpu_nodes) if node_id is None: raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") From e61253c2e755b94a04cd16a92472ff88acf155a3 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Mon, 17 Nov 2025 13:55:00 -0800 Subject: [PATCH 14/48] VLLMModel local spinup (originally from PR #317). Signed-off-by: Peter Jin --- responses_api_models/vllm_model/app.py | 165 +++++++++++++++++++++++-- 1 file changed, 156 insertions(+), 9 deletions(-) diff --git a/responses_api_models/vllm_model/app.py b/responses_api_models/vllm_model/app.py index b9a61f996..78a52899a 100644 --- a/responses_api_models/vllm_model/app.py +++ b/responses_api_models/vllm_model/app.py @@ -12,9 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import json import re -from time import time -from typing import ClassVar, Dict, List, Optional, Tuple, Union +import urllib +from multiprocessing import Process +from time import sleep, time +from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union from uuid import uuid4 from aiohttp.client_exceptions import ClientResponseError @@ -26,6 +29,7 @@ Body, SimpleResponsesAPIModel, ) +from nemo_gym.global_config import find_open_port from nemo_gym.openai_utils import ( RESPONSES_TO_TRAIN, NeMoGymAsyncOpenAI, @@ -66,23 +70,166 @@ class VLLMModelConfig(BaseResponsesAPIModelConfig): uses_reasoning_parser: bool replace_developer_role_with_system: bool = False + spinup_server: bool = False + server_args: Optional[Dict[str, Any]] = None + + enable_router: bool = False + # router_backend values should be one of "ray" or "mp" (matching the allowed + # values of VLLM --distributed-executor-backend). 
+ router_backend: str = "mp" + router_dp_size: int = 1 + def model_post_init(self, context): if isinstance(self.base_url, str): self.base_url = [self.base_url] return super().model_post_init(context) +def _spinup_vllm_server( + config: VLLMModelConfig, server_host: str, server_port: int, router_dp_rank: Optional[int] +) -> None: + import os + + import uvloop + import vllm.engine.arg_utils + import vllm.entrypoints.openai.api_server + import vllm.entrypoints.openai.cli_args + import vllm.utils + + argv = [] + argv.append("--model") + argv.append(config.model) + argv.append("--host") + argv.append(server_host) + argv.append("--port") + argv.append(f"{server_port}") + argv.append("--distributed-executor-backend") + if config.enable_router: + argv.append(config.router_backend) + else: + argv.append("mp") + for k, v in (config.server_args or {}).items(): + if isinstance(v, bool): + if not v: + arg_key = f"--no-{k.replace('_', '-')}" + else: + arg_key = f"--{k.replace('_', '-')}" + argv.append(arg_key) + else: + arg_key = f"--{k.replace('_', '-')}" + argv.append(arg_key) + argv.append(f"{v}") + + if config.enable_router and config.router_backend == "mp": + tp_size = (config.server_args or {}).get("tensor_parallel_size", 1) + tp_start = router_dp_rank * tp_size + tp_ranks = [] + for tp_rank_offset in range(tp_size): + tp_ranks.append(tp_start + tp_rank_offset) + os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([f"{r}" for r in tp_ranks]) + + server_args = vllm.utils.FlexibleArgumentParser() + server_args = vllm.entrypoints.openai.cli_args.make_arg_parser(server_args) + server_args = server_args.parse_args(argv) + vllm.entrypoints.openai.cli_args.validate_parsed_serve_args(server_args) + + uvloop.run(vllm.entrypoints.openai.api_server.run_server(server_args)) + + +# Use this to query the VLLM servers during spinup without having to start an +# asyncio event loop for the async client. +def _vllm_server_heartbeat(base_url: str): + req_headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + req_body = { + "messages": [ + { + "role": "user", + "content": "hi", + } + ], + "max_tokens": 8, + "temperature": 1.0, + } + req_data = json.dumps(req_body).encode("utf-8") + req_url = f"{base_url}/chat/completions" + req = urllib.request.Request( + req_url, + headers=req_headers, + data=req_data, + ) + with urllib.request.urlopen(req, timeout=5) as out: + out_status = out.status + out_data = out.read() + output = out_data.decode("utf-8") + return { + "_status": out_status, + "output": output, + "except": None, + } + + class VLLMModel(SimpleResponsesAPIModel): config: VLLMModelConfig def model_post_init(self, context): - self._clients = [ - NeMoGymAsyncOpenAI( - base_url=base_url, - api_key=self.config.api_key, - ) - for base_url in self.config.base_url - ] + if self.config.spinup_server: + self._server_urls = [] + self._server_procs = [] + self._clients = [] + + router_dp_size = 1 + if self.config.enable_router: + router_dp_size = max(1, self.config.router_dp_size) + + for router_dp_rank in range(router_dp_size): + # FIXME: this server host is wrong for multi-node via ray. 
+ server_host = "127.0.0.1" + server_port = find_open_port() + server_url = f"http://{server_host}:{server_port}/v1" + + server_proc = Process( + target=_spinup_vllm_server, + args=( + self.config, + server_host, + server_port, + router_dp_rank if self.config.enable_router else None, + ), + daemon=False, + ) + server_proc.start() + + self._server_urls.append(server_url) + self._server_procs.append(server_proc) + self._clients.append( + NeMoGymAsyncOpenAI( + base_url=server_url, + api_key=self.config.api_key, + ) + ) + + for server_url in self._server_urls: + while True: + try: + _vllm_server_heartbeat(server_url) + break + except Exception: + sleep(5) + continue + + else: + self._server_urls = None + self._server_procs = None + self._clients = [ + NeMoGymAsyncOpenAI( + base_url=base_url, + api_key=self.config.api_key, + ) + for base_url in self.config.base_url + ] self._session_id_to_client: Dict[str, NeMoGymAsyncOpenAI] = dict() From 56b9bfa34f83fdd0cbf44917f363ceb53c541f0c Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Wed, 19 Nov 2025 16:11:08 -0800 Subject: [PATCH 15/48] VLLM spinup in a Ray worker. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 14 +++- responses_api_models/vllm_model/app.py | 100 +++++++++++++++---------- 2 files changed, 75 insertions(+), 39 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 251c60c78..1e011b293 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -16,7 +16,7 @@ import sys from collections import defaultdict from time import sleep -from typing import Optional, Set +from typing import Dict, Optional, Set import ray.util.state from ray.actor import ActorClass, ActorProxy @@ -29,6 +29,18 @@ ) +def lookup_current_ray_node_id() -> str: + return ray.runtime_context.get_runtime_context().get_node_id() + + +def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: + id_to_ip = {} + node_states = ray.util.state.list_nodes() + for state in node_states: + id_to_ip[state.node_id] = state.node_ip + return id_to_ip + + def _lookup_ray_node_with_free_gpus( num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover diff --git a/responses_api_models/vllm_model/app.py b/responses_api_models/vllm_model/app.py index 78a52899a..85d404f04 100644 --- a/responses_api_models/vllm_model/app.py +++ b/responses_api_models/vllm_model/app.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json +import os import re import urllib from multiprocessing import Process @@ -20,6 +21,7 @@ from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union from uuid import uuid4 +import ray from aiohttp.client_exceptions import ClientResponseError from fastapi import Request from pydantic import BaseModel, Field @@ -58,6 +60,11 @@ NeMoGymSummary, TokenIDLogProbMixin, ) +from nemo_gym.ray_utils import ( + lookup_current_ray_node_id, + lookup_ray_node_id_to_ip_dict, + spinup_single_ray_gpu_node_worker, +) from nemo_gym.server_utils import SESSION_ID_KEY @@ -74,9 +81,6 @@ class VLLMModelConfig(BaseResponsesAPIModelConfig): server_args: Optional[Dict[str, Any]] = None enable_router: bool = False - # router_backend values should be one of "ray" or "mp" (matching the allowed - # values of VLLM --distributed-executor-backend). 
- router_backend: str = "mp" router_dp_size: int = 1 def model_post_init(self, context): @@ -85,11 +89,7 @@ def model_post_init(self, context): return super().model_post_init(context) -def _spinup_vllm_server( - config: VLLMModelConfig, server_host: str, server_port: int, router_dp_rank: Optional[int] -) -> None: - import os - +def _spinup_vllm_server(config: VLLMModelConfig, server_host: str, server_port: int, router_dp_rank: int) -> None: import uvloop import vllm.engine.arg_utils import vllm.entrypoints.openai.api_server @@ -104,10 +104,7 @@ def _spinup_vllm_server( argv.append("--port") argv.append(f"{server_port}") argv.append("--distributed-executor-backend") - if config.enable_router: - argv.append(config.router_backend) - else: - argv.append("mp") + argv.append("mp") for k, v in (config.server_args or {}).items(): if isinstance(v, bool): if not v: @@ -120,14 +117,6 @@ def _spinup_vllm_server( argv.append(arg_key) argv.append(f"{v}") - if config.enable_router and config.router_backend == "mp": - tp_size = (config.server_args or {}).get("tensor_parallel_size", 1) - tp_start = router_dp_rank * tp_size - tp_ranks = [] - for tp_rank_offset in range(tp_size): - tp_ranks.append(tp_start + tp_rank_offset) - os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([f"{r}" for r in tp_ranks]) - server_args = vllm.utils.FlexibleArgumentParser() server_args = vllm.entrypoints.openai.cli_args.make_arg_parser(server_args) server_args = server_args.parse_args(argv) @@ -136,6 +125,38 @@ def _spinup_vllm_server( uvloop.run(vllm.entrypoints.openai.api_server.run_server(server_args)) +@ray.remote +class VLLMModelSpinupWorker: + def __init__(self, config: VLLMModelConfig, working_dir: Optional[str], router_dp_rank: int): + self.config = config + self.working_dir = working_dir + self._server_host = "0.0.0.0" + self._server_port = find_open_port() + self._router_dp_rank = router_dp_rank + + if self.working_dir is not None: + os.chdir(self.working_dir) + + server_proc = Process( + target=_spinup_vllm_server, + args=( + self.config, + self._server_host, + self._server_port, + self._router_dp_rank, + ), + daemon=False, + ) + server_proc.start() + self._server_proc = server_proc + + def _get_ip(self) -> int: + return lookup_ray_node_id_to_ip_dict()[lookup_current_ray_node_id()] + + def _get_port(self) -> int: + return self._server_port + + # Use this to query the VLLM servers during spinup without having to start an # asyncio event loop for the async client. def _vllm_server_heartbeat(base_url: str): @@ -175,35 +196,38 @@ class VLLMModel(SimpleResponsesAPIModel): config: VLLMModelConfig def model_post_init(self, context): + working_dir = os.getcwd() + if self.config.spinup_server: self._server_urls = [] - self._server_procs = [] + self._server_workers = [] self._clients = [] + server_tp_size = (self.config.server_args or {}).get("tensor_parallel_size", 1) + server_dp_size = (self.config.server_args or {}).get("data_parallel_size", 1) + + assert server_dp_size == 1 + router_dp_size = 1 if self.config.enable_router: router_dp_size = max(1, self.config.router_dp_size) for router_dp_rank in range(router_dp_size): - # FIXME: this server host is wrong for multi-node via ray. 
- server_host = "127.0.0.1" - server_port = find_open_port() - server_url = f"http://{server_host}:{server_port}/v1" - - server_proc = Process( - target=_spinup_vllm_server, - args=( - self.config, - server_host, - server_port, - router_dp_rank if self.config.enable_router else None, - ), - daemon=False, + server_worker = spinup_single_ray_gpu_node_worker( + VLLMModelSpinupWorker, + num_gpus=server_tp_size, + config=self.config, + working_dir=working_dir, + router_dp_rank=router_dp_rank, ) - server_proc.start() + + server_ip = ray.get(server_worker._get_ip.remote()) + server_port = ray.get(server_worker._get_port.remote()) + server_url = f"http://{server_ip}:{server_port}/v1" self._server_urls.append(server_url) - self._server_procs.append(server_proc) + self._server_workers.append(server_worker) + self._clients.append( NeMoGymAsyncOpenAI( base_url=server_url, @@ -222,7 +246,7 @@ def model_post_init(self, context): else: self._server_urls = None - self._server_procs = None + self._server_workers = None self._clients = [ NeMoGymAsyncOpenAI( base_url=base_url, From 04a97dd08c0ae7cea2702039470bc28bb14c9620 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 20 Nov 2025 10:55:39 -0800 Subject: [PATCH 16/48] Import. Signed-off-by: Peter Jin --- nemo_gym/cli.py | 16 +++++++++++----- nemo_gym/ray_utils.py | 1 + 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index a433daabd..80b0550f5 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -24,7 +24,7 @@ from subprocess import Popen from threading import Thread from time import sleep -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple import rich import uvicorn @@ -238,11 +238,17 @@ def wait_for_spinup(self) -> None: self.poll() statuses = self.check_http_server_statuses() - num_spun_up = statuses.count("success") + num_spun_up = 0 + waiting = [] + for name, status in statuses: + if status == "success": + num_spun_up += 1 + else: + waiting.append(name) if len(statuses) != num_spun_up: print( f"""{num_spun_up} / {len(statuses)} servers ready ({statuses.count("timeout")} timed out, {statuses.count("connection_error")} connection errored, {statuses.count("unknown_error")} had unknown errors). -Waiting for servers to spin up. Sleeping {sleep_interval}s...""" +Waiting for servers {waiting} to spin up. Sleeping {sleep_interval}s...""" ) else: print(f"All {num_spun_up} / {len(statuses)} servers ready! Polling every 60s") @@ -284,7 +290,7 @@ async def sleep(): finally: self.shutdown() - def check_http_server_statuses(self) -> List[ServerStatus]: + def check_http_server_statuses(self) -> List[Tuple[str, ServerStatus]]: print( "Checking for HTTP server statuses (you should see some HTTP requests to `/` that may 404. 
This is expected.)" ) @@ -292,7 +298,7 @@ def check_http_server_statuses(self) -> List[ServerStatus]: for server_instance_display_config in self._server_instance_display_configs: name = server_instance_display_config.config_path status = self._server_client.poll_for_status(name) - statuses.append(status) + statuses.append((name, status)) return statuses diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 1e011b293..78f43ccec 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -18,6 +18,7 @@ from time import sleep from typing import Dict, Optional, Set +import ray import ray.util.state from ray.actor import ActorClass, ActorProxy from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy From 70ac196c4483a1cb6b94c2970783a215853e67d4 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 20 Nov 2025 10:57:13 -0800 Subject: [PATCH 17/48] Do not count resources of ray actors in 'DEAD' state (these resources were presumably released). Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 78f43ccec..08a6b9b4a 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -66,6 +66,8 @@ def _lookup_ray_node_with_free_gpus( detail=True, ) for state in actor_states: + if state.state == "DEAD": + continue if state.state == "PENDING_CREATION" or state.node_id is None: retry = True break From 3e5c924a34bd02bc0d46e09c8696a165edd55c8c Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Wed, 26 Nov 2025 13:33:51 -0800 Subject: [PATCH 18/48] Support for specifying non-anonymous Ray namespace. Signed-off-by: Peter Jin --- nemo_gym/server_utils.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nemo_gym/server_utils.py b/nemo_gym/server_utils.py index 88466daef..e8d2f11bb 100644 --- a/nemo_gym/server_utils.py +++ b/nemo_gym/server_utils.py @@ -350,6 +350,7 @@ def initialize_ray() -> None: global_config_dict = get_global_config_dict() ray_head_node_address = global_config_dict.get("ray_head_node_address") + ray_namespace = global_config_dict.get("ray_namespace", None) ray_init_kwargs = dict(ignore_reinit_error=True) if ray_head_node_address: @@ -358,6 +359,10 @@ def initialize_ray() -> None: else: print("Starting Ray cluster...") + if ray_namespace is not None: + print(f"Ray namespace: {ray_namespace}") + ray_init_kwargs["namespace"] = ray_namespace + ray.init(**ray_init_kwargs) if not ray_head_node_address: From 8bdcec09da379718691843c6e7253d788ec52cbb Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Wed, 26 Nov 2025 13:47:02 -0800 Subject: [PATCH 19/48] Fix for starting nested Ray actors. 
Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 08a6b9b4a..59f03ad0d 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -31,12 +31,14 @@ def lookup_current_ray_node_id() -> str: - return ray.runtime_context.get_runtime_context().get_node_id() + return ray.get_runtime_context().get_node_id() def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: + cfg = get_global_config_dict() + head = cfg["ray_head_node_address"] id_to_ip = {} - node_states = ray.util.state.list_nodes() + node_states = ray.util.state.list_nodes(head) for state in node_states: id_to_ip[state.node_id] = state.node_ip return id_to_ip @@ -46,12 +48,10 @@ def _lookup_ray_node_with_free_gpus( num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover cfg = get_global_config_dict() + head = cfg["ray_head_node_address"] node_avail_gpu_dict = defaultdict(int) - node_states = ray.util.state.list_nodes( - cfg["ray_head_node_address"], - detail=True, - ) + node_states = ray.util.state.list_nodes(head, detail=True) for state in node_states: assert state.node_id is not None if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes: @@ -61,10 +61,7 @@ def _lookup_ray_node_with_free_gpus( while True: retry = False node_used_gpu_dict = defaultdict(int) - actor_states = ray.util.state.list_actors( - cfg["ray_head_node_address"], - detail=True, - ) + actor_states = ray.util.state.list_actors(head, detail=True) for state in actor_states: if state.state == "DEAD": continue @@ -116,11 +113,20 @@ def spinup_single_ray_gpu_node_worker( node_id=node_id, soft=False, ) + worker_env_vars = { + **os.environ, + } + pop_env_vars = [ + "CUDA_VISIBLE_DEVICES", + "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES", + "RAY_JOB_ID", + "RAY_RAYLET_PID", + ] + for k in pop_env_vars: + worker_env_vars.pop(k, None) worker_runtime_env = { "py_executable": sys.executable, - "env_vars": { - **os.environ, - }, + "env_vars": worker_env_vars, } worker_options["runtime_env"] = worker_runtime_env worker = worker_cls.options(**worker_options).remote(*worker_args, **worker_kwargs) From 8fe389f44d587df142f1da750a5b38b2c308693a Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Mon, 1 Dec 2025 15:11:29 -0800 Subject: [PATCH 20/48] Matching the misc infra PR. Signed-off-by: Peter Jin --- nemo_gym/cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index 977d42607..c9c77e338 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -253,7 +253,8 @@ def wait_for_spinup(self) -> None: if len(statuses) != num_spun_up: print( f"""{num_spun_up} / {len(statuses)} servers ready ({statuses.count("timeout")} timed out, {statuses.count("connection_error")} connection errored, {statuses.count("unknown_error")} had unknown errors). -Waiting for servers {waiting} to spin up. Sleeping {sleep_interval}s...""" +Waiting for servers to spin up: {waiting} +Sleeping {sleep_interval}s...""" ) else: print(f"All {num_spun_up} / {len(statuses)} servers ready! Polling every 60s") From 613efb44944f3556a96a15af106bb25de7d3773c Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Mon, 1 Dec 2025 15:38:40 -0800 Subject: [PATCH 21/48] No cover. 
Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 59f03ad0d..67e153a9a 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -30,11 +30,11 @@ ) -def lookup_current_ray_node_id() -> str: +def lookup_current_ray_node_id() -> str: # pragma: no cover return ray.get_runtime_context().get_node_id() -def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: +def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: # pragma: no cover cfg = get_global_config_dict() head = cfg["ray_head_node_address"] id_to_ip = {} @@ -44,7 +44,7 @@ def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: return id_to_ip -def _lookup_ray_node_with_free_gpus( +def lookup_ray_node_with_free_gpus( num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover cfg = get_global_config_dict() @@ -103,7 +103,7 @@ def spinup_single_ray_gpu_node_worker( f"Requested {num_gpus} > {num_gpus_per_node} GPU nodes for spinning up {worker_cls}" ) - node_id = _lookup_ray_node_with_free_gpus(num_gpus, allowed_gpu_nodes=gpu_nodes) + node_id = lookup_ray_node_with_free_gpus(num_gpus, allowed_gpu_nodes=gpu_nodes) if node_id is None: raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") From 7575eb6230f5bfbf6dc3c024ad576be6944d0b94 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 13:45:49 -0800 Subject: [PATCH 22/48] Global scheduling helper to track free GPUs of schedulable ray nodes. Signed-off-by: Peter Jin --- nemo_gym/cli.py | 7 +++- nemo_gym/ray_utils.py | 77 ++++++++++++++++++++++++++++++++++------ nemo_gym/server_utils.py | 16 +++++---- 3 files changed, 83 insertions(+), 17 deletions(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index c9c77e338..8788af2d5 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -36,6 +36,7 @@ from devtools import pprint from omegaconf import DictConfig, OmegaConf from pydantic import BaseModel, Field +from ray import ActorProxy from tqdm.auto import tqdm from nemo_gym import PARENT_DIR, __version__ @@ -49,6 +50,7 @@ GlobalConfigDictParserConfig, get_global_config_dict, ) +from nemo_gym.ray_utils import _NeMoGymRayGPUSchedulingHelper from nemo_gym.server_utils import ( HEAD_SERVER_KEY_NAME, HeadServer, @@ -119,6 +121,7 @@ class ServerInstanceDisplayConfig(BaseModel): class RunHelper: # pragma: no cover _head_server: uvicorn.Server _head_server_thread: Thread + _head_ray_gpu_helper: ActorProxy _processes: Dict[str, Popen] _server_instance_display_configs: List[ServerInstanceDisplayConfig] @@ -129,7 +132,9 @@ def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig) # Initialize Ray cluster in the main process # Note: This function will modify the global config dict - update `ray_head_node_address` - initialize_ray() + init_node_id = initialize_ray() + + self._head_ray_gpu_helper = _NeMoGymRayGPUSchedulingHelper._start_global(init_node_id) # Assume Nemo Gym Run is for a single agent. 
escaped_config_dict_yaml_str = shlex.quote(OmegaConf.to_yaml(global_config_dict)) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 67e153a9a..f0f59b700 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -30,6 +30,70 @@ ) +def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover + cfg = get_global_config_dict() + while True: + try: + get_actor_args = { + "name": "_NeMoGymRayGPUSchedulingHelper", + } + ray_namespace = cfg.get("ray_namespace", None) + if ray_namespace is not None: + get_actor_args["namespace"] = ray_namespace + worker = ray.get_actor(**get_actor_args) + except ValueError: + sleep(3) + return worker + + +@ray.remote +class _NeMoGymRayGPUSchedulingHelper: # pragma: no cover + @classmethod + def _start_global(worker_cls, node_id: Optional[str] = None): + worker_options = { + "name": "_NeMoGymRayGPUSchedulingHelper", + "num_cpus": 0, + } + if node_id is not None: + worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( + node_id=node_id, + soft=True, + ) + worker = worker_cls.options(**worker_options).remote() + return worker + + def __init__(self, *args, **kwargs): + self.cfg = get_global_config_dict() + self.avail_gpu_node_dict = defaultdict(int) + self.used_gpu_node_dict = defaultdict(int) + + # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes + # for scheduling GPU actors. + # Otherwise if value of RAY_GPU_NODES_KEY_NAME is a list, then Gym will only use + # the listed Ray GPU nodes for scheduling GPU actors. + allowed_gpu_nodes = self.cfg.get(RAY_GPU_NODES_KEY_NAME, None) + if allowed_gpu_nodes is not None: + allowed_gpu_nodes = set( + [node["node_id"] if isinstance(node, dict) else node for node in allowed_gpu_nodes] + ) + + head = self.cfg["ray_head_node_address"] + node_states = ray.util.state.list_nodes(head, detail=True) + for state in node_states: + assert state.node_id is not None + if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes: + continue + self.avail_gpu_node_dict[state.node_id] += state.resources_total.get("GPU", 0) + + def alloc_gpu_node(self, num_gpus: int) -> Optional[str]: + for node_id, avail_num_gpus in self.avail_gpu_node_dict.items(): + used_num_gpus = self.used_gpu_node_dict[node_id] + if used_num_gpus + num_gpus <= avail_num_gpus: + self.used_gpu_node_dict[node_id] += num_gpus + return node_id + return None + + def lookup_current_ray_node_id() -> str: # pragma: no cover return ray.get_runtime_context().get_node_id() @@ -44,7 +108,7 @@ def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: # pragma: no cover return id_to_ip -def lookup_ray_node_with_free_gpus( +def _lookup_ray_node_with_free_gpus( num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover cfg = get_global_config_dict() @@ -89,21 +153,14 @@ def spinup_single_ray_gpu_node_worker( ) -> ActorProxy: # pragma: no cover cfg = get_global_config_dict() - # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes - # for scheduling GPU actors. - # Otherwise if value of RAY_GPU_NODES_KEY_NAME is a list, then Gym will only use - # the listed Ray GPU nodes for scheduling GPU actors. 
- gpu_nodes = cfg.get(RAY_GPU_NODES_KEY_NAME, None) - if gpu_nodes is not None: - gpu_nodes = set([node["node_id"] for node in gpu_nodes]) - num_gpus_per_node = cfg.get(RAY_NUM_GPUS_PER_NODE_KEY_NAME, 8) assert num_gpus >= 1, f"Must request at least 1 GPU node for spinning up {worker_cls}" assert num_gpus <= num_gpus_per_node, ( f"Requested {num_gpus} > {num_gpus_per_node} GPU nodes for spinning up {worker_cls}" ) - node_id = lookup_ray_node_with_free_gpus(num_gpus, allowed_gpu_nodes=gpu_nodes) + helper = get_global_ray_gpu_scheduling_helper() + node_id = ray.get(helper.alloc_gpu_node.remote(num_gpus)) if node_id is None: raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") diff --git a/nemo_gym/server_utils.py b/nemo_gym/server_utils.py index e8d2f11bb..6eef4beba 100644 --- a/nemo_gym/server_utils.py +++ b/nemo_gym/server_utils.py @@ -336,7 +336,7 @@ class UvicornLoggingConfig(BaseModel): uvicorn_logging_show_200_ok: bool = False -def initialize_ray() -> None: +def initialize_ray() -> str: """ Initialize ray cluster in a process. We store the Ray address in the global config dict so that child processes can connect to it. @@ -346,7 +346,8 @@ def initialize_ray() -> None: if ray.is_initialized(): print("Ray already initialized") - return + ray_ctx = ray.get_runtime_context() + return ray_ctx.get_node_id() global_config_dict = get_global_config_dict() ray_head_node_address = global_config_dict.get("ray_head_node_address") @@ -359,16 +360,19 @@ def initialize_ray() -> None: else: print("Starting Ray cluster...") - if ray_namespace is not None: - print(f"Ray namespace: {ray_namespace}") - ray_init_kwargs["namespace"] = ray_namespace + if ray_namespace is None: + ray_namespace = "nemo_gym" + print(f"Ray namespace: {ray_namespace}") + ray_init_kwargs["namespace"] = ray_namespace ray.init(**ray_init_kwargs) + ray_ctx = ray.get_runtime_context() if not ray_head_node_address: with open_dict(global_config_dict): - global_config_dict["ray_head_node_address"] = ray.get_runtime_context().gcs_address + global_config_dict["ray_head_node_address"] = ray_ctx.gcs_address print(f"Started Ray cluster at {global_config_dict['ray_head_node_address']}") + return ray_ctx.get_node_id() class SimpleServer(BaseServer): From d7e16830b4804c13c80e8b9a6602ef19dc9abdf5 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 13:58:31 -0800 Subject: [PATCH 23/48] Rename. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index f0f59b700..ef2ec88c8 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -64,8 +64,8 @@ def _start_global(worker_cls, node_id: Optional[str] = None): def __init__(self, *args, **kwargs): self.cfg = get_global_config_dict() - self.avail_gpu_node_dict = defaultdict(int) - self.used_gpu_node_dict = defaultdict(int) + self.avail_gpus_dict = defaultdict(int) + self.used_gpus_dict = defaultdict(int) # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes # for scheduling GPU actors. 
@@ -83,13 +83,13 @@ def __init__(self, *args, **kwargs): assert state.node_id is not None if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes: continue - self.avail_gpu_node_dict[state.node_id] += state.resources_total.get("GPU", 0) + self.avail_gpus_dict[state.node_id] += state.resources_total.get("GPU", 0) def alloc_gpu_node(self, num_gpus: int) -> Optional[str]: - for node_id, avail_num_gpus in self.avail_gpu_node_dict.items(): - used_num_gpus = self.used_gpu_node_dict[node_id] + for node_id, avail_num_gpus in self.avail_gpus_dict.items(): + used_num_gpus = self.used_gpus_dict[node_id] if used_num_gpus + num_gpus <= avail_num_gpus: - self.used_gpu_node_dict[node_id] += num_gpus + self.used_gpus_dict[node_id] += num_gpus return node_id return None From f7c1937b39fd296f74a0dbb71478e84f633f5c44 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:02:43 -0800 Subject: [PATCH 24/48] Print. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index ef2ec88c8..18af1e2c1 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -162,7 +162,7 @@ def spinup_single_ray_gpu_node_worker( helper = get_global_ray_gpu_scheduling_helper() node_id = ray.get(helper.alloc_gpu_node.remote(num_gpus)) if node_id is None: - raise RuntimeError(f"Cannot find {num_gpus} available Ray GPU nodes for spinning up {worker_cls}") + raise RuntimeError(f"Cannot find an available Ray node with {num_gpus} GPUs to spin up {worker_cls}") worker_options = {} worker_options["num_gpus"] = num_gpus From 2d37d17fca16944e43c2129c7f4824da1bb04f4a Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:07:47 -0800 Subject: [PATCH 25/48] Avoid an unnecessary ray import. Signed-off-by: Peter Jin --- nemo_gym/cli.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index 8788af2d5..85b77d89a 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -36,7 +36,6 @@ from devtools import pprint from omegaconf import DictConfig, OmegaConf from pydantic import BaseModel, Field -from ray import ActorProxy from tqdm.auto import tqdm from nemo_gym import PARENT_DIR, __version__ @@ -121,7 +120,7 @@ class ServerInstanceDisplayConfig(BaseModel): class RunHelper: # pragma: no cover _head_server: uvicorn.Server _head_server_thread: Thread - _head_ray_gpu_helper: ActorProxy + _head_ray_gpu_helper: "ActorProxy" _processes: Dict[str, Popen] _server_instance_display_configs: List[ServerInstanceDisplayConfig] From a35f58d07e19d95c5daf33e7dafe9d1339334986 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:12:29 -0800 Subject: [PATCH 26/48] Try to pass the linter. 
Signed-off-by: Peter Jin --- nemo_gym/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index 85b77d89a..213490dc1 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -28,7 +28,7 @@ from subprocess import Popen from threading import Thread from time import sleep -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import psutil import rich @@ -120,7 +120,7 @@ class ServerInstanceDisplayConfig(BaseModel): class RunHelper: # pragma: no cover _head_server: uvicorn.Server _head_server_thread: Thread - _head_ray_gpu_helper: "ActorProxy" + _head_ray_gpu_helper: Any _processes: Dict[str, Popen] _server_instance_display_configs: List[ServerInstanceDisplayConfig] From 1b530897ecd7e54e4fc92e0b51aa3f548d15cc40 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:15:22 -0800 Subject: [PATCH 27/48] Test. Signed-off-by: Peter Jin --- tests/unit_tests/test_server_utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/test_server_utils.py b/tests/unit_tests/test_server_utils.py index dfd39da65..7313db752 100644 --- a/tests/unit_tests/test_server_utils.py +++ b/tests/unit_tests/test_server_utils.py @@ -194,7 +194,9 @@ def test_initialize_ray_with_address(self, monkeypatch: MonkeyPatch) -> None: ray_is_initialized_mock.assert_called_once() get_global_config_dict_mock.assert_called_once() - ray_init_mock.assert_called_once_with(address="ray://test-address:10001", ignore_reinit_error=True) + ray_init_mock.assert_called_once_with( + address="ray://test-address:10001", ignore_reinit_error=True, namespace="nemo_gym" + ) def test_initialize_ray_without_address(self, monkeypatch: MonkeyPatch) -> None: ray_is_initialized_mock = self._mock_ray_return_value(monkeypatch, False) @@ -217,5 +219,5 @@ def test_initialize_ray_without_address(self, monkeypatch: MonkeyPatch) -> None: ray_is_initialized_mock.assert_called_once() get_global_config_dict_mock.assert_called_once() - ray_init_mock.assert_called_once_with(ignore_reinit_error=True) + ray_init_mock.assert_called_once_with(ignore_reinit_error=True, namespace="nemo_gym") ray_get_runtime_context_mock.assert_called_once() From 6327760b29eda3d6fe4465c0ca954d174c664de0 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:19:56 -0800 Subject: [PATCH 28/48] Tests. Signed-off-by: Peter Jin --- nemo_gym/cli.py | 4 ++-- nemo_gym/server_utils.py | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index 213490dc1..fa1f09eee 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -131,9 +131,9 @@ def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig) # Initialize Ray cluster in the main process # Note: This function will modify the global config dict - update `ray_head_node_address` - init_node_id = initialize_ray() + initialize_ray() - self._head_ray_gpu_helper = _NeMoGymRayGPUSchedulingHelper._start_global(init_node_id) + self._head_ray_gpu_helper = _NeMoGymRayGPUSchedulingHelper._start_global() # Assume Nemo Gym Run is for a single agent. 
escaped_config_dict_yaml_str = shlex.quote(OmegaConf.to_yaml(global_config_dict)) diff --git a/nemo_gym/server_utils.py b/nemo_gym/server_utils.py index 6eef4beba..17f6bf202 100644 --- a/nemo_gym/server_utils.py +++ b/nemo_gym/server_utils.py @@ -336,7 +336,7 @@ class UvicornLoggingConfig(BaseModel): uvicorn_logging_show_200_ok: bool = False -def initialize_ray() -> str: +def initialize_ray() -> None: """ Initialize ray cluster in a process. We store the Ray address in the global config dict so that child processes can connect to it. @@ -346,8 +346,7 @@ def initialize_ray() -> str: if ray.is_initialized(): print("Ray already initialized") - ray_ctx = ray.get_runtime_context() - return ray_ctx.get_node_id() + return global_config_dict = get_global_config_dict() ray_head_node_address = global_config_dict.get("ray_head_node_address") @@ -372,7 +371,7 @@ def initialize_ray() -> str: with open_dict(global_config_dict): global_config_dict["ray_head_node_address"] = ray_ctx.gcs_address print(f"Started Ray cluster at {global_config_dict['ray_head_node_address']}") - return ray_ctx.get_node_id() + return class SimpleServer(BaseServer): From f5466f9fffa6f37f04d794e8f24cef623556be97 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:23:55 -0800 Subject: [PATCH 29/48] Fix test. Signed-off-by: Peter Jin --- tests/unit_tests/test_server_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_tests/test_server_utils.py b/tests/unit_tests/test_server_utils.py index 7313db752..17f7d3338 100644 --- a/tests/unit_tests/test_server_utils.py +++ b/tests/unit_tests/test_server_utils.py @@ -192,7 +192,7 @@ def test_initialize_ray_with_address(self, monkeypatch: MonkeyPatch) -> None: initialize_ray() - ray_is_initialized_mock.assert_called_once() + ray_is_initialized_mock.assert_called() get_global_config_dict_mock.assert_called_once() ray_init_mock.assert_called_once_with( address="ray://test-address:10001", ignore_reinit_error=True, namespace="nemo_gym" From 7a7e952377416b92abc81996ea23de4afec06db6 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:27:37 -0800 Subject: [PATCH 30/48] Fix test. Signed-off-by: Peter Jin --- tests/unit_tests/test_server_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/test_server_utils.py b/tests/unit_tests/test_server_utils.py index 17f7d3338..18bd10aea 100644 --- a/tests/unit_tests/test_server_utils.py +++ b/tests/unit_tests/test_server_utils.py @@ -192,9 +192,9 @@ def test_initialize_ray_with_address(self, monkeypatch: MonkeyPatch) -> None: initialize_ray() - ray_is_initialized_mock.assert_called() get_global_config_dict_mock.assert_called_once() - ray_init_mock.assert_called_once_with( + ray_is_initialized_mock.assert_called() + ray_init_mock.assert_called_with( address="ray://test-address:10001", ignore_reinit_error=True, namespace="nemo_gym" ) From eab68a0ca3b2eee03bc7244def3eed2e046ef2e0 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 14:32:45 -0800 Subject: [PATCH 31/48] Unfix test. 
Signed-off-by: Peter Jin --- nemo_gym/server_utils.py | 4 +--- tests/unit_tests/test_server_utils.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/nemo_gym/server_utils.py b/nemo_gym/server_utils.py index 17f6bf202..cc67c8e82 100644 --- a/nemo_gym/server_utils.py +++ b/nemo_gym/server_utils.py @@ -366,12 +366,10 @@ def initialize_ray() -> None: ray.init(**ray_init_kwargs) - ray_ctx = ray.get_runtime_context() if not ray_head_node_address: with open_dict(global_config_dict): - global_config_dict["ray_head_node_address"] = ray_ctx.gcs_address + global_config_dict["ray_head_node_address"] = ray.get_runtime_context().gcs_address print(f"Started Ray cluster at {global_config_dict['ray_head_node_address']}") - return class SimpleServer(BaseServer): diff --git a/tests/unit_tests/test_server_utils.py b/tests/unit_tests/test_server_utils.py index 18bd10aea..7313db752 100644 --- a/tests/unit_tests/test_server_utils.py +++ b/tests/unit_tests/test_server_utils.py @@ -192,9 +192,9 @@ def test_initialize_ray_with_address(self, monkeypatch: MonkeyPatch) -> None: initialize_ray() + ray_is_initialized_mock.assert_called_once() get_global_config_dict_mock.assert_called_once() - ray_is_initialized_mock.assert_called() - ray_init_mock.assert_called_with( + ray_init_mock.assert_called_once_with( address="ray://test-address:10001", ignore_reinit_error=True, namespace="nemo_gym" ) From d62ab6c170f05d4acb779e3b4394ed011032b929 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 15:18:38 -0800 Subject: [PATCH 32/48] VLLMModel refresh. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 12 +++++--- responses_api_models/vllm_model/app.py | 40 +++++++++++++------------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 18af1e2c1..d566950b1 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -94,10 +94,6 @@ def alloc_gpu_node(self, num_gpus: int) -> Optional[str]: return None -def lookup_current_ray_node_id() -> str: # pragma: no cover - return ray.get_runtime_context().get_node_id() - - def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: # pragma: no cover cfg = get_global_config_dict() head = cfg["ray_head_node_address"] @@ -108,6 +104,14 @@ def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: # pragma: no cover return id_to_ip +def lookup_current_ray_node_id() -> str: # pragma: no cover + return ray.get_runtime_context().get_node_id() + + +def lookup_current_ray_node_ip() -> str: # pragma: no cover + return lookup_ray_node_id_to_ip_dict()[lookup_current_ray_node_id()] + + def _lookup_ray_node_with_free_gpus( num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None ) -> Optional[str]: # pragma: no cover diff --git a/responses_api_models/vllm_model/app.py b/responses_api_models/vllm_model/app.py index 85d404f04..ef92e06e8 100644 --- a/responses_api_models/vllm_model/app.py +++ b/responses_api_models/vllm_model/app.py @@ -61,8 +61,7 @@ TokenIDLogProbMixin, ) from nemo_gym.ray_utils import ( - lookup_current_ray_node_id, - lookup_ray_node_id_to_ip_dict, + lookup_current_ray_node_ip, spinup_single_ray_gpu_node_worker, ) from nemo_gym.server_utils import SESSION_ID_KEY @@ -80,7 +79,6 @@ class VLLMModelConfig(BaseResponsesAPIModelConfig): spinup_server: bool = False server_args: Optional[Dict[str, Any]] = None - enable_router: bool = False router_dp_size: int = 1 def model_post_init(self, context): @@ -89,7 +87,7 @@ def model_post_init(self, context): return super().model_post_init(context) -def 
_spinup_vllm_server(config: VLLMModelConfig, server_host: str, server_port: int, router_dp_rank: int) -> None: +def _start_vllm_server(config: VLLMModelConfig, server_host: str, server_port: int, router_dp_rank: int) -> None: import uvloop import vllm.engine.arg_utils import vllm.entrypoints.openai.api_server @@ -106,14 +104,17 @@ def _spinup_vllm_server(config: VLLMModelConfig, server_host: str, server_port: argv.append("--distributed-executor-backend") argv.append("mp") for k, v in (config.server_args or {}).items(): - if isinstance(v, bool): + k2 = k.replace("_", "-") + if v is None: + pass + elif isinstance(v, bool): if not v: - arg_key = f"--no-{k.replace('_', '-')}" + arg_key = f"--no-{k2}" else: - arg_key = f"--{k.replace('_', '-')}" + arg_key = f"--{k2}" argv.append(arg_key) else: - arg_key = f"--{k.replace('_', '-')}" + arg_key = f"--{k2}" argv.append(arg_key) argv.append(f"{v}") @@ -126,24 +127,24 @@ def _spinup_vllm_server(config: VLLMModelConfig, server_host: str, server_port: @ray.remote -class VLLMModelSpinupWorker: +class VLLMServerSpinupWorker: def __init__(self, config: VLLMModelConfig, working_dir: Optional[str], router_dp_rank: int): self.config = config self.working_dir = working_dir - self._server_host = "0.0.0.0" + self.router_dp_rank = router_dp_rank + self._server_host = lookup_current_ray_node_ip() self._server_port = find_open_port() - self._router_dp_rank = router_dp_rank if self.working_dir is not None: os.chdir(self.working_dir) server_proc = Process( - target=_spinup_vllm_server, + target=_start_vllm_server, args=( self.config, self._server_host, self._server_port, - self._router_dp_rank, + self.router_dp_rank, ), daemon=False, ) @@ -151,7 +152,7 @@ def __init__(self, config: VLLMModelConfig, working_dir: Optional[str], router_d self._server_proc = server_proc def _get_ip(self) -> int: - return lookup_ray_node_id_to_ip_dict()[lookup_current_ray_node_id()] + return self._server_host def _get_port(self) -> int: return self._server_port @@ -203,19 +204,18 @@ def model_post_init(self, context): self._server_workers = [] self._clients = [] + # TODO: support for other parallel sizes. server_tp_size = (self.config.server_args or {}).get("tensor_parallel_size", 1) server_dp_size = (self.config.server_args or {}).get("data_parallel_size", 1) assert server_dp_size == 1 - router_dp_size = 1 - if self.config.enable_router: - router_dp_size = max(1, self.config.router_dp_size) + router_dp_size = max(1, self.config.router_dp_size) for router_dp_rank in range(router_dp_size): server_worker = spinup_single_ray_gpu_node_worker( - VLLMModelSpinupWorker, - num_gpus=server_tp_size, + VLLMServerSpinupWorker, + server_tp_size, config=self.config, working_dir=working_dir, router_dp_rank=router_dp_rank, @@ -241,7 +241,7 @@ def model_post_init(self, context): _vllm_server_heartbeat(server_url) break except Exception: - sleep(5) + sleep(3) continue else: From 78091709ff2111ad3d76c679964fef5ae9b9b236 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 16:06:36 -0800 Subject: [PATCH 33/48] Add vllm_model pyproject.toml (depends on PR #317). 
Signed-off-by: Peter Jin --- .../vllm_model/pyproject.toml | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 responses_api_models/vllm_model/pyproject.toml diff --git a/responses_api_models/vllm_model/pyproject.toml b/responses_api_models/vllm_model/pyproject.toml new file mode 100644 index 000000000..44cd9902d --- /dev/null +++ b/responses_api_models/vllm_model/pyproject.toml @@ -0,0 +1,35 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[project] +name = "vllm-model" +version = "0.2.0rc0" +requires-python = ">=3.12" +dependencies = [ + "nemo-gym[dev]", + "vllm==0.10.2", +] + +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools>=61", "setuptools-scm"] + +[tool.setuptools.packages.find] +where = [".."] +include = ["vllm_model"] + +[tool.uv.sources] +nemo-gym = { path = "../..", editable = true } + From 156f039e3d27c03b5c008dba3347690a464f4b07 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 16:21:08 -0800 Subject: [PATCH 34/48] Unpin vllm version. Signed-off-by: Peter Jin --- responses_api_models/vllm_model/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/responses_api_models/vllm_model/pyproject.toml b/responses_api_models/vllm_model/pyproject.toml index 44cd9902d..44452fa27 100644 --- a/responses_api_models/vllm_model/pyproject.toml +++ b/responses_api_models/vllm_model/pyproject.toml @@ -19,7 +19,7 @@ version = "0.2.0rc0" requires-python = ">=3.12" dependencies = [ "nemo-gym[dev]", - "vllm==0.10.2", + "vllm", ] [build-system] From 21ba79e02aa04823539166bad4c45f80f9a5c254 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 2 Dec 2025 16:44:20 -0800 Subject: [PATCH 35/48] Consolidated ray actor env vars setup. 
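All GPU-node actors now share one environment preparation path. _prepare_ray_worker_env_vars() copies the parent process environment and drops the Ray/CUDA bookkeeping variables (CUDA_VISIBLE_DEVICES, RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES, RAY_JOB_ID, RAY_RAYLET_PID) so that Ray can assign GPUs to the new actor itself. A minimal sketch of the intended effect (values are illustrative, not from this change):

    os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"       # set by the launching process
    env_vars = _prepare_ray_worker_env_vars()
    assert "CUDA_VISIBLE_DEVICES" not in env_vars    # Ray reassigns GPUs for the new actor
    assert env_vars["PATH"] == os.environ["PATH"]    # everything else is inherited
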
Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index d566950b1..b2ba0a86e 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -30,6 +30,21 @@ ) +def _prepare_ray_worker_env_vars() -> Dict[str, str]: + worker_env_vars = { + **os.environ, + } + pop_env_vars = [ + "CUDA_VISIBLE_DEVICES", + "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES", + "RAY_JOB_ID", + "RAY_RAYLET_PID", + ] + for k in pop_env_vars: + worker_env_vars.pop(k, None) + return worker_env_vars + + def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover cfg = get_global_config_dict() while True: @@ -59,6 +74,10 @@ def _start_global(worker_cls, node_id: Optional[str] = None): node_id=node_id, soft=True, ) + worker_options["runtime_env"] = { + "py_executable": sys.executable, + "env_vars": _prepare_ray_worker_env_vars(), + } worker = worker_cls.options(**worker_options).remote() return worker @@ -174,21 +193,9 @@ def spinup_single_ray_gpu_node_worker( node_id=node_id, soft=False, ) - worker_env_vars = { - **os.environ, - } - pop_env_vars = [ - "CUDA_VISIBLE_DEVICES", - "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES", - "RAY_JOB_ID", - "RAY_RAYLET_PID", - ] - for k in pop_env_vars: - worker_env_vars.pop(k, None) - worker_runtime_env = { + worker_options["runtime_env"] = { "py_executable": sys.executable, - "env_vars": worker_env_vars, + "env_vars": _prepare_ray_worker_env_vars(), } - worker_options["runtime_env"] = worker_runtime_env worker = worker_cls.options(**worker_options).remote(*worker_args, **worker_kwargs) return worker From 74acc726e491b32c4ff53ed3c72645e6300ac709 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Wed, 3 Dec 2025 16:16:48 -0800 Subject: [PATCH 36/48] Fix. Signed-off-by: Peter Jin --- nemo_gym/cli.py | 6 ++++-- nemo_gym/ray_utils.py | 40 ++++++++++++++++++++-------------------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index fa1f09eee..9fa771396 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -49,7 +49,9 @@ GlobalConfigDictParserConfig, get_global_config_dict, ) -from nemo_gym.ray_utils import _NeMoGymRayGPUSchedulingHelper +from nemo_gym.ray_utils import ( + _start_global_ray_gpu_scheduling_helper, +) from nemo_gym.server_utils import ( HEAD_SERVER_KEY_NAME, HeadServer, @@ -133,7 +135,7 @@ def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig) # Note: This function will modify the global config dict - update `ray_head_node_address` initialize_ray() - self._head_ray_gpu_helper = _NeMoGymRayGPUSchedulingHelper._start_global() + self._head_ray_gpu_helper = _start_global_ray_gpu_scheduling_helper() # Assume Nemo Gym Run is for a single agent. 
escaped_config_dict_yaml_str = shlex.quote(OmegaConf.to_yaml(global_config_dict)) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index b2ba0a86e..306dd09b6 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -30,7 +30,7 @@ ) -def _prepare_ray_worker_env_vars() -> Dict[str, str]: +def _prepare_ray_worker_env_vars() -> Dict[str, str]: # pragma: no cover worker_env_vars = { **os.environ, } @@ -45,6 +45,24 @@ def _prepare_ray_worker_env_vars() -> Dict[str, str]: return worker_env_vars +def _start_global_ray_gpu_scheduling_helper(node_id: Optional[str] = None): # pragma: no cover + worker_options = { + "name": "_NeMoGymRayGPUSchedulingHelper", + "num_cpus": 0, + } + if node_id is not None: + worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( + node_id=node_id, + soft=True, + ) + worker_options["runtime_env"] = { + "py_executable": sys.executable, + "env_vars": _prepare_ray_worker_env_vars(), + } + worker = _NeMoGymRayGPUSchedulingHelper.options(**worker_options).remote() + return worker + + def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover cfg = get_global_config_dict() while True: @@ -56,31 +74,13 @@ def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover if ray_namespace is not None: get_actor_args["namespace"] = ray_namespace worker = ray.get_actor(**get_actor_args) + return worker except ValueError: sleep(3) - return worker @ray.remote class _NeMoGymRayGPUSchedulingHelper: # pragma: no cover - @classmethod - def _start_global(worker_cls, node_id: Optional[str] = None): - worker_options = { - "name": "_NeMoGymRayGPUSchedulingHelper", - "num_cpus": 0, - } - if node_id is not None: - worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( - node_id=node_id, - soft=True, - ) - worker_options["runtime_env"] = { - "py_executable": sys.executable, - "env_vars": _prepare_ray_worker_env_vars(), - } - worker = worker_cls.options(**worker_options).remote() - return worker - def __init__(self, *args, **kwargs): self.cfg = get_global_config_dict() self.avail_gpus_dict = defaultdict(int) From 6c599093e5a8830fe5578eaad48dd2d1b9e66aa3 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 4 Dec 2025 12:22:22 -0800 Subject: [PATCH 37/48] Fix. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 306dd09b6..2a116ae63 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -93,7 +93,7 @@ def __init__(self, *args, **kwargs): allowed_gpu_nodes = self.cfg.get(RAY_GPU_NODES_KEY_NAME, None) if allowed_gpu_nodes is not None: allowed_gpu_nodes = set( - [node["node_id"] if isinstance(node, dict) else node for node in allowed_gpu_nodes] + [node["node_id"] if "node_id" in node else node for node in allowed_gpu_nodes] ) head = self.cfg["ray_head_node_address"] @@ -102,7 +102,8 @@ def __init__(self, *args, **kwargs): assert state.node_id is not None if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes: continue - self.avail_gpus_dict[state.node_id] += state.resources_total.get("GPU", 0) + avail_num_gpus = state.resources_total.get("GPU", 0) + self.avail_gpus_dict[state.node_id] += avail_num_gpus def alloc_gpu_node(self, num_gpus: int) -> Optional[str]: for node_id, avail_num_gpus in self.avail_gpus_dict.items(): From b595ce2daf90ff156147a650560958f4df8e7a31 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 4 Dec 2025 16:21:57 -0800 Subject: [PATCH 38/48] Format. 
Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 2a116ae63..796479c10 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -92,9 +92,7 @@ def __init__(self, *args, **kwargs): # the listed Ray GPU nodes for scheduling GPU actors. allowed_gpu_nodes = self.cfg.get(RAY_GPU_NODES_KEY_NAME, None) if allowed_gpu_nodes is not None: - allowed_gpu_nodes = set( - [node["node_id"] if "node_id" in node else node for node in allowed_gpu_nodes] - ) + allowed_gpu_nodes = set([node["node_id"] if "node_id" in node else node for node in allowed_gpu_nodes]) head = self.cfg["ray_head_node_address"] node_states = ray.util.state.list_nodes(head, detail=True) From bf0ccfeaac084c08557febe1b342526ed3956c70 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Mon, 8 Dec 2025 17:04:27 -0800 Subject: [PATCH 39/48] Use a scheduling coordination helper. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 67 ++++++++++--------------------------------- 1 file changed, 15 insertions(+), 52 deletions(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 796479c10..186f67e4c 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -16,7 +16,7 @@ import sys from collections import defaultdict from time import sleep -from typing import Dict, Optional, Set +from typing import Dict, Optional import ray import ray.util.state @@ -45,22 +45,20 @@ def _prepare_ray_worker_env_vars() -> Dict[str, str]: # pragma: no cover return worker_env_vars -def _start_global_ray_gpu_scheduling_helper(node_id: Optional[str] = None): # pragma: no cover - worker_options = { +def _start_global_ray_gpu_scheduling_helper(node_id: Optional[str] = None) -> ActorProxy: # pragma: no cover + cfg = get_global_config_dict() + helper_options = { "name": "_NeMoGymRayGPUSchedulingHelper", "num_cpus": 0, } if node_id is not None: - worker_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( + helper_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( node_id=node_id, soft=True, ) - worker_options["runtime_env"] = { - "py_executable": sys.executable, - "env_vars": _prepare_ray_worker_env_vars(), - } - worker = _NeMoGymRayGPUSchedulingHelper.options(**worker_options).remote() - return worker + helper = _NeMoGymRayGPUSchedulingHelper.options(**helper_options).remote(cfg) + ray.get(helper._post_init.remote()) + return helper def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover @@ -71,8 +69,9 @@ def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover "name": "_NeMoGymRayGPUSchedulingHelper", } ray_namespace = cfg.get("ray_namespace", None) - if ray_namespace is not None: - get_actor_args["namespace"] = ray_namespace + if ray_namespace is None: + ray_namespace = "nemo_gym" + get_actor_args["namespace"] = ray_namespace worker = ray.get_actor(**get_actor_args) return worker except ValueError: @@ -81,11 +80,12 @@ def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover @ray.remote class _NeMoGymRayGPUSchedulingHelper: # pragma: no cover - def __init__(self, *args, **kwargs): - self.cfg = get_global_config_dict() + def __init__(self, cfg): + self.cfg = cfg self.avail_gpus_dict = defaultdict(int) self.used_gpus_dict = defaultdict(int) + def _post_init(self) -> None: # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes # for scheduling GPU actors. 
# Otherwise if value of RAY_GPU_NODES_KEY_NAME is a list, then Gym will only use @@ -98,9 +98,9 @@ def __init__(self, *args, **kwargs): node_states = ray.util.state.list_nodes(head, detail=True) for state in node_states: assert state.node_id is not None + avail_num_gpus = state.resources_total.get("GPU", 0) if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes: continue - avail_num_gpus = state.resources_total.get("GPU", 0) self.avail_gpus_dict[state.node_id] += avail_num_gpus def alloc_gpu_node(self, num_gpus: int) -> Optional[str]: @@ -130,43 +130,6 @@ def lookup_current_ray_node_ip() -> str: # pragma: no cover return lookup_ray_node_id_to_ip_dict()[lookup_current_ray_node_id()] -def _lookup_ray_node_with_free_gpus( - num_gpus: int, allowed_gpu_nodes: Optional[Set[str]] = None -) -> Optional[str]: # pragma: no cover - cfg = get_global_config_dict() - head = cfg["ray_head_node_address"] - - node_avail_gpu_dict = defaultdict(int) - node_states = ray.util.state.list_nodes(head, detail=True) - for state in node_states: - assert state.node_id is not None - if allowed_gpu_nodes is not None and state.node_id not in allowed_gpu_nodes: - continue - node_avail_gpu_dict[state.node_id] += state.resources_total.get("GPU", 0) - - while True: - retry = False - node_used_gpu_dict = defaultdict(int) - actor_states = ray.util.state.list_actors(head, detail=True) - for state in actor_states: - if state.state == "DEAD": - continue - if state.state == "PENDING_CREATION" or state.node_id is None: - retry = True - break - node_used_gpu_dict[state.node_id] += state.required_resources.get("GPU", 0) - if retry: - sleep(2) - continue - break - - for node_id, avail_num_gpus in node_avail_gpu_dict.items(): - used_num_gpus = node_used_gpu_dict[node_id] - if used_num_gpus + num_gpus <= avail_num_gpus: - return node_id - return None - - def spinup_single_ray_gpu_node_worker( worker_cls: ActorClass, num_gpus: int, From de2dd338b40d37b740cbbf79ce30656d1e2e5c54 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Mon, 8 Dec 2025 17:12:16 -0800 Subject: [PATCH 40/48] Sync vllm_model pyproject.toml. 
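This also registers ray_head_node_address and ray_namespace as reserved top-level keys next to the existing ray_gpu_nodes / ray_num_gpus_per_node. For reference, a global config dict using these keys might look like the following (hypothetical values; ray_gpu_nodes is either None to use all nodes or a list of Ray node IDs):

    global_config_dict = {
        "ray_head_node_address": "10.0.0.1:6379",   # filled in by initialize_ray() if unset
        "ray_namespace": "nemo_gym",
        "ray_gpu_nodes": None,                      # or ["<node-id-1>", "<node-id-2>"]
        "ray_num_gpus_per_node": 8,
    }
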
Signed-off-by: Peter Jin --- nemo_gym/global_config.py | 4 ++++ responses_api_models/vllm_model/pyproject.toml | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/nemo_gym/global_config.py b/nemo_gym/global_config.py index fb0d2c93c..bef3852e5 100644 --- a/nemo_gym/global_config.py +++ b/nemo_gym/global_config.py @@ -45,6 +45,8 @@ DISALLOWED_PORTS_KEY_NAME = "disallowed_ports" HEAD_SERVER_DEPS_KEY_NAME = "head_server_deps" PYTHON_VERSION_KEY_NAME = "python_version" +RAY_HEAD_NODE_ADDRESS_KEY_NAME = "ray_head_node_address" +RAY_NAMESPACE_KEY_NAME = "ray_namespace" RAY_GPU_NODES_KEY_NAME = "ray_gpu_nodes" RAY_NUM_GPUS_PER_NODE_KEY_NAME = "ray_num_gpus_per_node" USE_ABSOLUTE_IP = "use_absolute_ip" @@ -56,6 +58,8 @@ DISALLOWED_PORTS_KEY_NAME, HEAD_SERVER_DEPS_KEY_NAME, PYTHON_VERSION_KEY_NAME, + RAY_HEAD_NODE_ADDRESS_KEY_NAME, + RAY_NAMESPACE_KEY_NAME, RAY_GPU_NODES_KEY_NAME, RAY_NUM_GPUS_PER_NODE_KEY_NAME, USE_ABSOLUTE_IP, diff --git a/responses_api_models/vllm_model/pyproject.toml b/responses_api_models/vllm_model/pyproject.toml index 44452fa27..48d9a54cc 100644 --- a/responses_api_models/vllm_model/pyproject.toml +++ b/responses_api_models/vllm_model/pyproject.toml @@ -32,4 +32,3 @@ include = ["vllm_model"] [tool.uv.sources] nemo-gym = { path = "../..", editable = true } - From 7d399f3174f1f9400353c3e59107388027f8e9af Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Mon, 8 Dec 2025 18:28:25 -0800 Subject: [PATCH 41/48] This is just a list of node IDs (as of RL commit: 07a71f7b1656adb99f64a70fd3405ffdeb7906e4). Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 186f67e4c..ad1b2a10f 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -92,7 +92,7 @@ def _post_init(self) -> None: # the listed Ray GPU nodes for scheduling GPU actors. allowed_gpu_nodes = self.cfg.get(RAY_GPU_NODES_KEY_NAME, None) if allowed_gpu_nodes is not None: - allowed_gpu_nodes = set([node["node_id"] if "node_id" in node else node for node in allowed_gpu_nodes]) + allowed_gpu_nodes = set(allowed_gpu_nodes) head = self.cfg["ray_head_node_address"] node_states = ray.util.state.list_nodes(head, detail=True) From 5a8bb0c40a5943cbd6a3d4f94064e1bf45d7a419 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 9 Dec 2025 10:39:47 -0800 Subject: [PATCH 42/48] Minimum version of vllm >= 0.11.2. Signed-off-by: Peter Jin --- responses_api_models/vllm_model/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/responses_api_models/vllm_model/pyproject.toml b/responses_api_models/vllm_model/pyproject.toml index 48d9a54cc..3459458bd 100644 --- a/responses_api_models/vllm_model/pyproject.toml +++ b/responses_api_models/vllm_model/pyproject.toml @@ -19,7 +19,7 @@ version = "0.2.0rc0" requires-python = ">=3.12" dependencies = [ "nemo-gym[dev]", - "vllm", + "vllm>=0.11.2", ] [build-system] From 9058505a68ceff0dd42c55c6ff19f58def51b099 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Tue, 9 Dec 2025 18:16:35 -0800 Subject: [PATCH 43/48] Fix for recent VLLM. 
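Recent vLLM releases moved FlexibleArgumentParser from vllm.utils to vllm.utils.argparse_utils, so the server spinup imports the new location. If older vLLM versions ever need to be supported side by side, a guarded import would be one option (sketch only, not part of this change):

    try:
        from vllm.utils.argparse_utils import FlexibleArgumentParser
    except ImportError:  # older vLLM layout
        from vllm.utils import FlexibleArgumentParser
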
Signed-off-by: Peter Jin --- responses_api_models/vllm_model/app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/responses_api_models/vllm_model/app.py b/responses_api_models/vllm_model/app.py index cc969baa5..621db1558 100644 --- a/responses_api_models/vllm_model/app.py +++ b/responses_api_models/vllm_model/app.py @@ -92,7 +92,7 @@ def _start_vllm_server(config: VLLMModelConfig, server_host: str, server_port: i import vllm.engine.arg_utils import vllm.entrypoints.openai.api_server import vllm.entrypoints.openai.cli_args - import vllm.utils + import vllm.utils.argparse_utils argv = [] argv.append("--model") @@ -118,7 +118,7 @@ def _start_vllm_server(config: VLLMModelConfig, server_host: str, server_port: i argv.append(arg_key) argv.append(f"{v}") - server_args = vllm.utils.FlexibleArgumentParser() + server_args = vllm.utils.argparse_utils.FlexibleArgumentParser() server_args = vllm.entrypoints.openai.cli_args.make_arg_parser(server_args) server_args = server_args.parse_args(argv) vllm.entrypoints.openai.cli_args.validate_parsed_serve_args(server_args) From 413c1ff2c2ce19d9d0524da3e3be08c08800e48a Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 11 Dec 2025 13:40:02 -0800 Subject: [PATCH 44/48] Simplified node IP address lookup. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index ad1b2a10f..d672c46e5 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -21,6 +21,7 @@ import ray import ray.util.state from ray.actor import ActorClass, ActorProxy +from ray.util import get_node_ip_address from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from nemo_gym.global_config import ( @@ -127,7 +128,7 @@ def lookup_current_ray_node_id() -> str: # pragma: no cover def lookup_current_ray_node_ip() -> str: # pragma: no cover - return lookup_ray_node_id_to_ip_dict()[lookup_current_ray_node_id()] + return get_node_ip_address() def spinup_single_ray_gpu_node_worker( From 38e730cc0cbdbe937a3ef6eb236eb01778ec679d Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 11 Dec 2025 13:51:19 -0800 Subject: [PATCH 45/48] Using ray actor type hints. 
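The scheduling helper is now exposed through typed handles: ActorClass[_NeMoGymRayGPUSchedulingHelper] for the remote class and ActorProxy[_NeMoGymRayGPUSchedulingHelper] for live handles, replacing the Any annotation in RunHelper. The same pattern on a toy actor (hypothetical names, minimal sketch):

    class _Counter:
        def __init__(self) -> None:
            self.value = 0

        def incr(self) -> int:
            self.value += 1
            return self.value

    _CounterActor: ActorClass[_Counter] = ray.remote(_Counter)
    handle: ActorProxy[_Counter] = _CounterActor.remote()
    assert ray.get(handle.incr.remote()) == 1
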
Signed-off-by: Peter Jin --- nemo_gym/cli.py | 5 +- nemo_gym/ray_utils.py | 105 +++++++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 50 deletions(-) diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index a87c118e1..c7cf269b6 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -28,7 +28,7 @@ from subprocess import Popen from threading import Thread from time import sleep -from typing import Any, Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple import psutil import rich @@ -50,6 +50,7 @@ get_global_config_dict, ) from nemo_gym.ray_utils import ( + _NeMoGymRayGPUSchedulingHelperActorProxy, _start_global_ray_gpu_scheduling_helper, ) from nemo_gym.server_utils import ( @@ -165,7 +166,7 @@ class ServerInstanceDisplayConfig(BaseModel): class RunHelper: # pragma: no cover _head_server: uvicorn.Server _head_server_thread: Thread - _head_ray_gpu_helper: Any + _head_ray_gpu_helper: _NeMoGymRayGPUSchedulingHelperActorProxy _processes: Dict[str, Popen] _server_instance_display_configs: List[ServerInstanceDisplayConfig] diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index d672c46e5..264bb37b1 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -31,54 +31,6 @@ ) -def _prepare_ray_worker_env_vars() -> Dict[str, str]: # pragma: no cover - worker_env_vars = { - **os.environ, - } - pop_env_vars = [ - "CUDA_VISIBLE_DEVICES", - "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES", - "RAY_JOB_ID", - "RAY_RAYLET_PID", - ] - for k in pop_env_vars: - worker_env_vars.pop(k, None) - return worker_env_vars - - -def _start_global_ray_gpu_scheduling_helper(node_id: Optional[str] = None) -> ActorProxy: # pragma: no cover - cfg = get_global_config_dict() - helper_options = { - "name": "_NeMoGymRayGPUSchedulingHelper", - "num_cpus": 0, - } - if node_id is not None: - helper_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( - node_id=node_id, - soft=True, - ) - helper = _NeMoGymRayGPUSchedulingHelper.options(**helper_options).remote(cfg) - ray.get(helper._post_init.remote()) - return helper - - -def get_global_ray_gpu_scheduling_helper() -> ActorProxy: # pragma: no cover - cfg = get_global_config_dict() - while True: - try: - get_actor_args = { - "name": "_NeMoGymRayGPUSchedulingHelper", - } - ray_namespace = cfg.get("ray_namespace", None) - if ray_namespace is None: - ray_namespace = "nemo_gym" - get_actor_args["namespace"] = ray_namespace - worker = ray.get_actor(**get_actor_args) - return worker - except ValueError: - sleep(3) - - @ray.remote class _NeMoGymRayGPUSchedulingHelper: # pragma: no cover def __init__(self, cfg): @@ -86,6 +38,7 @@ def __init__(self, cfg): self.avail_gpus_dict = defaultdict(int) self.used_gpus_dict = defaultdict(int) + @ray.method def _post_init(self) -> None: # If value of RAY_GPU_NODES_KEY_NAME is None, then Gym will use all Ray GPU nodes # for scheduling GPU actors. 
@@ -113,6 +66,47 @@ def alloc_gpu_node(self, num_gpus: int) -> Optional[str]: return None +_NeMoGymRayGPUSchedulingHelperActor: ActorClass[_NeMoGymRayGPUSchedulingHelper] = ray.remote( + _NeMoGymRayGPUSchedulingHelper +) # pragma: no cover +_NeMoGymRayGPUSchedulingHelperActorProxy = ActorProxy[_NeMoGymRayGPUSchedulingHelper] # pragma: no cover + + +def _start_global_ray_gpu_scheduling_helper( + node_id: Optional[str] = None, +) -> _NeMoGymRayGPUSchedulingHelperActorProxy: # pragma: no cover + cfg = get_global_config_dict() + helper_options = { + "name": "_NeMoGymRayGPUSchedulingHelper", + "num_cpus": 0, + } + if node_id is not None: + helper_options["scheduling_strategy"] = NodeAffinitySchedulingStrategy( + node_id=node_id, + soft=True, + ) + helper = _NeMoGymRayGPUSchedulingHelperActor.options(**helper_options).remote(cfg) + ray.get(helper._post_init.remote()) + return helper + + +def get_global_ray_gpu_scheduling_helper() -> _NeMoGymRayGPUSchedulingHelperActorProxy: # pragma: no cover + cfg = get_global_config_dict() + while True: + try: + get_actor_args = { + "name": "_NeMoGymRayGPUSchedulingHelper", + } + ray_namespace = cfg.get("ray_namespace", None) + if ray_namespace is None: + ray_namespace = "nemo_gym" + get_actor_args["namespace"] = ray_namespace + worker = ray.get_actor(**get_actor_args) + return worker + except ValueError: + sleep(3) + + def lookup_ray_node_id_to_ip_dict() -> Dict[str, str]: # pragma: no cover cfg = get_global_config_dict() head = cfg["ray_head_node_address"] @@ -131,6 +125,21 @@ def lookup_current_ray_node_ip() -> str: # pragma: no cover return get_node_ip_address() +def _prepare_ray_worker_env_vars() -> Dict[str, str]: # pragma: no cover + worker_env_vars = { + **os.environ, + } + pop_env_vars = [ + "CUDA_VISIBLE_DEVICES", + "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES", + "RAY_JOB_ID", + "RAY_RAYLET_PID", + ] + for k in pop_env_vars: + worker_env_vars.pop(k, None) + return worker_env_vars + + def spinup_single_ray_gpu_node_worker( worker_cls: ActorClass, num_gpus: int, From 472896a257656a446b46ca60a7d9d51ca5d1d1ba Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 11 Dec 2025 14:06:00 -0800 Subject: [PATCH 46/48] Remove. Signed-off-by: Peter Jin --- nemo_gym/ray_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nemo_gym/ray_utils.py b/nemo_gym/ray_utils.py index 264bb37b1..f8fd14193 100644 --- a/nemo_gym/ray_utils.py +++ b/nemo_gym/ray_utils.py @@ -31,7 +31,6 @@ ) -@ray.remote class _NeMoGymRayGPUSchedulingHelper: # pragma: no cover def __init__(self, cfg): self.cfg = cfg From fb6091b95a94aebff18c34f8705d1194a7a86e80 Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Thu, 11 Dec 2025 16:33:35 -0800 Subject: [PATCH 47/48] Add sglang_model. 
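sglang_model follows the same shape as vllm_model: it can either route requests to existing OpenAI-compatible endpoints via base_url, or spin up SGLang servers on Ray GPU nodes when spinup_server is set. A hypothetical config sketch (model name and server_args values are made up; fields inherited from BaseResponsesAPIModelConfig are omitted):

    config = SGLangModelConfig(
        base_url=[],                       # unused when spinup_server=True
        api_key="EMPTY",
        model="Qwen/Qwen2.5-7B-Instruct",
        return_token_id_information=True,
        uses_reasoning_parser=False,
        spinup_server=True,
        server_args={"tensor_parallel_size": 2},
        router_dp_size=2,
    )
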
Signed-off-by: Peter Jin --- responses_api_models/sglang_model/README.md | 0 responses_api_models/sglang_model/app.py | 411 ++++++++++++++++++ .../sglang_model/pyproject.toml | 38 ++ 3 files changed, 449 insertions(+) create mode 100644 responses_api_models/sglang_model/README.md create mode 100644 responses_api_models/sglang_model/app.py create mode 100644 responses_api_models/sglang_model/pyproject.toml diff --git a/responses_api_models/sglang_model/README.md b/responses_api_models/sglang_model/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/responses_api_models/sglang_model/app.py b/responses_api_models/sglang_model/app.py new file mode 100644 index 000000000..26efaedc6 --- /dev/null +++ b/responses_api_models/sglang_model/app.py @@ -0,0 +1,411 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +from multiprocessing import Process +from time import sleep, time +from typing import Any, Dict, List, Optional, Union +from uuid import uuid4 + +import ray +from aiohttp.client_exceptions import ClientResponseError +from fastapi import Request + +from nemo_gym.base_responses_api_model import ( + BaseResponsesAPIModelConfig, + Body, + SimpleResponsesAPIModel, +) +from nemo_gym.global_config import find_open_port +from nemo_gym.openai_utils import ( + NeMoGymAsyncOpenAI, + NeMoGymChatCompletion, + NeMoGymChatCompletionCreateParamsNonStreaming, + NeMoGymChatCompletionMessage, + NeMoGymChoice, + NeMoGymResponse, + NeMoGymResponseCreateParamsNonStreaming, +) +from nemo_gym.ray_utils import ( + lookup_current_ray_node_ip, + spinup_single_ray_gpu_node_worker, +) +from nemo_gym.server_utils import SESSION_ID_KEY +from responses_api_models.vllm_model.app import ( + VLLMConverter, + _vllm_server_heartbeat, +) + + +class SGLangModelConfig(BaseResponsesAPIModelConfig): + base_url: Union[str, List[str]] + api_key: str + model: str + return_token_id_information: bool + + uses_reasoning_parser: bool + replace_developer_role_with_system: bool = False + + spinup_server: bool = False + server_args: Optional[Dict[str, Any]] = None + + router_dp_size: int = 1 + + def model_post_init(self, context): + if isinstance(self.base_url, str): + self.base_url = [self.base_url] + return super().model_post_init(context) + + +def _start_sglang_server(config: SGLangModelConfig, server_host: str, server_port: int, router_dp_rank: int) -> None: + import sglang.srt.entrypoints.http_server + import sglang.srt.server_args + + argv = [] + argv.append("--model-path") + argv.append(config.model) + argv.append("--host") + argv.append(server_host) + argv.append("--port") + argv.append(f"{server_port}") + for k, v in (config.server_args or {}).items(): + k2 = k.replace("_", "-") + if v is None: + pass + elif isinstance(v, bool): + if not v: + arg_key = f"--no-{k2}" + else: + arg_key = f"--{k2}" + argv.append(arg_key) + else: + arg_key = f"--{k2}" + argv.append(arg_key) + 
argv.append(f"{v}") + + server_args = sglang.srt.server_args.prepare_server_args(argv) + sglang.srt.entrypoints.http_server.launch_server(server_args) + + +@ray.remote +class SGLangServerSpinupWorker: + def __init__(self, config: SGLangModelConfig, working_dir: Optional[str], router_dp_rank: int): + self.config = config + self.working_dir = working_dir + self.router_dp_rank = router_dp_rank + self._server_host = lookup_current_ray_node_ip() + self._server_port = find_open_port() + + if self.working_dir is not None: + os.chdir(self.working_dir) + + server_proc = Process( + target=_start_sglang_server, + args=( + self.config, + self._server_host, + self._server_port, + self.router_dp_rank, + ), + daemon=False, + ) + server_proc.start() + self._server_proc = server_proc + + def _get_ip(self) -> int: + return self._server_host + + def _get_port(self) -> int: + return self._server_port + + +class SGLangModel(SimpleResponsesAPIModel): + config: SGLangModelConfig + + def model_post_init(self, context): + working_dir = os.getcwd() + + if self.config.spinup_server: + self._server_urls = [] + self._server_workers = [] + self._clients = [] + + # TODO: support for other parallel sizes. + server_tp_size = (self.config.server_args or {}).get("tensor_parallel_size", 1) + server_dp_size = (self.config.server_args or {}).get("data_parallel_size", 1) + + assert server_dp_size == 1 + + router_dp_size = max(1, self.config.router_dp_size) + + for router_dp_rank in range(router_dp_size): + server_worker = spinup_single_ray_gpu_node_worker( + SGLangServerSpinupWorker, + server_tp_size, + config=self.config, + working_dir=working_dir, + router_dp_rank=router_dp_rank, + ) + + server_ip = ray.get(server_worker._get_ip.remote()) + server_port = ray.get(server_worker._get_port.remote()) + server_url = f"http://{server_ip}:{server_port}/v1" + + self._server_urls.append(server_url) + self._server_workers.append(server_worker) + + self._clients.append( + NeMoGymAsyncOpenAI( + base_url=server_url, + api_key=self.config.api_key, + ) + ) + + for server_url in self._server_urls: + while True: + try: + _vllm_server_heartbeat(server_url) + break + except Exception: + sleep(3) + continue + + else: + self._server_urls = None + self._server_workers = None + self._clients = [ + NeMoGymAsyncOpenAI( + base_url=base_url, + api_key=self.config.api_key, + ) + for base_url in self.config.base_url + ] + + self._session_id_to_client: Dict[str, NeMoGymAsyncOpenAI] = dict() + + # TODO: sglang converter. 
+ self._converter = VLLMConverter( + return_token_id_information=self.config.return_token_id_information, + ) + + return super().model_post_init(context) + + async def responses( + self, request: Request, body: NeMoGymResponseCreateParamsNonStreaming = Body() + ) -> NeMoGymResponse: + # Response Create Params -> Chat Completion Create Params + chat_completion_create_params = self._converter.responses_to_chat_completion_create_params(body) + body.model = self.config.model + + # Chat Completion Create Params -> Chat Completion + chat_completion_response = await self.chat_completions(request, chat_completion_create_params) + + choice = chat_completion_response.choices[0] + + response_output = self._converter.postprocess_chat_response(choice) + response_output_dicts = [item.model_dump() for item in response_output] + + # Chat Completion -> Response + return NeMoGymResponse( + id=f"resp_{uuid4().hex}", + created_at=int(time()), + model=body.model, + object="response", + output=response_output_dicts, + tool_choice=body.tool_choice if "tool_choice" in body else "auto", + parallel_tool_calls=body.parallel_tool_calls, + tools=body.tools, + temperature=body.temperature, + top_p=body.top_p, + background=body.background, + max_output_tokens=body.max_output_tokens, + max_tool_calls=body.max_tool_calls, + previous_response_id=body.previous_response_id, + prompt=body.prompt, + reasoning=body.reasoning, + service_tier=body.service_tier, + text=body.text, + top_logprobs=body.top_logprobs, + truncation=body.truncation, + metadata=body.metadata, + instructions=body.instructions, + user=body.user, + ) + + async def chat_completions( + self, request: Request, body: NeMoGymChatCompletionCreateParamsNonStreaming = Body() + ) -> NeMoGymChatCompletion: + if self.config.replace_developer_role_with_system: + for message in body.messages: + if message["role"] == "developer": + message["role"] = "system" + + body_dict = body.model_dump(exclude_unset=True) + body_dict["model"] = self.config.model + + session_id = request.session[SESSION_ID_KEY] + if session_id not in self._session_id_to_client: + # There is probably a better way to select the endpoint for this request. But this will do for now. + client_idx = len(self._session_id_to_client) % len(self._clients) + client = self._clients[client_idx] + self._session_id_to_client[session_id] = client + client = self._session_id_to_client[session_id] + + create_params = body_dict + + if self.config.return_token_id_information: + create_params |= dict( + logprobs=True, + # Typically passed via OpenAI client extra_body. 
+ return_tokens_as_token_ids=True, + # TODO add this when NeMo RL upgrades to vLLM 0.10.2 support for prompt token ids + # For prompt and generation token IDs + # return_token_ids=True, + # For prompt token IDs + # prompt_logprobs=0, + ) + + if self.config.uses_reasoning_parser: + for message_dict in body_dict["messages"]: + if message_dict.get("role") != "assistant" or "content" not in message_dict: + continue + + content = message_dict["content"] + if isinstance(content, str): + reasoning_matches, remaining_content = self._converter._extract_reasoning_from_content(content) + message_dict["content"] = remaining_content + if reasoning_matches: + message_dict["reasoning_content"] = reasoning_matches[0] + elif isinstance(content, list): + reasoning_content = None + for content_item_dict in content: + reasoning_matches, remaining_content = self._converter._extract_reasoning_from_content( + content_item_dict["text"] + ) + assert reasoning_content is None or not reasoning_matches, ( + f"Found multiple reasoning matches in a single assistant message content item list!\nMessage: {message_dict}" + ) + + # Even though we set the reasoning content already here, we still loop through all the content item dicts for the assert above. + content_item_dict["text"] = remaining_content + if reasoning_matches: + message_dict["reasoning_content"] = reasoning_matches[0] + elif not content: + # No content or content None is a no-op + pass + else: + raise NotImplementedError + + try: + chat_completion_dict = await client.create_chat_completion(**create_params) + except ClientResponseError as e: + """ + Example messages for out of context length: + + 1. https://github.com/vllm-project/vllm/blob/685c99ee77b4818dcdd15b30fe0e0eff0d5d22ec/vllm/entrypoints/openai/serving_engine.py#L914 + ```json + {"object":"error","message":"This model\'s maximum context length is 32768 tokens. However, you requested 32818 tokens in the messages, Please reduce the length of the messages. None","type":"BadRequestError","param":null,"code":400} + ``` + 2. https://github.com/vllm-project/vllm/blob/685c99ee77b4818dcdd15b30fe0e0eff0d5d22ec/vllm/entrypoints/openai/serving_engine.py#L940 + 3. https://github.com/vllm-project/vllm/blob/685c99ee77b4818dcdd15b30fe0e0eff0d5d22ec/vllm/entrypoints/openai/serving_engine.py#L948 + 4. https://github.com/vllm-project/vllm/blob/685c99ee77b4818dcdd15b30fe0e0eff0d5d22ec/vllm/sampling_params.py#L463 + """ + result_content_str = e.response_content.decode() + + is_out_of_context_length = e.status == 400 and ( + "context length" in result_content_str or "max_tokens" in result_content_str + ) + if is_out_of_context_length: + return NeMoGymChatCompletion( + id="chtcmpl-123", + object="chat.completion", + created=int(time()), + model=self.config.model, + choices=[ + NeMoGymChoice( + index=0, + finish_reason="stop", + message=NeMoGymChatCompletionMessage( + role="assistant", + content=None, + tool_calls=None, + ), + ) + ], + ) + else: + raise e + + choice_dict = chat_completion_dict["choices"][0] + if self.config.uses_reasoning_parser: + reasoning_content = choice_dict["message"].get("reasoning_content") + if reasoning_content: + choice_dict["message"].pop("reasoning_content") + + # We wrap this here in think tags for Gym's sake and to return a valid OpenAI Chat Completions response. 
+ choice_dict["message"]["content"] = self._converter._wrap_reasoning_in_think_tags( + [reasoning_content] + ) + (choice_dict["message"]["content"] or "") + else: + assert not choice_dict["message"].get("reasoning_content"), ( + "Please do not use a reasoning parser in vLLM! There is one source of truth for handling data (including reasoning), which is NeMo Gym!" + ) + + if self.config.return_token_id_information: + log_probs = choice_dict["logprobs"]["content"] + generation_log_probs = [log_prob["logprob"] for log_prob in log_probs] + + """ + START TODO remove this when NeMo RL upgrades to vLLM 0.10.2 support for prompt token ids + """ + # Looks like `"token_id:151667"` + generation_token_ids = [log_prob["token"].removeprefix("token_id:") for log_prob in log_probs] + + # The tokenize endpoint doesn't accept any sampling parameters + # The only relevant params are model, messages, and tools. + tokenize_body_dict = dict() + for key in ("model", "messages", "tools"): + if key in body_dict: + tokenize_body_dict[key] = body_dict[key] + + # The base url has /v1 at the end but vLLM's tokenize endpoint does not have v1, hence the .. + # I can't believe the path is resolved correctly LOL + tokenize_response = await client.create_tokenize(**tokenize_body_dict) + """ + END + """ + + message_dict = choice_dict["message"] + message_dict.update( + dict( + # TODO add this when NeMo RL upgrades to vLLM 0.10.2 support for prompt token ids + # prompt_token_ids=chat_completion_dict["prompt_token_ids"], + prompt_token_ids=tokenize_response["tokens"], + # generation_token_ids=choice_dict["token_ids"], + generation_token_ids=generation_token_ids, + generation_log_probs=generation_log_probs, + ) + ) + + # Clean the duplicated information + choice_dict.pop("logprobs") + # TODO add this when NeMo RL upgrades to vLLM 0.10.2 support for prompt token ids + # chat_completion_dict.pop("prompt_token_ids") + # choice_dict.pop("token_ids") + + return NeMoGymChatCompletion.model_validate(chat_completion_dict) + + +if __name__ == "__main__": + SGLangModel.run_webserver() diff --git a/responses_api_models/sglang_model/pyproject.toml b/responses_api_models/sglang_model/pyproject.toml new file mode 100644 index 000000000..6b82c62c2 --- /dev/null +++ b/responses_api_models/sglang_model/pyproject.toml @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[project] +name = "sglang-model" +version = "0.2.0rc0" +requires-python = ">=3.12" +dependencies = [ + "nemo-gym[dev]", + + # We specifically pin the sglang dependency because we have tested on this version. 
+ # Updated Thu Dec 11 2025 with sglang[all]==0.5.2 + # License: Apache 2.0 https://github.com/sgl-project/sglang/blob/62a4a339ebc1b2a9ecf5deac10ebf1de9108bca3/LICENSE + "sglang[all]==0.5.2", +] + +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools>=61", "setuptools-scm"] + +[tool.setuptools.packages.find] +where = [".."] +include = ["sglang_model"] + +[tool.uv.sources] +nemo-gym = { path = "../..", editable = true } From e1b3bde31eebfbc0e4fbb38c2906889c5f97775f Mon Sep 17 00:00:00 2001 From: Peter Jin Date: Sat, 20 Dec 2025 16:01:08 -0800 Subject: [PATCH 48/48] Cherrypick: fix args bugs. Signed-off-by: Jiaqi Zeng Signed-off-by: Peter Jin --- responses_api_models/vllm_model/app.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/responses_api_models/vllm_model/app.py b/responses_api_models/vllm_model/app.py index a8879026e..429495c26 100644 --- a/responses_api_models/vllm_model/app.py +++ b/responses_api_models/vllm_model/app.py @@ -113,6 +113,11 @@ def _start_vllm_server(config: VLLMModelConfig, server_host: str, server_port: i else: arg_key = f"--{k2}" argv.append(arg_key) + elif isinstance(v, dict): + # Dict values must be passed as JSON strings to vLLM CLI + arg_key = f"--{k2}" + argv.append(arg_key) + argv.append(json.dumps(v)) else: arg_key = f"--{k2}" argv.append(arg_key)
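With the dict handling in place, nested server_args entries reach the vLLM CLI as JSON strings. A hypothetical example (key names and values are illustrative only):

    server_args = {
        "tensor_parallel_size": 2,
        "enable_prefix_caching": False,
        "rope_scaling": {"rope_type": "yarn", "factor": 4.0},
    }
    # argv built by _start_vllm_server for these entries:
    #   --tensor-parallel-size 2
    #   --no-enable-prefix-caching
    #   --rope-scaling {"rope_type": "yarn", "factor": 4.0}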