2 changes: 1 addition & 1 deletion python/ray/air/tests/test_new_dataset_config.py
@@ -301,7 +301,7 @@ def _run_data_config_resource_test(data_config):
num_train_cpus = num_workers * cpus_per_worker + default_trainer_cpus
num_train_gpus = num_workers * gpus_per_worker + default_trainer_gpus

original_execution_options = data_config._execution_options
original_execution_options = data_config._get_execution_options("train")

ray.init(num_cpus=cluster_cpus, num_gpus=cluster_gpus)

36 changes: 26 additions & 10 deletions python/ray/train/_internal/data_config.py
@@ -1,4 +1,5 @@
import copy
from collections import defaultdict
from typing import Dict, List, Literal, Optional, Union

import ray
@@ -19,7 +20,9 @@ class DataConfig:
def __init__(
self,
datasets_to_split: Union[Literal["all"], List[str]] = "all",
execution_options: Optional[ExecutionOptions] = None,
execution_options: Optional[
Union[ExecutionOptions, Dict[str, ExecutionOptions]]
] = None,
enable_shard_locality: bool = True,
):
"""Construct a DataConfig.
Expand All @@ -28,12 +31,14 @@ def __init__(
datasets_to_split: Specifies which datasets should be split among workers.
Can be set to "all" or a list of dataset names. Defaults to "all",
i.e. split all datasets.
execution_options: The execution options to pass to Ray Data. By default,
the options will be optimized for data ingest. When overriding this,
base your options off of `DataConfig.default_ingest_options()`.
enable_shard_locality: If true, when sharding the datasets across Train
workers, locality will be considered to minimize cross-node data transfer.
This is on by default.
execution_options: The execution options to pass to Ray Data. Can be either:
1. A single ExecutionOptions object that is applied to all datasets.
2. A dict mapping dataset names to ExecutionOptions. If a dataset name
is not in the dict, it defaults to ``DataConfig.default_ingest_options()``.
By default, the options are optimized for data ingest. When overriding,
base your options off ``DataConfig.default_ingest_options()``.
enable_shard_locality: If true, dataset sharding across Train workers will
consider locality to minimize cross-node data transfer. Enabled by default.
"""
if isinstance(datasets_to_split, list) or datasets_to_split == "all":
self._datasets_to_split = datasets_to_split
@@ -44,9 +49,16 @@ def __init__(
f"{type(datasets_to_split).__name__} with value {datasets_to_split}."
)

self._execution_options: ExecutionOptions = (
execution_options or DataConfig.default_ingest_options()
default_execution_options = DataConfig.default_ingest_options()
if isinstance(execution_options, ExecutionOptions):
default_execution_options = execution_options
# If None, all datasets will use the default ingest options.
self._execution_options: Dict[str, ExecutionOptions] = defaultdict(
lambda: copy.deepcopy(default_execution_options)
)
if isinstance(execution_options, dict):
self._execution_options.update(execution_options)

self._enable_shard_locality = enable_shard_locality

self._num_train_cpus = 0.0
@@ -62,6 +74,10 @@ def set_train_total_resources(self, num_train_cpus: float, num_train_gpus: float
self._num_train_cpus = num_train_cpus
self._num_train_gpus = num_train_gpus

def _get_execution_options(self, dataset_name: str) -> ExecutionOptions:
"""Return a copy of the configured execution options for a given dataset name."""
return copy.deepcopy(self._execution_options[dataset_name])

@DeveloperAPI
def configure(
self,
@@ -98,7 +114,7 @@ def configure(

locality_hints = worker_node_ids if self._enable_shard_locality else None
for name, ds in datasets.items():
execution_options = copy.deepcopy(self._execution_options)
execution_options = self._get_execution_options(name)

if execution_options.is_resource_limits_default():
# If "resource_limits" is not overriden by the user,
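A minimal usage sketch of the per-dataset execution_options argument introduced above, based on the updated docstring and the tests in this PR (the dataset names and the preserve_order setting are illustrative only):

from ray.data._internal.execution.interfaces.execution_options import (
    ExecutionOptions,
)
from ray.train import DataConfig

# Case 1: a single ExecutionOptions object is applied to every dataset.
shared_options = ExecutionOptions()
shared_options.preserve_order = True
shared_config = DataConfig(execution_options=shared_options)

# Case 2: a dict maps dataset names to their own options; any dataset not
# listed here (for example "val") falls back to
# DataConfig.default_ingest_options().
train_options = ExecutionOptions()
train_options.preserve_order = True
per_dataset_config = DataConfig(execution_options={"train": train_options})

Passing either config as dataset_config= to a trainer then gives each dataset shard the options resolved by DataConfig._get_execution_options(name), as exercised in the tests below.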
16 changes: 16 additions & 0 deletions python/ray/train/v2/BUILD.bazel
@@ -676,3 +676,19 @@ py_test(
"//:ray_lib",
],
)

py_test(
name = "test_data_config",
size = "medium",
srcs = ["tests/test_data_config.py"],
env = {"RAY_TRAIN_V2_ENABLED": "1"},
tags = [
"exclusive",
"team:ml",
"train_v2",
],
deps = [
":conftest",
"//:ray_lib",
],
)
104 changes: 104 additions & 0 deletions python/ray/train/v2/tests/test_data_config.py
@@ -0,0 +1,104 @@
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionOptions,
)
from ray.train import DataConfig


def test_per_dataset_execution_options_single(ray_start_4_cpus):
"""Test that a single ExecutionOptions object applies to all datasets."""
# Create execution options with specific settings
execution_options = ExecutionOptions()
execution_options.preserve_order = True
execution_options.verbose_progress = True

data_config = DataConfig(execution_options=execution_options)

# Verify that all datasets get the same execution options
train_options = data_config._get_execution_options("train")
test_options = data_config._get_execution_options("test")
val_options = data_config._get_execution_options("val")

assert train_options.preserve_order is True
assert train_options.verbose_progress is True
assert test_options.preserve_order is True
assert test_options.verbose_progress is True
assert val_options.preserve_order is True
assert val_options.verbose_progress is True


def test_per_dataset_execution_options_dict(ray_start_4_cpus):
"""Test that a dict of ExecutionOptions maps to specific datasets, and datasets
not in the dict get default ingest options. Also tests resource limits."""
# Create different execution options for different datasets
train_options = ExecutionOptions()
train_options.preserve_order = True
train_options.verbose_progress = True
train_options.resource_limits = train_options.resource_limits.copy(cpu=4, gpu=2)

test_options = ExecutionOptions()
test_options.preserve_order = False
test_options.verbose_progress = False
test_options.resource_limits = test_options.resource_limits.copy(cpu=2, gpu=1)

execution_options_dict = {
"train": train_options,
"test": test_options,
}

data_config = DataConfig(execution_options=execution_options_dict)

# Verify that each dataset in the dict gets its specific options
retrieved_train_options = data_config._get_execution_options("train")
retrieved_test_options = data_config._get_execution_options("test")

assert retrieved_train_options.preserve_order is True
assert retrieved_train_options.verbose_progress is True
assert retrieved_test_options.preserve_order is False
assert retrieved_test_options.verbose_progress is False

# Verify resource limits
assert retrieved_train_options.resource_limits.cpu == 4
assert retrieved_train_options.resource_limits.gpu == 2
assert retrieved_test_options.resource_limits.cpu == 2
assert retrieved_test_options.resource_limits.gpu == 1

# Verify that a dataset not in the dict gets default options
default_options = DataConfig.default_ingest_options()
retrieved_val_options = data_config._get_execution_options("val")
assert retrieved_val_options.preserve_order == default_options.preserve_order
assert retrieved_val_options.verbose_progress == default_options.verbose_progress
assert (
retrieved_val_options.resource_limits.cpu == default_options.resource_limits.cpu
)
assert (
retrieved_val_options.resource_limits.gpu == default_options.resource_limits.gpu
)


def test_per_dataset_execution_options_default(ray_start_4_cpus):
"""Test that None or empty dict execution_options results in all datasets
using default options."""
# Test with None
data_config_none = DataConfig(execution_options=None)
default_options = DataConfig.default_ingest_options()
retrieved_train_options = data_config_none._get_execution_options("train")
retrieved_test_options = data_config_none._get_execution_options("test")

assert retrieved_train_options.preserve_order == default_options.preserve_order
assert retrieved_test_options.preserve_order == default_options.preserve_order

# Test with empty dict
data_config_empty = DataConfig(execution_options={})
retrieved_train_options = data_config_empty._get_execution_options("train")
retrieved_test_options = data_config_empty._get_execution_options("test")

assert retrieved_train_options.preserve_order == default_options.preserve_order
assert retrieved_test_options.preserve_order == default_options.preserve_order


if __name__ == "__main__":
import sys

import pytest

sys.exit(pytest.main(["-v", "-x", __file__]))
178 changes: 178 additions & 0 deletions python/ray/train/v2/tests/test_data_integration.py
@@ -293,6 +293,184 @@ def check_resource_limits(config):
trainer.fit()


def test_per_dataset_execution_options_single(ray_start_4_cpus):
"""Test that a single ExecutionOptions object applies to all datasets."""
NUM_ROWS = 100
NUM_WORKERS = 2

train_ds = ray.data.range(NUM_ROWS)
val_ds = ray.data.range(NUM_ROWS)

# Create execution options with specific settings
execution_options = ExecutionOptions()
execution_options.preserve_order = True
execution_options.verbose_progress = True

data_config = ray.train.DataConfig(execution_options=execution_options)

def train_fn():
train_shard = ray.train.get_dataset_shard("train")
val_shard = ray.train.get_dataset_shard("val")

# Verify both datasets have the same execution options
assert train_shard.get_context().execution_options.preserve_order is True
assert train_shard.get_context().execution_options.verbose_progress is True
assert val_shard.get_context().execution_options.preserve_order is True
assert val_shard.get_context().execution_options.verbose_progress is True

trainer = DataParallelTrainer(
train_fn,
datasets={"train": train_ds, "val": val_ds},
dataset_config=data_config,
scaling_config=ray.train.ScalingConfig(num_workers=NUM_WORKERS),
)
trainer.fit()


def test_per_dataset_execution_options_dict(ray_start_4_cpus):
"""Test that a dict of ExecutionOptions maps to specific datasets, and datasets not in the dict get default ingest options. Also tests resource limits."""
NUM_ROWS = 100
NUM_WORKERS = 2

train_ds = ray.data.range(NUM_ROWS)
val_ds = ray.data.range(NUM_ROWS)
test_ds = ray.data.range(NUM_ROWS)
test_ds_2 = ray.data.range(NUM_ROWS)

# Create different execution options for different datasets
train_options = ExecutionOptions()
train_options.preserve_order = True
train_options.verbose_progress = True
train_options.resource_limits = train_options.resource_limits.copy(cpu=4, gpu=2)

val_options = ExecutionOptions()
val_options.preserve_order = False
val_options.verbose_progress = False
val_options.resource_limits = val_options.resource_limits.copy(cpu=2, gpu=1)

execution_options_dict = {
"train": train_options,
"val": val_options,
}

data_config = ray.train.DataConfig(execution_options=execution_options_dict)

def train_fn():
train_shard = ray.train.get_dataset_shard("train")
val_shard = ray.train.get_dataset_shard("val")
test_shard = ray.train.get_dataset_shard("test")
test_shard_2 = ray.train.get_dataset_shard("test_2")

# Verify each dataset in the dict gets its specific options
assert train_shard.get_context().execution_options.preserve_order is True
assert train_shard.get_context().execution_options.verbose_progress is True
assert val_shard.get_context().execution_options.preserve_order is False
assert val_shard.get_context().execution_options.verbose_progress is False

# Verify resource limits
assert train_shard.get_context().execution_options.resource_limits.cpu == 4
assert train_shard.get_context().execution_options.resource_limits.gpu == 2
assert val_shard.get_context().execution_options.resource_limits.cpu == 2
assert val_shard.get_context().execution_options.resource_limits.gpu == 1

# Verify datasets not in the dict both receive the same default options
assert (
test_shard.get_context().execution_options.preserve_order
== test_shard_2.get_context().execution_options.preserve_order
)
assert (
test_shard.get_context().execution_options.verbose_progress
== test_shard_2.get_context().execution_options.verbose_progress
)
assert (
test_shard.get_context().execution_options.resource_limits.cpu
== test_shard_2.get_context().execution_options.resource_limits.cpu
)
assert (
test_shard.get_context().execution_options.resource_limits.gpu
== test_shard_2.get_context().execution_options.resource_limits.gpu
)

trainer = DataParallelTrainer(
train_fn,
datasets={
"train": train_ds,
"val": val_ds,
"test": test_ds,
"test_2": test_ds_2,
},
dataset_config=data_config,
scaling_config=ray.train.ScalingConfig(num_workers=NUM_WORKERS),
)
trainer.fit()


def test_exclude_train_resources_applies_to_each_dataset(ray_start_4_cpus):
"""Test that the default behavior of excluding train worker resources
applies to each dataset individually when using per-dataset execution options."""
NUM_ROWS = 100
NUM_WORKERS = 2

# Create different execution options for different datasets
train_options = ExecutionOptions()
train_options.exclude_resources = train_options.exclude_resources.copy(cpu=2, gpu=1)

test_options = ExecutionOptions()
test_options.exclude_resources = test_options.exclude_resources.copy(cpu=1, gpu=0)

# val dataset not in dict, should get default options
execution_options_dict = {
"train": train_options,
"test": test_options,
}
data_config = ray.train.DataConfig(execution_options=execution_options_dict)

def train_fn():
# Check that each dataset has the train resources excluded,
# in addition to any per-dataset exclude_resources.

# Check train dataset
train_ds = ray.train.get_dataset_shard("train")
train_exec_options = train_ds.get_context().execution_options
assert train_exec_options.is_resource_limits_default()
# Train worker resources: NUM_WORKERS CPUs (default 1 CPU per worker)
expected_train_cpu = NUM_WORKERS + 2 # 2 from user-defined
expected_train_gpu = 0 + 1 # 1 from user-defined (no GPUs allocated)
assert train_exec_options.exclude_resources.cpu == expected_train_cpu
assert train_exec_options.exclude_resources.gpu == expected_train_gpu

# Check test dataset
test_ds = ray.train.get_dataset_shard("test")
test_exec_options = test_ds.get_context().execution_options
assert test_exec_options.is_resource_limits_default()
expected_test_cpu = NUM_WORKERS + 1 # 1 from user-defined
expected_test_gpu = 0 + 0 # 0 from user-defined
assert test_exec_options.exclude_resources.cpu == expected_test_cpu
assert test_exec_options.exclude_resources.gpu == expected_test_gpu

# Check val dataset (should have default + train resources excluded)
val_ds = ray.train.get_dataset_shard("val")
val_exec_options = val_ds.get_context().execution_options
assert val_exec_options.is_resource_limits_default()
default_options = ray.train.DataConfig.default_ingest_options()
expected_val_cpu = NUM_WORKERS + default_options.exclude_resources.cpu
expected_val_gpu = 0 + default_options.exclude_resources.gpu
assert val_exec_options.exclude_resources.cpu == expected_val_cpu
assert val_exec_options.exclude_resources.gpu == expected_val_gpu

trainer = DataParallelTrainer(
train_fn,
datasets={
"train": ray.data.range(NUM_ROWS),
"test": ray.data.range(NUM_ROWS),
"val": ray.data.range(NUM_ROWS),
},
dataset_config=data_config,
scaling_config=ray.train.ScalingConfig(num_workers=NUM_WORKERS),
)
trainer.fit()


if __name__ == "__main__":
import sys
