From ed4c526345a94cf7b418a609ff59a133c0fb5190 Mon Sep 17 00:00:00 2001 From: xgui Date: Thu, 6 Nov 2025 17:10:54 +0000 Subject: [PATCH 1/6] remove unnecessary storage_context validation Signed-off-by: xgui --- python/ray/train/v2/_internal/execution/storage.py | 6 ++++-- python/ray/train/v2/api/result.py | 1 + python/ray/train/v2/tests/test_result.py | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/train/v2/_internal/execution/storage.py b/python/ray/train/v2/_internal/execution/storage.py index abf80697da36..df41a6585868 100644 --- a/python/ray/train/v2/_internal/execution/storage.py +++ b/python/ray/train/v2/_internal/execution/storage.py @@ -383,6 +383,7 @@ def __init__( storage_path: Union[str, os.PathLike], experiment_dir_name: str, storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, + skip_validation: bool = False, ): self.custom_fs_provided = storage_filesystem is not None @@ -395,8 +396,9 @@ def __init__( ) self.storage_fs_path = Path(self.storage_fs_path).as_posix() - self._create_validation_file() - self._check_validation_file() + if not skip_validation: + self._create_validation_file() + self._check_validation_file() def __str__(self): return ( diff --git a/python/ray/train/v2/api/result.py b/python/ray/train/v2/api/result.py index 908f2b63c3a5..490b2c2c0800 100644 --- a/python/ray/train/v2/api/result.py +++ b/python/ray/train/v2/api/result.py @@ -68,6 +68,7 @@ def from_path( storage_path=storage_path, experiment_dir_name=experiment_dir_name, storage_filesystem=fs, + skip_validation=True, ) # Validate that the checkpoint manager snapshot file exists diff --git a/python/ray/train/v2/tests/test_result.py b/python/ray/train/v2/tests/test_result.py index 676e4447d060..1db329f43177 100644 --- a/python/ray/train/v2/tests/test_result.py +++ b/python/ray/train/v2/tests/test_result.py @@ -160,6 +160,7 @@ def test_result_restore( storage_context = StorageContext( storage_path=storage_path, experiment_dir_name=exp_name, + skip_validation=True, ) trial_dir = storage_context.experiment_fs_path From 881ba7b625c63118a18f0713e6caef7c540b216e Mon Sep 17 00:00:00 2001 From: xgui Date: Thu, 6 Nov 2025 17:17:32 +0000 Subject: [PATCH 2/6] remove unnecessary storage_context validation Signed-off-by: xgui --- python/ray/train/v2/_internal/execution/storage.py | 11 +++++++++++ python/ray/train/v2/tests/test_result.py | 5 ++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/python/ray/train/v2/_internal/execution/storage.py b/python/ray/train/v2/_internal/execution/storage.py index df41a6585868..bc645aae09c4 100644 --- a/python/ray/train/v2/_internal/execution/storage.py +++ b/python/ray/train/v2/_internal/execution/storage.py @@ -313,6 +313,17 @@ class StorageContext: """Shared context that holds the source of truth for all paths and storage utilities, passed along from the driver to workers. + Args: + storage_path: Path where all results and checkpoints are persisted. + Can be a local directory or a remote URI (e.g., s3://bucket/path). + experiment_dir_name: Name of the experiment directory within the storage path. + storage_filesystem: Optional custom PyArrow filesystem to use for storage. + If not provided, will be auto-resolved from the storage_path URI. + skip_validation: If True, skips creating and checking the storage validation + marker file. This should be set to True for read-only operations (e.g., + restoring from an existing experiment directory) to avoid unnecessary + file system writes. Defaults to False. + This object defines a few types of paths: 1. *_fs_path: A path on the `storage_filesystem`. This is a regular path which has been prefix-stripped by pyarrow.fs.FileSystem.from_uri and diff --git a/python/ray/train/v2/tests/test_result.py b/python/ray/train/v2/tests/test_result.py index 1db329f43177..5ff1c3668481 100644 --- a/python/ray/train/v2/tests/test_result.py +++ b/python/ray/train/v2/tests/test_result.py @@ -188,12 +188,11 @@ def test_result_restore( assert len(result.best_checkpoints) == num_checkpoints """ - Top-3 checkpoints with metrics: + Top-2 checkpoints with metrics: | iter | metric_a metric_b - checkpoint_000004 4 4 -4 - checkpoint_000003 3 3 -3 checkpoint_000002 2 2 -2 + checkpoint_000001 1 1 -1 """ # Check if the checkpoints bounded with correct metrics best_ckpt_a = result.get_best_checkpoint(metric="metric_a", mode="max") From bafb1c31c7ce9c2351c496881f07329669798638 Mon Sep 17 00:00:00 2001 From: xgui Date: Thu, 6 Nov 2025 21:30:55 +0000 Subject: [PATCH 3/6] try to fix timeout Signed-off-by: xgui --- python/ray/train/v2/tests/conftest.py | 24 --------------- python/ray/train/v2/tests/test_result.py | 37 ++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/python/ray/train/v2/tests/conftest.py b/python/ray/train/v2/tests/conftest.py index 4f2c84d0af57..7218f7bc306a 100644 --- a/python/ray/train/v2/tests/conftest.py +++ b/python/ray/train/v2/tests/conftest.py @@ -1,11 +1,9 @@ import logging -import boto3 import pytest import ray from ray import runtime_context -from ray._common.test_utils import simulate_s3_bucket from ray.cluster_utils import Cluster from ray.train.v2._internal.constants import ( ENABLE_STATE_ACTOR_RECONCILIATION_ENV_VAR, @@ -82,25 +80,3 @@ def mock_current_actor(self): ) yield - - -@pytest.fixture -def mock_s3_bucket_uri(): - from ray.air._internal.uri_utils import URI - - port = 5002 - region = "us-west-2" - with simulate_s3_bucket(port=port, region=region) as s3_uri: - s3 = boto3.client( - "s3", region_name=region, endpoint_url=f"http://localhost:{port}" - ) - # Bucket name will be autogenerated/unique per test - bucket_name = URI(s3_uri).name - s3.create_bucket( - Bucket=bucket_name, - CreateBucketConfiguration={"LocationConstraint": region}, - ) - # Disable server HTTP request logging - logging.getLogger("werkzeug").setLevel(logging.WARNING) - yield s3_uri - logging.getLogger("werkzeug").setLevel(logging.INFO) diff --git a/python/ray/train/v2/tests/test_result.py b/python/ray/train/v2/tests/test_result.py index 5ff1c3668481..ee10a7b4d1c1 100644 --- a/python/ray/train/v2/tests/test_result.py +++ b/python/ray/train/v2/tests/test_result.py @@ -1,11 +1,15 @@ +import logging +import uuid from pathlib import Path from urllib.parse import urlparse, urlunparse +import boto3 import pyarrow.fs import pytest import ray from ray import train +from ray._common.test_utils import simulate_s3_bucket from ray.train import Checkpoint, CheckpointConfig, RunConfig, ScalingConfig from ray.train.tests.util import create_dict_checkpoint, load_dict_checkpoint from ray.train.torch import TorchTrainer @@ -15,6 +19,34 @@ from ray.train.v2.api.result import Result +@pytest.fixture(scope="module") +def mock_s3_bucket_uri(): + """Mock S3 bucket for testing. + + Module-scoped to avoid the overhead of starting/stopping moto server + for each test. The bucket name is unique per module, and individual + tests write to unique experiment directories within the bucket. + """ + from ray.air._internal.uri_utils import URI + + port = 5002 + region = "us-west-2" + with simulate_s3_bucket(port=port, region=region) as s3_uri: + s3 = boto3.client( + "s3", region_name=region, endpoint_url=f"http://localhost:{port}" + ) + # Bucket name will be autogenerated/unique per test + bucket_name = URI(s3_uri).name + s3.create_bucket( + Bucket=bucket_name, + CreateBucketConfiguration={"LocationConstraint": region}, + ) + # Disable server HTTP request logging + logging.getLogger("werkzeug").setLevel(logging.WARNING) + yield s3_uri + logging.getLogger("werkzeug").setLevel(logging.INFO) + + def uri_join(base_uri: str, *paths: str) -> str: """ Join a base URI (local or remote) with one or more subpaths. @@ -116,13 +148,13 @@ def test_get_best_checkpoint(): ) +@pytest.mark.timeout(240) @pytest.mark.parametrize("storage", ["local", "remote"]) @pytest.mark.parametrize("path_type", ["str", "PathLike"]) @pytest.mark.parametrize("pass_storage_filesystem", [True, False]) @pytest.mark.parametrize("trailing_slash", [False, True]) def test_result_restore( ray_start_4_cpus, - monkeypatch, tmp_path, storage, mock_s3_bucket_uri, @@ -144,7 +176,8 @@ def test_result_restore( elif storage == "remote": storage_path = str(mock_s3_bucket_uri) - exp_name = "test_result_restore_v2" + # Add UUID to ensure test isolation when sharing module-scoped S3 mock + exp_name = f"test_result_restore_v2-{uuid.uuid4().hex[:8]}" trainer = build_dummy_trainer( exp_name, From 81bf60a8343c66c5f5fc22ae21112618d7bd82f8 Mon Sep 17 00:00:00 2001 From: xgui Date: Thu, 6 Nov 2025 21:33:10 +0000 Subject: [PATCH 4/6] avoid confusion Signed-off-by: xgui --- .../ray/train/v2/_internal/execution/storage.py | 16 ---------------- python/ray/train/v2/api/result.py | 1 - python/ray/train/v2/tests/test_result.py | 1 - 3 files changed, 18 deletions(-) diff --git a/python/ray/train/v2/_internal/execution/storage.py b/python/ray/train/v2/_internal/execution/storage.py index bc645aae09c4..fc6299c957bb 100644 --- a/python/ray/train/v2/_internal/execution/storage.py +++ b/python/ray/train/v2/_internal/execution/storage.py @@ -313,17 +313,6 @@ class StorageContext: """Shared context that holds the source of truth for all paths and storage utilities, passed along from the driver to workers. - Args: - storage_path: Path where all results and checkpoints are persisted. - Can be a local directory or a remote URI (e.g., s3://bucket/path). - experiment_dir_name: Name of the experiment directory within the storage path. - storage_filesystem: Optional custom PyArrow filesystem to use for storage. - If not provided, will be auto-resolved from the storage_path URI. - skip_validation: If True, skips creating and checking the storage validation - marker file. This should be set to True for read-only operations (e.g., - restoring from an existing experiment directory) to avoid unnecessary - file system writes. Defaults to False. - This object defines a few types of paths: 1. *_fs_path: A path on the `storage_filesystem`. This is a regular path which has been prefix-stripped by pyarrow.fs.FileSystem.from_uri and @@ -394,7 +383,6 @@ def __init__( storage_path: Union[str, os.PathLike], experiment_dir_name: str, storage_filesystem: Optional[pyarrow.fs.FileSystem] = None, - skip_validation: bool = False, ): self.custom_fs_provided = storage_filesystem is not None @@ -407,10 +395,6 @@ def __init__( ) self.storage_fs_path = Path(self.storage_fs_path).as_posix() - if not skip_validation: - self._create_validation_file() - self._check_validation_file() - def __str__(self): return ( "StorageContext<\n" diff --git a/python/ray/train/v2/api/result.py b/python/ray/train/v2/api/result.py index 490b2c2c0800..908f2b63c3a5 100644 --- a/python/ray/train/v2/api/result.py +++ b/python/ray/train/v2/api/result.py @@ -68,7 +68,6 @@ def from_path( storage_path=storage_path, experiment_dir_name=experiment_dir_name, storage_filesystem=fs, - skip_validation=True, ) # Validate that the checkpoint manager snapshot file exists diff --git a/python/ray/train/v2/tests/test_result.py b/python/ray/train/v2/tests/test_result.py index ee10a7b4d1c1..58c1bebe6a28 100644 --- a/python/ray/train/v2/tests/test_result.py +++ b/python/ray/train/v2/tests/test_result.py @@ -193,7 +193,6 @@ def test_result_restore( storage_context = StorageContext( storage_path=storage_path, experiment_dir_name=exp_name, - skip_validation=True, ) trial_dir = storage_context.experiment_fs_path From 7e5574c44b380fcb727c08bd85b5051625abc50e Mon Sep 17 00:00:00 2001 From: xgui Date: Thu, 6 Nov 2025 21:34:47 +0000 Subject: [PATCH 5/6] fix Signed-off-by: xgui --- python/ray/train/v2/_internal/execution/storage.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/train/v2/_internal/execution/storage.py b/python/ray/train/v2/_internal/execution/storage.py index fc6299c957bb..abf80697da36 100644 --- a/python/ray/train/v2/_internal/execution/storage.py +++ b/python/ray/train/v2/_internal/execution/storage.py @@ -395,6 +395,9 @@ def __init__( ) self.storage_fs_path = Path(self.storage_fs_path).as_posix() + self._create_validation_file() + self._check_validation_file() + def __str__(self): return ( "StorageContext<\n" From 9fa99db175908f36a89b9f30ee5e85cec975d2f8 Mon Sep 17 00:00:00 2001 From: xgui Date: Thu, 6 Nov 2025 22:39:46 +0000 Subject: [PATCH 6/6] remove remote tests for now Signed-off-by: xgui --- python/ray/train/v2/tests/test_result.py | 66 ++---------------------- 1 file changed, 4 insertions(+), 62 deletions(-) diff --git a/python/ray/train/v2/tests/test_result.py b/python/ray/train/v2/tests/test_result.py index 58c1bebe6a28..d6c05008660d 100644 --- a/python/ray/train/v2/tests/test_result.py +++ b/python/ray/train/v2/tests/test_result.py @@ -1,15 +1,11 @@ -import logging import uuid from pathlib import Path from urllib.parse import urlparse, urlunparse -import boto3 -import pyarrow.fs import pytest import ray from ray import train -from ray._common.test_utils import simulate_s3_bucket from ray.train import Checkpoint, CheckpointConfig, RunConfig, ScalingConfig from ray.train.tests.util import create_dict_checkpoint, load_dict_checkpoint from ray.train.torch import TorchTrainer @@ -19,34 +15,6 @@ from ray.train.v2.api.result import Result -@pytest.fixture(scope="module") -def mock_s3_bucket_uri(): - """Mock S3 bucket for testing. - - Module-scoped to avoid the overhead of starting/stopping moto server - for each test. The bucket name is unique per module, and individual - tests write to unique experiment directories within the bucket. - """ - from ray.air._internal.uri_utils import URI - - port = 5002 - region = "us-west-2" - with simulate_s3_bucket(port=port, region=region) as s3_uri: - s3 = boto3.client( - "s3", region_name=region, endpoint_url=f"http://localhost:{port}" - ) - # Bucket name will be autogenerated/unique per test - bucket_name = URI(s3_uri).name - s3.create_bucket( - Bucket=bucket_name, - CreateBucketConfiguration={"LocationConstraint": region}, - ) - # Disable server HTTP request logging - logging.getLogger("werkzeug").setLevel(logging.WARNING) - yield s3_uri - logging.getLogger("werkzeug").setLevel(logging.INFO) - - def uri_join(base_uri: str, *paths: str) -> str: """ Join a base URI (local or remote) with one or more subpaths. @@ -148,33 +116,22 @@ def test_get_best_checkpoint(): ) -@pytest.mark.timeout(240) -@pytest.mark.parametrize("storage", ["local", "remote"]) @pytest.mark.parametrize("path_type", ["str", "PathLike"]) @pytest.mark.parametrize("pass_storage_filesystem", [True, False]) @pytest.mark.parametrize("trailing_slash", [False, True]) def test_result_restore( ray_start_4_cpus, tmp_path, - storage, - mock_s3_bucket_uri, path_type, pass_storage_filesystem, trailing_slash, ): """Test Result.from_path functionality similar to v1 test_result_restore.""" - if path_type == "PathLike" and storage == "remote": - # Path will collapse URI scheme separators (s3:// becomes s3:/) - return - num_iterations = 3 num_checkpoints = 2 - if storage == "local": - storage_path = str(tmp_path) - elif storage == "remote": - storage_path = str(mock_s3_bucket_uri) + storage_path = str(tmp_path) # Add UUID to ensure test isolation when sharing module-scoped S3 mock exp_name = f"test_result_restore_v2-{uuid.uuid4().hex[:8]}" @@ -237,36 +194,21 @@ def test_result_restore( result.get_best_checkpoint(metric="invalid_metric", mode="max") -@pytest.mark.parametrize("storage", ["local", "remote"]) def test_result_from_path_validation( ray_start_4_cpus, tmp_path, - storage, - mock_s3_bucket_uri, ): """Test that Result.from_path raises RuntimeError when folder or snapshot file doesn't exist.""" - if storage == "local": - storage_path = str(tmp_path) - nonexistent_folder = str(tmp_path / "nonexistent_experiment") - existing_folder = str(tmp_path / "existing_experiment") - elif storage == "remote": - storage_path = str(mock_s3_bucket_uri) - nonexistent_folder = uri_join(storage_path, "nonexistent_experiment") - existing_folder = uri_join(storage_path, "existing_experiment") + nonexistent_folder = str(tmp_path / "nonexistent_experiment") + existing_folder = str(tmp_path / "existing_experiment") # Test 1: Folder doesn't exist with pytest.raises(RuntimeError, match="Experiment folder .* doesn't exist."): Result.from_path(nonexistent_folder) # Test 2: Folder exists but snapshot file doesn't exist - if storage == "local": - Path(existing_folder).mkdir(parents=True, exist_ok=True) - else: - # For S3, we need to create a dummy file to ensure the folder exists - fs, fs_path = pyarrow.fs.FileSystem.from_uri(existing_folder) - with fs.open_output_stream(f"{fs_path}/.dummy") as f: - f.write(b"dummy") + Path(existing_folder).mkdir(parents=True, exist_ok=True) with pytest.raises( RuntimeError,