From e2bb023cf302669a1098da4b11b28ccfbe49377b Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 10:42:47 -0800 Subject: [PATCH 01/10] feat: Support prefetching of specific envs Signed-off-by: Hemil Desai --- nemo_rl/utils/prefetch_venvs.py | 138 ++++---------- tests/unit/utils/test_prefetch_venvs.py | 240 ++++++++++++++++++++++++ 2 files changed, 279 insertions(+), 99 deletions(-) create mode 100644 tests/unit/utils/test_prefetch_venvs.py diff --git a/nemo_rl/utils/prefetch_venvs.py b/nemo_rl/utils/prefetch_venvs.py index c6e95722c1..910eb59823 100755 --- a/nemo_rl/utils/prefetch_venvs.py +++ b/nemo_rl/utils/prefetch_venvs.py @@ -11,9 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os +import argparse import sys -from pathlib import Path from nemo_rl.distributed.ray_actor_environment_registry import ( ACTOR_ENVIRONMENT_REGISTRY, @@ -21,13 +20,24 @@ from nemo_rl.utils.venvs import create_local_venv -def prefetch_venvs(): - """Prefetch all virtual environments that will be used by workers.""" +def prefetch_venvs(filters=None): + """Prefetch all virtual environments that will be used by workers. + + Args: + filters: List of strings to match against actor FQNs. If provided, only + actors whose FQN contains at least one of the filter strings will + be prefetched. If None, all venvs are prefetched. + """ print("Prefetching virtual environments...") + if filters: + print(f"Filtering for: {filters}") # Group venvs by py_executable to avoid duplicating work venv_configs = {} for actor_fqn, py_executable in ACTOR_ENVIRONMENT_REGISTRY.items(): + # Apply filters if provided + if filters and not any(f in actor_fqn for f in filters): + continue # Skip system python as it doesn't need a venv if py_executable == "python" or py_executable == sys.executable: print(f"Skipping {actor_fqn} (uses system Python)") @@ -54,100 +64,30 @@ def prefetch_venvs(): print("\nVenv prefetching complete!") - # Create convenience python wrapper scripts for frozen environment support (container-only) - create_frozen_environment_symlinks(venv_configs) - - -def create_frozen_environment_symlinks(venv_configs): - """Create python-{ClassName} wrapper scripts in /usr/local/bin for frozen environment support. - - Only runs in container (when NRL_CONTAINER=1 is set). - - Args: - venv_configs: Dictionary mapping py_executable to list of actor FQNs - """ - # Only create wrapper scripts in container - if not os.environ.get("NRL_CONTAINER"): - print( - "\nSkipping frozen environment wrapper script creation (not in container)" - ) - return - - print("\nCreating frozen environment wrapper scripts...") - - # Collect all wrapper mappings: class_name -> venv_path - wrapper_mappings = {} - - for py_executable, actor_fqns in venv_configs.items(): - for actor_fqn in actor_fqns: - # Extract class name from FQN (last part) - # e.g., "nemo_rl.models.policy.megatron_policy_worker.MegatronPolicyWorker" -> "MegatronPolicyWorker" - class_name = actor_fqn.split(".")[-1] - - # Get the venv path that was created - try: - python_path = create_local_venv(py_executable, actor_fqn) - - # Check for collisions - if class_name in wrapper_mappings: - existing_path = wrapper_mappings[class_name] - if existing_path != python_path: - raise RuntimeError( - f"Collision detected: Multiple venvs want to use name '{class_name}'\n" - f" Existing: {existing_path}\n" - f" New: {python_path}\n" - f"This indicates two different worker classes have the same name." - ) - else: - wrapper_mappings[class_name] = python_path - except Exception as e: - print(f" Warning: Could not get venv path for {actor_fqn}: {e}") - continue - - # Create wrapper scripts - wrapper_dir = Path("/usr/local/bin") - created_wrappers = [] - - for class_name, python_path in sorted(wrapper_mappings.items()): - wrapper_name = f"python-{class_name}" - wrapper_path = wrapper_dir / wrapper_name - - # Get the venv directory path (parent of bin/python) - venv_path = Path(python_path).parent.parent - - # Create wrapper script content - wrapper_content = f"""#!/bin/bash -VENV_PATH="{venv_path}" -export VIRTUAL_ENV="$VENV_PATH" -export PATH="$VENV_PATH/bin:$PATH" -exec "$VENV_PATH/bin/python" "$@" -""" - - try: - # Remove existing wrapper if present - if wrapper_path.exists() or wrapper_path.is_symlink(): - wrapper_path.unlink() - - # Write wrapper script - wrapper_path.write_text(wrapper_content) - - # Make executable - wrapper_path.chmod(0o755) - - created_wrappers.append(wrapper_name) - print(f" Created: {wrapper_name} -> {python_path}") - except Exception as e: - print(f" Warning: Could not create wrapper script {wrapper_name}: {e}") - continue - - if created_wrappers: - print(f"\nCreated {len(created_wrappers)} frozen environment wrapper scripts") - print("Users can now use these python executables directly:") - for name in created_wrappers: - print(f" - {name}") - else: - print("\nNo frozen environment wrapper scripts were created") - if __name__ == "__main__": - prefetch_venvs() + parser = argparse.ArgumentParser( + description="Prefetch virtual environments for Ray actors.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Prefetch all venvs + python -m nemo_rl.utils.prefetch_venvs + + # Prefetch only vLLM-related venvs + python -m nemo_rl.utils.prefetch_venvs vllm + + # Prefetch multiple specific venvs + python -m nemo_rl.utils.prefetch_venvs vllm policy environment + """, + ) + parser.add_argument( + "filters", + nargs="*", + help="Filter strings to match against actor FQNs. Only actors whose FQN " + "contains at least one of these strings will be prefetched. " + "If not provided, all venvs are prefetched.", + ) + args = parser.parse_args() + + prefetch_venvs(filters=args.filters if args.filters else None) diff --git a/tests/unit/utils/test_prefetch_venvs.py b/tests/unit/utils/test_prefetch_venvs.py new file mode 100644 index 0000000000..ad46efbe53 --- /dev/null +++ b/tests/unit/utils/test_prefetch_venvs.py @@ -0,0 +1,240 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest.mock import patch + +import pytest + + +@pytest.fixture +def mock_registry(): + """Create a mock registry with various actor types.""" + return { + "nemo_rl.models.generation.vllm.vllm_worker.VllmGenerationWorker": "uv run --group vllm", + "nemo_rl.models.policy.workers.dtensor_policy_worker.DTensorPolicyWorker": "uv run --group vllm", + "nemo_rl.models.policy.workers.megatron_policy_worker.MegatronPolicyWorker": "uv run --group mcore", + "nemo_rl.environments.math_environment.MathEnvironment": "python", + "nemo_rl.environments.code_environment.CodeEnvironment": "python", + } + + +class TestPrefetchVenvs: + """Tests for the prefetch_venvs function.""" + + def test_prefetch_venvs_no_filters(self, mock_registry): + """Test that all uv-based venvs are prefetched when no filters are provided.""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + prefetch_venvs(filters=None) + + # Should create venvs for all uv-based actors (3 total) + assert mock_create_venv.call_count == 3 + + # Verify the actors that were called + call_args = [call[0] for call in mock_create_venv.call_args_list] + actor_fqns = [args[1] for args in call_args] + + assert ( + "nemo_rl.models.generation.vllm.vllm_worker.VllmGenerationWorker" + in actor_fqns + ) + assert ( + "nemo_rl.models.policy.workers.dtensor_policy_worker.DTensorPolicyWorker" + in actor_fqns + ) + assert ( + "nemo_rl.models.policy.workers.megatron_policy_worker.MegatronPolicyWorker" + in actor_fqns + ) + + def test_prefetch_venvs_single_filter(self, mock_registry): + """Test filtering with a single filter string.""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + prefetch_venvs(filters=["vllm"]) + + # Should only create venvs for actors containing "vllm" (1 actor) + assert mock_create_venv.call_count == 1 + + call_args = mock_create_venv.call_args[0] + assert ( + call_args[1] + == "nemo_rl.models.generation.vllm.vllm_worker.VllmGenerationWorker" + ) + + def test_prefetch_venvs_multiple_filters(self, mock_registry): + """Test filtering with multiple filter strings (OR logic).""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + prefetch_venvs(filters=["vllm", "megatron"]) + + # Should create venvs for actors containing "vllm" OR "megatron" (2 actors) + assert mock_create_venv.call_count == 2 + + call_args = [call[0] for call in mock_create_venv.call_args_list] + actor_fqns = [args[1] for args in call_args] + + assert ( + "nemo_rl.models.generation.vllm.vllm_worker.VllmGenerationWorker" + in actor_fqns + ) + assert ( + "nemo_rl.models.policy.workers.megatron_policy_worker.MegatronPolicyWorker" + in actor_fqns + ) + + def test_prefetch_venvs_filter_no_match(self, mock_registry): + """Test that no venvs are created when filter matches nothing.""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + prefetch_venvs(filters=["nonexistent"]) + + # Should not create any venvs + assert mock_create_venv.call_count == 0 + + def test_prefetch_venvs_skips_system_python(self, mock_registry): + """Test that system python actors are skipped even if they match filters.""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + # Filter for "environment" which matches system python actors + prefetch_venvs(filters=["environment"]) + + # Should not create any venvs since matching actors use system python + assert mock_create_venv.call_count == 0 + + def test_prefetch_venvs_partial_match(self, mock_registry): + """Test that filter matches partial strings within FQN.""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + # "policy" should match both dtensor_policy_worker and megatron_policy_worker + prefetch_venvs(filters=["policy"]) + + assert mock_create_venv.call_count == 2 + + call_args = [call[0] for call in mock_create_venv.call_args_list] + actor_fqns = [args[1] for args in call_args] + + assert ( + "nemo_rl.models.policy.workers.dtensor_policy_worker.DTensorPolicyWorker" + in actor_fqns + ) + assert ( + "nemo_rl.models.policy.workers.megatron_policy_worker.MegatronPolicyWorker" + in actor_fqns + ) + + def test_prefetch_venvs_empty_filter_list(self, mock_registry): + """Test that empty filter list is treated as no filtering (falsy).""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + # Empty list should be falsy and prefetch all + prefetch_venvs(filters=[]) + + # Should create venvs for all uv-based actors (3 total) + assert mock_create_venv.call_count == 3 + + def test_prefetch_venvs_continues_on_error(self, mock_registry): + """Test that prefetching continues even if one venv creation fails.""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + # First call raises, subsequent calls succeed + mock_create_venv.side_effect = [ + Exception("Test error"), + "/path/to/venv/bin/python", + "/path/to/venv/bin/python", + ] + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + # Should not raise, should continue with other venvs + prefetch_venvs(filters=None) + + # All 3 uv-based actors should have been attempted + assert mock_create_venv.call_count == 3 + + def test_prefetch_venvs_case_sensitive_filter(self, mock_registry): + """Test that filters are case-sensitive.""" + with ( + patch( + "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry + ), + patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, + ): + mock_create_venv.return_value = "/path/to/venv/bin/python" + + from nemo_rl.utils.prefetch_venvs import prefetch_venvs + + # "VLLM" (uppercase) should not match "vllm" (lowercase) + prefetch_venvs(filters=["VLLM"]) + + assert mock_create_venv.call_count == 0 From 4482064606ec7e5c841226da88ff2de24ae35256 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 10:46:07 -0800 Subject: [PATCH 02/10] fix Signed-off-by: Hemil Desai --- nemo_rl/utils/prefetch_venvs.py | 95 +++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/nemo_rl/utils/prefetch_venvs.py b/nemo_rl/utils/prefetch_venvs.py index 910eb59823..e88f1de97c 100755 --- a/nemo_rl/utils/prefetch_venvs.py +++ b/nemo_rl/utils/prefetch_venvs.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import argparse +import os import sys +from pathlib import Path from nemo_rl.distributed.ray_actor_environment_registry import ( ACTOR_ENVIRONMENT_REGISTRY, @@ -63,6 +65,99 @@ def prefetch_venvs(filters=None): continue print("\nVenv prefetching complete!") + # Create convenience python wrapper scripts for frozen environment support (container-only) + create_frozen_environment_symlinks(venv_configs) + + +def create_frozen_environment_symlinks(venv_configs): + """Create python-{ClassName} wrapper scripts in /usr/local/bin for frozen environment support. + + Only runs in container (when NRL_CONTAINER=1 is set). + + Args: + venv_configs: Dictionary mapping py_executable to list of actor FQNs + """ + # Only create wrapper scripts in container + if not os.environ.get("NRL_CONTAINER"): + print( + "\nSkipping frozen environment wrapper script creation (not in container)" + ) + return + + print("\nCreating frozen environment wrapper scripts...") + + # Collect all wrapper mappings: class_name -> venv_path + wrapper_mappings = {} + + for py_executable, actor_fqns in venv_configs.items(): + for actor_fqn in actor_fqns: + # Extract class name from FQN (last part) + # e.g., "nemo_rl.models.policy.megatron_policy_worker.MegatronPolicyWorker" -> "MegatronPolicyWorker" + class_name = actor_fqn.split(".")[-1] + + # Get the venv path that was created + try: + python_path = create_local_venv(py_executable, actor_fqn) + + # Check for collisions + if class_name in wrapper_mappings: + existing_path = wrapper_mappings[class_name] + if existing_path != python_path: + raise RuntimeError( + f"Collision detected: Multiple venvs want to use name '{class_name}'\n" + f" Existing: {existing_path}\n" + f" New: {python_path}\n" + f"This indicates two different worker classes have the same name." + ) + else: + wrapper_mappings[class_name] = python_path + except Exception as e: + print(f" Warning: Could not get venv path for {actor_fqn}: {e}") + continue + + # Create wrapper scripts + wrapper_dir = Path("/usr/local/bin") + created_wrappers = [] + + for class_name, python_path in sorted(wrapper_mappings.items()): + wrapper_name = f"python-{class_name}" + wrapper_path = wrapper_dir / wrapper_name + + # Get the venv directory path (parent of bin/python) + venv_path = Path(python_path).parent.parent + + # Create wrapper script content + wrapper_content = f"""#!/bin/bash +VENV_PATH="{venv_path}" +export VIRTUAL_ENV="$VENV_PATH" +export PATH="$VENV_PATH/bin:$PATH" +exec "$VENV_PATH/bin/python" "$@" +""" + + try: + # Remove existing wrapper if present + if wrapper_path.exists() or wrapper_path.is_symlink(): + wrapper_path.unlink() + + # Write wrapper script + wrapper_path.write_text(wrapper_content) + + # Make executable + wrapper_path.chmod(0o755) + + created_wrappers.append(wrapper_name) + print(f" Created: {wrapper_name} -> {python_path}") + except Exception as e: + print(f" Warning: Could not create wrapper script {wrapper_name}: {e}") + continue + + if created_wrappers: + print(f"\nCreated {len(created_wrappers)} frozen environment wrapper scripts") + print("Users can now use these python executables directly:") + for name in created_wrappers: + print(f" - {name}") + else: + print("\nNo frozen environment wrapper scripts were created") if __name__ == "__main__": From 6359646cae84199c3f3cb9f22280fa80396e33d1 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 10:46:48 -0800 Subject: [PATCH 03/10] fix Signed-off-by: Hemil Desai --- nemo_rl/utils/prefetch_venvs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_rl/utils/prefetch_venvs.py b/nemo_rl/utils/prefetch_venvs.py index e88f1de97c..2fc97a4231 100755 --- a/nemo_rl/utils/prefetch_venvs.py +++ b/nemo_rl/utils/prefetch_venvs.py @@ -65,6 +65,7 @@ def prefetch_venvs(filters=None): continue print("\nVenv prefetching complete!") + # Create convenience python wrapper scripts for frozen environment support (container-only) create_frozen_environment_symlinks(venv_configs) From 718c5337d14f24efb9274ed79905932a0b1de2c6 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 12:17:14 -0800 Subject: [PATCH 04/10] fix Signed-off-by: Hemil Desai --- tests/unit/utils/test_prefetch_venvs.py | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/tests/unit/utils/test_prefetch_venvs.py b/tests/unit/utils/test_prefetch_venvs.py index ad46efbe53..3e4c5122f6 100644 --- a/tests/unit/utils/test_prefetch_venvs.py +++ b/tests/unit/utils/test_prefetch_venvs.py @@ -15,6 +15,8 @@ import pytest +from nemo_rl.utils.prefetch_venvs import prefetch_venvs + @pytest.fixture def mock_registry(): @@ -41,11 +43,8 @@ def test_prefetch_venvs_no_filters(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - prefetch_venvs(filters=None) - # Should create venvs for all uv-based actors (3 total) assert mock_create_venv.call_count == 3 # Verify the actors that were called @@ -75,8 +74,6 @@ def test_prefetch_venvs_single_filter(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - prefetch_venvs(filters=["vllm"]) # Should only create venvs for actors containing "vllm" (1 actor) @@ -98,8 +95,6 @@ def test_prefetch_venvs_multiple_filters(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - prefetch_venvs(filters=["vllm", "megatron"]) # Should create venvs for actors containing "vllm" OR "megatron" (2 actors) @@ -127,8 +122,6 @@ def test_prefetch_venvs_filter_no_match(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - prefetch_venvs(filters=["nonexistent"]) # Should not create any venvs @@ -144,8 +137,6 @@ def test_prefetch_venvs_skips_system_python(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - # Filter for "environment" which matches system python actors prefetch_venvs(filters=["environment"]) @@ -162,8 +153,6 @@ def test_prefetch_venvs_partial_match(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - # "policy" should match both dtensor_policy_worker and megatron_policy_worker prefetch_venvs(filters=["policy"]) @@ -191,8 +180,6 @@ def test_prefetch_venvs_empty_filter_list(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - # Empty list should be falsy and prefetch all prefetch_venvs(filters=[]) @@ -214,8 +201,6 @@ def test_prefetch_venvs_continues_on_error(self, mock_registry): "/path/to/venv/bin/python", ] - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - # Should not raise, should continue with other venvs prefetch_venvs(filters=None) @@ -232,8 +217,6 @@ def test_prefetch_venvs_case_sensitive_filter(self, mock_registry): ): mock_create_venv.return_value = "/path/to/venv/bin/python" - from nemo_rl.utils.prefetch_venvs import prefetch_venvs - # "VLLM" (uppercase) should not match "vllm" (lowercase) prefetch_venvs(filters=["VLLM"]) From 0120fed323e09f01c714f55c63baa9b09d173763 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 13:19:48 -0800 Subject: [PATCH 05/10] fix Signed-off-by: Hemil Desai --- nemo_rl/utils/prefetch_venvs.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/nemo_rl/utils/prefetch_venvs.py b/nemo_rl/utils/prefetch_venvs.py index 2fc97a4231..b000bd92b8 100755 --- a/nemo_rl/utils/prefetch_venvs.py +++ b/nemo_rl/utils/prefetch_venvs.py @@ -34,15 +34,23 @@ def prefetch_venvs(filters=None): if filters: print(f"Filtering for: {filters}") + # Track statistics for summary + skipped_by_filter = [] + skipped_system_python = [] + prefetched = [] + failed = [] + # Group venvs by py_executable to avoid duplicating work venv_configs = {} for actor_fqn, py_executable in ACTOR_ENVIRONMENT_REGISTRY.items(): # Apply filters if provided if filters and not any(f in actor_fqn for f in filters): + skipped_by_filter.append(actor_fqn) continue # Skip system python as it doesn't need a venv if py_executable == "python" or py_executable == sys.executable: print(f"Skipping {actor_fqn} (uses system Python)") + skipped_system_python.append(actor_fqn) continue # Only create venvs for uv-based executables @@ -59,12 +67,25 @@ def prefetch_venvs(filters=None): try: python_path = create_local_venv(py_executable, actor_fqn) print(f" Success: {python_path}") + prefetched.append(actor_fqn) except Exception as e: print(f" Error: {e}") + failed.append(actor_fqn) # Continue with other venvs even if one fails continue - print("\nVenv prefetching complete!") + # Print summary + print("\n" + "=" * 50) + print("Venv prefetching complete! Summary:") + print("=" * 50) + print(f" Prefetched: {len(prefetched)}") + print(f" Skipped (system Python): {len(skipped_system_python)}") + if filters: + print(f" Skipped (filtered out): {len(skipped_by_filter)}") + if failed: + print(f" Failed: {len(failed)}") + for actor_fqn in failed: + print(f" - {actor_fqn}") # Create convenience python wrapper scripts for frozen environment support (container-only) create_frozen_environment_symlinks(venv_configs) From 7f6a317b800a6338b6f895290d60aaea91508de8 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 13:24:01 -0800 Subject: [PATCH 06/10] fix Signed-off-by: Hemil Desai --- nemo_rl/utils/prefetch_venvs.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/nemo_rl/utils/prefetch_venvs.py b/nemo_rl/utils/prefetch_venvs.py index b000bd92b8..5423198359 100755 --- a/nemo_rl/utils/prefetch_venvs.py +++ b/nemo_rl/utils/prefetch_venvs.py @@ -78,12 +78,18 @@ def prefetch_venvs(filters=None): print("\n" + "=" * 50) print("Venv prefetching complete! Summary:") print("=" * 50) - print(f" Prefetched: {len(prefetched)}") + print(f" Prefetched: {len(prefetched)}") + for actor_fqn in prefetched: + print(f" - {actor_fqn}") print(f" Skipped (system Python): {len(skipped_system_python)}") + for actor_fqn in skipped_system_python: + print(f" - {actor_fqn}") if filters: - print(f" Skipped (filtered out): {len(skipped_by_filter)}") + print(f" Skipped (filtered out): {len(skipped_by_filter)}") + for actor_fqn in skipped_by_filter: + print(f" - {actor_fqn}") if failed: - print(f" Failed: {len(failed)}") + print(f" Failed: {len(failed)}") for actor_fqn in failed: print(f" - {actor_fqn}") From 24ac3d8eee10c207d336fe327aa6d7cf834c8f31 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 14:52:28 -0800 Subject: [PATCH 07/10] fix Signed-off-by: Hemil Desai --- tests/unit/utils/test_prefetch_venvs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/utils/test_prefetch_venvs.py b/tests/unit/utils/test_prefetch_venvs.py index 3e4c5122f6..06766c001d 100644 --- a/tests/unit/utils/test_prefetch_venvs.py +++ b/tests/unit/utils/test_prefetch_venvs.py @@ -45,7 +45,7 @@ def test_prefetch_venvs_no_filters(self, mock_registry): prefetch_venvs(filters=None) - assert mock_create_venv.call_count == 3 + assert mock_create_venv.call_count > 0 # Verify the actors that were called call_args = [call[0] for call in mock_create_venv.call_args_list] From a65a717e48893884b02c9ad83b67ebe6da3e40eb Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 18:26:23 -0800 Subject: [PATCH 08/10] fix Signed-off-by: Hemil Desai --- tests/unit/utils/test_prefetch_venvs.py | 197 +++++++++++++++--------- 1 file changed, 123 insertions(+), 74 deletions(-) diff --git a/tests/unit/utils/test_prefetch_venvs.py b/tests/unit/utils/test_prefetch_venvs.py index 06766c001d..f6b14961d0 100644 --- a/tests/unit/utils/test_prefetch_venvs.py +++ b/tests/unit/utils/test_prefetch_venvs.py @@ -11,11 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import importlib from unittest.mock import patch import pytest -from nemo_rl.utils.prefetch_venvs import prefetch_venvs +import nemo_rl.utils.prefetch_venvs as prefetch_venvs_module @pytest.fixture @@ -30,22 +31,32 @@ def mock_registry(): } +@pytest.fixture +def prefetch_venvs_func(mock_registry): + """Reload the module with mocked registry to ensure patches are applied.""" + with patch.dict( + "nemo_rl.distributed.ray_actor_environment_registry.ACTOR_ENVIRONMENT_REGISTRY", + mock_registry, + clear=True, + ): + # Reload the module so it picks up the patched registry + importlib.reload(prefetch_venvs_module) + yield prefetch_venvs_module.prefetch_venvs + + class TestPrefetchVenvs: """Tests for the prefetch_venvs function.""" - def test_prefetch_venvs_no_filters(self, mock_registry): + def test_prefetch_venvs_no_filters(self, prefetch_venvs_func): """Test that all uv-based venvs are prefetched when no filters are provided.""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" - prefetch_venvs(filters=None) + prefetch_venvs_func(filters=None) - assert mock_create_venv.call_count > 0 + assert mock_create_venv.call_count == 3 # Verify the actors that were called call_args = [call[0] for call in mock_create_venv.call_args_list] @@ -64,17 +75,14 @@ def test_prefetch_venvs_no_filters(self, mock_registry): in actor_fqns ) - def test_prefetch_venvs_single_filter(self, mock_registry): + def test_prefetch_venvs_single_filter(self, prefetch_venvs_func): """Test filtering with a single filter string.""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" - prefetch_venvs(filters=["vllm"]) + prefetch_venvs_func(filters=["vllm"]) # Should only create venvs for actors containing "vllm" (1 actor) assert mock_create_venv.call_count == 1 @@ -85,17 +93,14 @@ def test_prefetch_venvs_single_filter(self, mock_registry): == "nemo_rl.models.generation.vllm.vllm_worker.VllmGenerationWorker" ) - def test_prefetch_venvs_multiple_filters(self, mock_registry): + def test_prefetch_venvs_multiple_filters(self, prefetch_venvs_func): """Test filtering with multiple filter strings (OR logic).""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" - prefetch_venvs(filters=["vllm", "megatron"]) + prefetch_venvs_func(filters=["vllm", "megatron"]) # Should create venvs for actors containing "vllm" OR "megatron" (2 actors) assert mock_create_venv.call_count == 2 @@ -112,49 +117,40 @@ def test_prefetch_venvs_multiple_filters(self, mock_registry): in actor_fqns ) - def test_prefetch_venvs_filter_no_match(self, mock_registry): + def test_prefetch_venvs_filter_no_match(self, prefetch_venvs_func): """Test that no venvs are created when filter matches nothing.""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" - prefetch_venvs(filters=["nonexistent"]) + prefetch_venvs_func(filters=["nonexistent"]) # Should not create any venvs assert mock_create_venv.call_count == 0 - def test_prefetch_venvs_skips_system_python(self, mock_registry): + def test_prefetch_venvs_skips_system_python(self, prefetch_venvs_func): """Test that system python actors are skipped even if they match filters.""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" # Filter for "environment" which matches system python actors - prefetch_venvs(filters=["environment"]) + prefetch_venvs_func(filters=["environment"]) # Should not create any venvs since matching actors use system python assert mock_create_venv.call_count == 0 - def test_prefetch_venvs_partial_match(self, mock_registry): + def test_prefetch_venvs_partial_match(self, prefetch_venvs_func): """Test that filter matches partial strings within FQN.""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" # "policy" should match both dtensor_policy_worker and megatron_policy_worker - prefetch_venvs(filters=["policy"]) + prefetch_venvs_func(filters=["policy"]) assert mock_create_venv.call_count == 2 @@ -170,30 +166,24 @@ def test_prefetch_venvs_partial_match(self, mock_registry): in actor_fqns ) - def test_prefetch_venvs_empty_filter_list(self, mock_registry): + def test_prefetch_venvs_empty_filter_list(self, prefetch_venvs_func): """Test that empty filter list is treated as no filtering (falsy).""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" # Empty list should be falsy and prefetch all - prefetch_venvs(filters=[]) + prefetch_venvs_func(filters=[]) # Should create venvs for all uv-based actors (3 total) assert mock_create_venv.call_count == 3 - def test_prefetch_venvs_continues_on_error(self, mock_registry): + def test_prefetch_venvs_continues_on_error(self, prefetch_venvs_func): """Test that prefetching continues even if one venv creation fails.""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: # First call raises, subsequent calls succeed mock_create_venv.side_effect = [ Exception("Test error"), @@ -202,22 +192,81 @@ def test_prefetch_venvs_continues_on_error(self, mock_registry): ] # Should not raise, should continue with other venvs - prefetch_venvs(filters=None) + prefetch_venvs_func(filters=None) # All 3 uv-based actors should have been attempted assert mock_create_venv.call_count == 3 - def test_prefetch_venvs_case_sensitive_filter(self, mock_registry): + def test_prefetch_venvs_case_sensitive_filter(self, prefetch_venvs_func): """Test that filters are case-sensitive.""" - with ( - patch( - "nemo_rl.utils.prefetch_venvs.ACTOR_ENVIRONMENT_REGISTRY", mock_registry - ), - patch("nemo_rl.utils.prefetch_venvs.create_local_venv") as mock_create_venv, - ): + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: mock_create_venv.return_value = "/path/to/venv/bin/python" # "VLLM" (uppercase) should not match "vllm" (lowercase) - prefetch_venvs(filters=["VLLM"]) + prefetch_venvs_func(filters=["VLLM"]) assert mock_create_venv.call_count == 0 + + def test_prefetch_venvs_summary_no_filters(self, prefetch_venvs_func, capsys): + """Test that summary is printed with correct counts and names when no filters.""" + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: + mock_create_venv.return_value = "/path/to/venv/bin/python" + + prefetch_venvs_func(filters=None) + + captured = capsys.readouterr() + assert "Venv prefetching complete! Summary:" in captured.out + assert "Prefetched: 3" in captured.out + assert "Skipped (system Python): 2" in captured.out + # Verify prefetched env names are listed + assert "VllmGenerationWorker" in captured.out + assert "DTensorPolicyWorker" in captured.out + assert "MegatronPolicyWorker" in captured.out + # Verify skipped env names are listed + assert "MathEnvironment" in captured.out + assert "CodeEnvironment" in captured.out + # "Skipped (filtered out)" should not appear when no filters + assert "Skipped (filtered out)" not in captured.out + + def test_prefetch_venvs_summary_with_filters(self, prefetch_venvs_func, capsys): + """Test that summary includes filtered out names when filters are used.""" + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: + mock_create_venv.return_value = "/path/to/venv/bin/python" + + prefetch_venvs_func(filters=["vllm"]) + + captured = capsys.readouterr() + assert "Venv prefetching complete! Summary:" in captured.out + assert "Prefetched: 1" in captured.out + assert "Skipped (system Python): 0" in captured.out + assert "Skipped (filtered out): 4" in captured.out + # Verify prefetched env name is listed + assert "VllmGenerationWorker" in captured.out + # Verify filtered out env names are listed + assert "DTensorPolicyWorker" in captured.out + assert "MegatronPolicyWorker" in captured.out + + def test_prefetch_venvs_summary_with_failures(self, prefetch_venvs_func, capsys): + """Test that summary includes failed actor names when errors occur.""" + with patch( + "nemo_rl.utils.prefetch_venvs.create_local_venv" + ) as mock_create_venv: + # First call raises, subsequent calls succeed + mock_create_venv.side_effect = [ + Exception("Test error"), + "/path/to/venv/bin/python", + "/path/to/venv/bin/python", + ] + + prefetch_venvs_func(filters=None) + + captured = capsys.readouterr() + assert "Venv prefetching complete! Summary:" in captured.out + assert "Prefetched: 2" in captured.out + assert "Failed: 1" in captured.out From 8dcce6d2d5ae93fe5a745db489b43677d7e3f8d7 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 23 Dec 2025 20:38:57 -0800 Subject: [PATCH 09/10] fix Signed-off-by: Hemil Desai --- tests/unit/utils/test_prefetch_venvs.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tests/unit/utils/test_prefetch_venvs.py b/tests/unit/utils/test_prefetch_venvs.py index f6b14961d0..9cb67fd50a 100644 --- a/tests/unit/utils/test_prefetch_venvs.py +++ b/tests/unit/utils/test_prefetch_venvs.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import importlib from unittest.mock import patch import pytest @@ -33,14 +32,10 @@ def mock_registry(): @pytest.fixture def prefetch_venvs_func(mock_registry): - """Reload the module with mocked registry to ensure patches are applied.""" - with patch.dict( - "nemo_rl.distributed.ray_actor_environment_registry.ACTOR_ENVIRONMENT_REGISTRY", - mock_registry, - clear=True, + """Patch the registry directly in the prefetch_venvs module.""" + with patch.object( + prefetch_venvs_module, "ACTOR_ENVIRONMENT_REGISTRY", mock_registry ): - # Reload the module so it picks up the patched registry - importlib.reload(prefetch_venvs_module) yield prefetch_venvs_module.prefetch_venvs From 75934bf95e2b114ac20053a4826406c80a0c3c48 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Wed, 24 Dec 2025 11:36:18 -0800 Subject: [PATCH 10/10] fix Signed-off-by: Hemil Desai --- tests/unit/utils/test_prefetch_venvs.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/unit/utils/test_prefetch_venvs.py b/tests/unit/utils/test_prefetch_venvs.py index 9cb67fd50a..62c1886189 100644 --- a/tests/unit/utils/test_prefetch_venvs.py +++ b/tests/unit/utils/test_prefetch_venvs.py @@ -11,12 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os from unittest.mock import patch import pytest import nemo_rl.utils.prefetch_venvs as prefetch_venvs_module +# When NRL_CONTAINER is set, create_frozen_environment_symlinks also calls +# create_local_venv for each actor, effectively doubling the call count +CALL_MULTIPLIER = 2 if os.environ.get("NRL_CONTAINER") else 1 + @pytest.fixture def mock_registry(): @@ -51,7 +56,7 @@ def test_prefetch_venvs_no_filters(self, prefetch_venvs_func): prefetch_venvs_func(filters=None) - assert mock_create_venv.call_count == 3 + assert mock_create_venv.call_count == 3 * CALL_MULTIPLIER # Verify the actors that were called call_args = [call[0] for call in mock_create_venv.call_args_list] @@ -80,7 +85,7 @@ def test_prefetch_venvs_single_filter(self, prefetch_venvs_func): prefetch_venvs_func(filters=["vllm"]) # Should only create venvs for actors containing "vllm" (1 actor) - assert mock_create_venv.call_count == 1 + assert mock_create_venv.call_count == 1 * CALL_MULTIPLIER call_args = mock_create_venv.call_args[0] assert ( @@ -98,7 +103,7 @@ def test_prefetch_venvs_multiple_filters(self, prefetch_venvs_func): prefetch_venvs_func(filters=["vllm", "megatron"]) # Should create venvs for actors containing "vllm" OR "megatron" (2 actors) - assert mock_create_venv.call_count == 2 + assert mock_create_venv.call_count == 2 * CALL_MULTIPLIER call_args = [call[0] for call in mock_create_venv.call_args_list] actor_fqns = [args[1] for args in call_args] @@ -147,7 +152,7 @@ def test_prefetch_venvs_partial_match(self, prefetch_venvs_func): # "policy" should match both dtensor_policy_worker and megatron_policy_worker prefetch_venvs_func(filters=["policy"]) - assert mock_create_venv.call_count == 2 + assert mock_create_venv.call_count == 2 * CALL_MULTIPLIER call_args = [call[0] for call in mock_create_venv.call_args_list] actor_fqns = [args[1] for args in call_args] @@ -172,25 +177,25 @@ def test_prefetch_venvs_empty_filter_list(self, prefetch_venvs_func): prefetch_venvs_func(filters=[]) # Should create venvs for all uv-based actors (3 total) - assert mock_create_venv.call_count == 3 + assert mock_create_venv.call_count == 3 * CALL_MULTIPLIER def test_prefetch_venvs_continues_on_error(self, prefetch_venvs_func): """Test that prefetching continues even if one venv creation fails.""" with patch( "nemo_rl.utils.prefetch_venvs.create_local_venv" ) as mock_create_venv: - # First call raises, subsequent calls succeed + # Provide enough return values for both prefetch and frozen env symlinks mock_create_venv.side_effect = [ Exception("Test error"), "/path/to/venv/bin/python", "/path/to/venv/bin/python", - ] + ] * CALL_MULTIPLIER # Should not raise, should continue with other venvs prefetch_venvs_func(filters=None) # All 3 uv-based actors should have been attempted - assert mock_create_venv.call_count == 3 + assert mock_create_venv.call_count == 3 * CALL_MULTIPLIER def test_prefetch_venvs_case_sensitive_filter(self, prefetch_venvs_func): """Test that filters are case-sensitive.""" @@ -252,12 +257,12 @@ def test_prefetch_venvs_summary_with_failures(self, prefetch_venvs_func, capsys) with patch( "nemo_rl.utils.prefetch_venvs.create_local_venv" ) as mock_create_venv: - # First call raises, subsequent calls succeed + # Provide enough return values for both prefetch and frozen env symlinks mock_create_venv.side_effect = [ Exception("Test error"), "/path/to/venv/bin/python", "/path/to/venv/bin/python", - ] + ] * CALL_MULTIPLIER prefetch_venvs_func(filters=None)