From a716c381cf2eeb667490d23c25f77056d375c3fb Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 10 Nov 2025 14:12:01 -0800 Subject: [PATCH 1/5] [data] Add ranker interface in streaming executor Signed-off-by: iamjustinhsu --- python/ray/data/BUILD.bazel | 41 +++++++ .../ray/data/_internal/execution/__init__.py | 14 +++ python/ray/data/_internal/execution/ranker.py | 100 ++++++++++++++++++ .../_internal/execution/streaming_executor.py | 24 +++-- .../execution/streaming_executor_state.py | 46 +------- python/ray/data/tests/test_ranker.py | 82 ++++++++++++++ .../ray/data/tests/test_streaming_executor.py | 20 +++- 7 files changed, 271 insertions(+), 56 deletions(-) create mode 100644 python/ray/data/_internal/execution/ranker.py create mode 100644 python/ray/data/tests/test_ranker.py diff --git a/python/ray/data/BUILD.bazel b/python/ray/data/BUILD.bazel index 66449628ecc7..e90d0ad9df1e 100644 --- a/python/ray/data/BUILD.bazel +++ b/python/ray/data/BUILD.bazel @@ -53,6 +53,19 @@ py_test_module_list( ], ) +py_test_module_list( + size = "small", + files = glob(["tests/anyscale/test_*.py"]), + tags = [ + "exclusive", + "team:data", + ], + deps = [ + ":conftest", + "//:ray_lib", + ], +) + py_test( name = "test_formats", size = "medium", @@ -96,6 +109,20 @@ py_test( ], ) +py_test( + name = "test_issue_detection", + size = "small", + srcs = ["tests/test_issue_detection.py"], + tags = [ + "exclusive", + "team:data", + ], + deps = [ + ":conftest", + "//:ray_lib", + ], +) + py_test( name = "test_numpy_support", size = "small", @@ -1349,6 +1376,20 @@ py_test( ], ) +py_test( + name = "test_ranker", + size = "small", + srcs = ["tests/test_ranker.py"], + tags = [ + "exclusive", + "team:data", + ], + deps = [ + ":conftest", + "//:ray_lib", + ], +) + py_test( name = "test_raydp", size = "medium", diff --git a/python/ray/data/_internal/execution/__init__.py b/python/ray/data/_internal/execution/__init__.py index e69de29bb2d1..8569bde8d3fd 100644 --- a/python/ray/data/_internal/execution/__init__.py +++ b/python/ray/data/_internal/execution/__init__.py @@ -0,0 +1,14 @@ +from .ranker import DefaultRanker, Ranker + + +def create_ranker() -> Ranker: + """Create a ranker instance based on environment and configuration.""" + from ray._private.ray_constants import env_bool + + # Check if RayTurbo ranker should be used + if env_bool("RAY_DATA_USE_TURBO_RANKER", True): + from ray.anyscale.data._internal.execution.ranker import LocationAwareRanker + + return LocationAwareRanker() + else: + return DefaultRanker() diff --git a/python/ray/data/_internal/execution/ranker.py b/python/ray/data/_internal/execution/ranker.py new file mode 100644 index 000000000000..72d22b901305 --- /dev/null +++ b/python/ray/data/_internal/execution/ranker.py @@ -0,0 +1,100 @@ +"""Ranker component for operator selection in streaming executor.""" + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Generic, List, Protocol, Tuple, TypeVar + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.resource_manager import ResourceManager + +if TYPE_CHECKING: + from ray.data._internal.execution.streaming_executor_state import Topology + +# Protocol for comparable ranking values +class Comparable(Protocol): + """Protocol for types that can be compared for ranking.""" + + def __lt__(self, other: "Comparable") -> bool: + ... + + def __le__(self, other: "Comparable") -> bool: + ... + + def __gt__(self, other: "Comparable") -> bool: + ... + + def __ge__(self, other: "Comparable") -> bool: + ... + + def __eq__(self, other: "Comparable") -> bool: + ... + + +# Generic type for comparable ranking values +RankingValue = TypeVar("RankingValue", bound=Comparable) + + +class Ranker(ABC, Generic[RankingValue]): + """Abstract base class for operator ranking strategies.""" + + @abstractmethod + def rank_operator( + self, + op: PhysicalOperator, + topology: "Topology", + resource_manager: ResourceManager, + ) -> RankingValue: + """Rank operator for execution priority. + + Operator to run next is selected as the one with the *smallest* value + of the lexicographically ordered ranks composed of (in order): + + Args: + ops: Operator to rank + topology: Current execution topology + resource_manager: Resource manager for usage information + + Returns: + Rank (tuple) for operator + """ + pass + + def rank_operators( + self, + ops: List[PhysicalOperator], + topology: "Topology", + resource_manager: ResourceManager, + ) -> List[RankingValue]: + + assert len(ops) > 0 + return [self.rank_operator(op, topology, resource_manager) for op in ops] + + +class DefaultRanker(Ranker[Tuple[int, int]]): + """Ranker implementation.""" + + def rank_operator( + self, + op: PhysicalOperator, + topology: "Topology", + resource_manager: ResourceManager, + ) -> Tuple[int, int]: + """Computes rank for op. *Lower means better rank* + + 1. Whether operator's could be throttled (int) + 2. Operators' object store utilization + + Args: + op: Operators to rank + topology: Current execution topology + resource_manager: Resource manager for usage information + + Returns: + Rank (tuple) for operator + """ + + throttling_disabled = 0 if op.throttling_disabled() else 1 + + return ( + throttling_disabled, + resource_manager.get_op_usage(op).object_store_memory, + ) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 15af300b6094..f1007a6dce0a 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -8,6 +8,7 @@ create_actor_autoscaler, ) from ray.data._internal.cluster_autoscaler import create_cluster_autoscaler +from ray.data._internal.execution import create_ranker from ray.data._internal.execution.backpressure_policy import ( BackpressurePolicy, get_backpressure_policies, @@ -77,19 +78,13 @@ def __init__( dataset_id: str = "unknown_dataset", ): self._data_context = data_context + self._ranker = create_ranker() self._start_time: Optional[float] = None self._initial_stats: Optional[DatasetStats] = None self._final_stats: Optional[DatasetStats] = None self._global_info: Optional[ProgressBar] = None self._progress_manager: Optional[RichExecutionProgressManager] = None - if not self._use_rich_progress() and log_once("rich_progress_disabled"): - logger.info( - "A new progress UI is available. To enable, set " - "`ray.data.DataContext.get_current()." - "enable_rich_progress_bars = True`." - ) - # The executor can be shutdown while still running. self._shutdown_lock = threading.RLock() self._execution_started = False @@ -486,6 +481,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: # If consumer is idling (there's nothing for it to consume) # enforce liveness, ie that at least a single task gets scheduled ensure_liveness=self._consumer_idling(), + ranker=self._ranker, ) if op is None: @@ -669,7 +665,19 @@ def _update_stats_metrics(self, state: str, force_update: bool = False): ) def _use_rich_progress(self): - return self._data_context.enable_rich_progress_bars + rich_enabled = self._data_context.enable_rich_progress_bars + use_ray_tqdm = self._data_context.use_ray_tqdm + + if not rich_enabled or use_ray_tqdm: + if log_once("ray_data_rich_progress_disabled"): + logger.info( + "[dataset]: A new progress UI is available. To enable, " + "set `ray.data.DataContext.get_current()." + "enable_rich_progress_bars = True` and `ray.data." + "DataContext.get_current().use_ray_tqdm = False`." + ) + return False + return True def _validate_dag(dag: PhysicalOperator, limits: ExecutionResources) -> None: diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 5cea878816f6..e8db6b64f4f2 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -34,6 +34,7 @@ HashShuffleProgressBarMixin, ) from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.execution.ranker import Ranker from ray.data._internal.execution.resource_manager import ( ResourceManager, ) @@ -746,6 +747,7 @@ def select_operator_to_run( resource_manager: ResourceManager, backpressure_policies: List[BackpressurePolicy], ensure_liveness: bool, + ranker: "Ranker", ) -> Optional[PhysicalOperator]: """Select next operator to launch new tasks. @@ -769,7 +771,7 @@ def select_operator_to_run( if not eligible_ops: return None - ranks = _rank_operators(eligible_ops, resource_manager) + ranks = ranker.rank_operators(eligible_ops, topology, resource_manager) assert len(eligible_ops) == len(ranks), (eligible_ops, ranks) @@ -778,48 +780,6 @@ def select_operator_to_run( return next_op -def _rank_operators( - ops: List[PhysicalOperator], resource_manager: ResourceManager -) -> List[Tuple]: - """Picks operator to run according to the following semantic: - - Operator to run next is selected as the one with the *smallest* value - of the lexicographically ordered ranks composed of (in order): - - 1. Whether operator's could be throttled (bool) - 2. Operators' object store utilization - - Consider following examples: - - Example 1: - - Operator 1 with rank (True, 1024 bytes) - Operator 2 with rank (False, 2048 bytes) - - In that case Operator 2 will be selected. - - Example 2: - - Operator 1 with rank (True, 1024 bytes) - Operator 2 with rank (True, 2048 bytes) - - In that case Operator 1 will be selected. - """ - - assert len(ops) > 0, ops - - def _ranker(op): - # Rank composition: - # 1. Whether throttling is enabled - # 2. Estimated Object Store usage - return ( - not op.throttling_disabled(), - resource_manager.get_op_usage(op).object_store_memory, - ) - - return [_ranker(op) for op in ops] - - def _actor_info_summary_str(info: _ActorPoolInfo) -> str: total = info.running + info.pending + info.restarting base = f"Actors: {total}" diff --git a/python/ray/data/tests/test_ranker.py b/python/ray/data/tests/test_ranker.py new file mode 100644 index 000000000000..bca6a4fb0a10 --- /dev/null +++ b/python/ray/data/tests/test_ranker.py @@ -0,0 +1,82 @@ +"""Comprehensive tests for the generic ranker type system.""" + +from unittest.mock import MagicMock + +import pytest + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.ranker import DefaultRanker, Ranker +from ray.data._internal.execution.resource_manager import ResourceManager +from ray.data._internal.execution.streaming_executor_state import Topology + + +def test_default_ranker(): + """Test that the ranker interface works correctly.""" + ranker = DefaultRanker() + + # Mock objects + op1 = MagicMock() + op1.throttling_disabled.return_value = False + op2 = MagicMock() + op2.throttling_disabled.return_value = True + topology = {} + resource_manager = MagicMock() + resource_manager.get_op_usage.return_value = MagicMock() + resource_manager.get_op_usage.return_value.object_store_memory = 1024 + + # Test rank_operator for first op + rank1 = ranker.rank_operator(op1, topology, resource_manager) + assert rank1 == (1, 1024) # throttling_disabled=False -> 1, memory=1024 + + # Test rank_operator for second op + rank2 = ranker.rank_operator(op2, topology, resource_manager) + assert rank2 == (0, 1024) # throttling_disabled=True -> 0, memory=1024 + + # Test rank_operators with both ops + ops = [op1, op2] + ranks = ranker.rank_operators(ops, topology, resource_manager) + assert ranks == [(1, 1024), (0, 1024)] + + +class IntRanker(Ranker[int]): + """Ranker that returns integer rankings.""" + + def rank_operator( + self, + op: PhysicalOperator, + topology: "Topology", + resource_manager: ResourceManager, + ) -> int: + """Return integer ranking.""" + return resource_manager.get_op_usage(op).object_store_memory + + +def test_generic_types(): + """Test that specific generic types work correctly.""" + # Test integer ranker + int_ranker = IntRanker() + op1 = MagicMock() + op2 = MagicMock() + topology = {} + resource_manager = MagicMock() + resource_manager.get_op_usage.return_value = MagicMock() + resource_manager.get_op_usage.return_value.object_store_memory = 1024 + + # Test rank_operator for first op + rank1 = int_ranker.rank_operator(op1, topology, resource_manager) + assert rank1 == 1024 + + # Test rank_operator for second op + rank2 = int_ranker.rank_operator(op2, topology, resource_manager) + assert rank2 == 1024 + + # Test rank_operators with both ops + ops = [op1, op2] + ranks = int_ranker.rank_operators(ops, topology, resource_manager) + assert ranks == [1024, 1024] + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index a6c2629b09d9..9f0bc8ee33a9 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -40,6 +40,7 @@ BlockMapTransformFn, MapTransformer, ) +from ray.data._internal.execution.ranker import DefaultRanker from ray.data._internal.execution.resource_manager import ResourceManager from ray.data._internal.execution.streaming_executor import ( StreamingExecutor, @@ -49,7 +50,6 @@ from ray.data._internal.execution.streaming_executor_state import ( OpBufferQueue, OpState, - _rank_operators, build_streaming_topology, get_eligible_operators, process_completed_tasks, @@ -380,9 +380,10 @@ def _get_op_usage_mocked(op): resource_manager.get_op_usage.side_effect = _get_op_usage_mocked - ranks = _rank_operators([o1, o2, o3, o4], resource_manager) + ranker = DefaultRanker() + ranks = ranker.rank_operators([o1, o2, o3, o4], {}, resource_manager) - assert [(True, 1024), (True, 2048), (True, 4096), (False, 8092)] == ranks + assert [(1, 1024), (1, 2048), (1, 4096), (0, 8092)] == ranks def test_select_ops_to_run(ray_start_regular_shared): @@ -428,7 +429,11 @@ def _get_op_usage_mocked(op): topo = build_streaming_topology(o4, opts) selected = select_operator_to_run( - topo, resource_manager, [], ensure_liveness=ensure_liveness + topo, + resource_manager, + [], + ensure_liveness=ensure_liveness, + ranker=DefaultRanker(), ) assert selected is o4 @@ -439,7 +444,11 @@ def _get_op_usage_mocked(op): topo = build_streaming_topology(o3, opts) selected = select_operator_to_run( - topo, resource_manager, [], ensure_liveness=ensure_liveness + topo, + resource_manager, + [], + ensure_liveness=ensure_liveness, + ranker=DefaultRanker(), ) assert selected is o1 @@ -808,6 +817,7 @@ def test_callbacks_initialized_once(): mock_init.assert_not_called() +@pytest.mark.skip("Fails on RayTurbo because different read logical ops") def test_execution_callbacks_executor_arg(tmp_path, restore_data_context): """Test the executor arg in ExecutionCallback.""" From 9ab06d34f307a04ec089b409e3ebba308eea0d1c Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 10 Nov 2025 14:15:36 -0800 Subject: [PATCH 2/5] clean Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/__init__.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/python/ray/data/_internal/execution/__init__.py b/python/ray/data/_internal/execution/__init__.py index 8569bde8d3fd..c6a4c3a208b7 100644 --- a/python/ray/data/_internal/execution/__init__.py +++ b/python/ray/data/_internal/execution/__init__.py @@ -5,10 +5,4 @@ def create_ranker() -> Ranker: """Create a ranker instance based on environment and configuration.""" from ray._private.ray_constants import env_bool - # Check if RayTurbo ranker should be used - if env_bool("RAY_DATA_USE_TURBO_RANKER", True): - from ray.anyscale.data._internal.execution.ranker import LocationAwareRanker - - return LocationAwareRanker() - else: - return DefaultRanker() + return DefaultRanker() From aa93a5b1dcb50948bcdcb749757ffdfcb85cf3be Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 10 Nov 2025 14:17:29 -0800 Subject: [PATCH 3/5] docstring Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/ranker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/execution/ranker.py b/python/ray/data/_internal/execution/ranker.py index 72d22b901305..d45af0701af6 100644 --- a/python/ray/data/_internal/execution/ranker.py +++ b/python/ray/data/_internal/execution/ranker.py @@ -49,7 +49,7 @@ def rank_operator( of the lexicographically ordered ranks composed of (in order): Args: - ops: Operator to rank + op: Operator to rank topology: Current execution topology resource_manager: Resource manager for usage information @@ -84,7 +84,7 @@ def rank_operator( 2. Operators' object store utilization Args: - op: Operators to rank + op: Operator to rank topology: Current execution topology resource_manager: Resource manager for usage information From e00e6aad83ef0210906681888c05186e3644cce3 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 10 Nov 2025 14:19:19 -0800 Subject: [PATCH 4/5] clean Signed-off-by: iamjustinhsu --- python/ray/data/BUILD.bazel | 13 ------------- python/ray/data/tests/test_streaming_executor.py | 1 - 2 files changed, 14 deletions(-) diff --git a/python/ray/data/BUILD.bazel b/python/ray/data/BUILD.bazel index a6c13e48431b..cb205eca356f 100644 --- a/python/ray/data/BUILD.bazel +++ b/python/ray/data/BUILD.bazel @@ -66,19 +66,6 @@ py_test_module_list( ], ) -py_test_module_list( - size = "small", - files = glob(["tests/anyscale/test_*.py"]), - tags = [ - "exclusive", - "team:data", - ], - deps = [ - ":conftest", - "//:ray_lib", - ], -) - py_test( name = "test_formats", size = "medium", diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 9f0bc8ee33a9..c48bb80a4abf 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -817,7 +817,6 @@ def test_callbacks_initialized_once(): mock_init.assert_not_called() -@pytest.mark.skip("Fails on RayTurbo because different read logical ops") def test_execution_callbacks_executor_arg(tmp_path, restore_data_context): """Test the executor arg in ExecutionCallback.""" From 4bbcbbd29aff9637b038b13b4fbb951ce19798ab Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 10 Nov 2025 14:20:06 -0800 Subject: [PATCH 5/5] clean Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/ray/data/_internal/execution/__init__.py b/python/ray/data/_internal/execution/__init__.py index c6a4c3a208b7..49ea853c77cd 100644 --- a/python/ray/data/_internal/execution/__init__.py +++ b/python/ray/data/_internal/execution/__init__.py @@ -3,6 +3,4 @@ def create_ranker() -> Ranker: """Create a ranker instance based on environment and configuration.""" - from ray._private.ray_constants import env_bool - return DefaultRanker()