diff --git a/python/ray/data/BUILD.bazel b/python/ray/data/BUILD.bazel index 696860549b92..cb205eca356f 100644 --- a/python/ray/data/BUILD.bazel +++ b/python/ray/data/BUILD.bazel @@ -1255,6 +1255,20 @@ py_test( ], ) +py_test( + name = "test_ranker", + size = "small", + srcs = ["tests/test_ranker.py"], + tags = [ + "exclusive", + "team:data", + ], + deps = [ + ":conftest", + "//:ray_lib", + ], +) + py_test( name = "test_raydp", size = "medium", diff --git a/python/ray/data/_internal/execution/__init__.py b/python/ray/data/_internal/execution/__init__.py index e69de29bb2d1..49ea853c77cd 100644 --- a/python/ray/data/_internal/execution/__init__.py +++ b/python/ray/data/_internal/execution/__init__.py @@ -0,0 +1,6 @@ +from .ranker import DefaultRanker, Ranker + + +def create_ranker() -> Ranker: + """Create a ranker instance based on environment and configuration.""" + return DefaultRanker() diff --git a/python/ray/data/_internal/execution/ranker.py b/python/ray/data/_internal/execution/ranker.py new file mode 100644 index 000000000000..d45af0701af6 --- /dev/null +++ b/python/ray/data/_internal/execution/ranker.py @@ -0,0 +1,100 @@ +"""Ranker component for operator selection in streaming executor.""" + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Generic, List, Protocol, Tuple, TypeVar + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.resource_manager import ResourceManager + +if TYPE_CHECKING: + from ray.data._internal.execution.streaming_executor_state import Topology + +# Protocol for comparable ranking values +class Comparable(Protocol): + """Protocol for types that can be compared for ranking.""" + + def __lt__(self, other: "Comparable") -> bool: + ... + + def __le__(self, other: "Comparable") -> bool: + ... + + def __gt__(self, other: "Comparable") -> bool: + ... + + def __ge__(self, other: "Comparable") -> bool: + ... + + def __eq__(self, other: "Comparable") -> bool: + ... + + +# Generic type for comparable ranking values +RankingValue = TypeVar("RankingValue", bound=Comparable) + + +class Ranker(ABC, Generic[RankingValue]): + """Abstract base class for operator ranking strategies.""" + + @abstractmethod + def rank_operator( + self, + op: PhysicalOperator, + topology: "Topology", + resource_manager: ResourceManager, + ) -> RankingValue: + """Rank operator for execution priority. + + Operator to run next is selected as the one with the *smallest* value + of the lexicographically ordered ranks composed of (in order): + + Args: + op: Operator to rank + topology: Current execution topology + resource_manager: Resource manager for usage information + + Returns: + Rank (tuple) for operator + """ + pass + + def rank_operators( + self, + ops: List[PhysicalOperator], + topology: "Topology", + resource_manager: ResourceManager, + ) -> List[RankingValue]: + + assert len(ops) > 0 + return [self.rank_operator(op, topology, resource_manager) for op in ops] + + +class DefaultRanker(Ranker[Tuple[int, int]]): + """Ranker implementation.""" + + def rank_operator( + self, + op: PhysicalOperator, + topology: "Topology", + resource_manager: ResourceManager, + ) -> Tuple[int, int]: + """Computes rank for op. *Lower means better rank* + + 1. Whether operator's could be throttled (int) + 2. Operators' object store utilization + + Args: + op: Operator to rank + topology: Current execution topology + resource_manager: Resource manager for usage information + + Returns: + Rank (tuple) for operator + """ + + throttling_disabled = 0 if op.throttling_disabled() else 1 + + return ( + throttling_disabled, + resource_manager.get_op_usage(op).object_store_memory, + ) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index c8f57ea2f79e..f1007a6dce0a 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -8,6 +8,7 @@ create_actor_autoscaler, ) from ray.data._internal.cluster_autoscaler import create_cluster_autoscaler +from ray.data._internal.execution import create_ranker from ray.data._internal.execution.backpressure_policy import ( BackpressurePolicy, get_backpressure_policies, @@ -77,6 +78,7 @@ def __init__( dataset_id: str = "unknown_dataset", ): self._data_context = data_context + self._ranker = create_ranker() self._start_time: Optional[float] = None self._initial_stats: Optional[DatasetStats] = None self._final_stats: Optional[DatasetStats] = None @@ -479,6 +481,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: # If consumer is idling (there's nothing for it to consume) # enforce liveness, ie that at least a single task gets scheduled ensure_liveness=self._consumer_idling(), + ranker=self._ranker, ) if op is None: diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 5cea878816f6..e8db6b64f4f2 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -34,6 +34,7 @@ HashShuffleProgressBarMixin, ) from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.execution.ranker import Ranker from ray.data._internal.execution.resource_manager import ( ResourceManager, ) @@ -746,6 +747,7 @@ def select_operator_to_run( resource_manager: ResourceManager, backpressure_policies: List[BackpressurePolicy], ensure_liveness: bool, + ranker: "Ranker", ) -> Optional[PhysicalOperator]: """Select next operator to launch new tasks. @@ -769,7 +771,7 @@ def select_operator_to_run( if not eligible_ops: return None - ranks = _rank_operators(eligible_ops, resource_manager) + ranks = ranker.rank_operators(eligible_ops, topology, resource_manager) assert len(eligible_ops) == len(ranks), (eligible_ops, ranks) @@ -778,48 +780,6 @@ def select_operator_to_run( return next_op -def _rank_operators( - ops: List[PhysicalOperator], resource_manager: ResourceManager -) -> List[Tuple]: - """Picks operator to run according to the following semantic: - - Operator to run next is selected as the one with the *smallest* value - of the lexicographically ordered ranks composed of (in order): - - 1. Whether operator's could be throttled (bool) - 2. Operators' object store utilization - - Consider following examples: - - Example 1: - - Operator 1 with rank (True, 1024 bytes) - Operator 2 with rank (False, 2048 bytes) - - In that case Operator 2 will be selected. - - Example 2: - - Operator 1 with rank (True, 1024 bytes) - Operator 2 with rank (True, 2048 bytes) - - In that case Operator 1 will be selected. - """ - - assert len(ops) > 0, ops - - def _ranker(op): - # Rank composition: - # 1. Whether throttling is enabled - # 2. Estimated Object Store usage - return ( - not op.throttling_disabled(), - resource_manager.get_op_usage(op).object_store_memory, - ) - - return [_ranker(op) for op in ops] - - def _actor_info_summary_str(info: _ActorPoolInfo) -> str: total = info.running + info.pending + info.restarting base = f"Actors: {total}" diff --git a/python/ray/data/tests/test_ranker.py b/python/ray/data/tests/test_ranker.py new file mode 100644 index 000000000000..bca6a4fb0a10 --- /dev/null +++ b/python/ray/data/tests/test_ranker.py @@ -0,0 +1,82 @@ +"""Comprehensive tests for the generic ranker type system.""" + +from unittest.mock import MagicMock + +import pytest + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.ranker import DefaultRanker, Ranker +from ray.data._internal.execution.resource_manager import ResourceManager +from ray.data._internal.execution.streaming_executor_state import Topology + + +def test_default_ranker(): + """Test that the ranker interface works correctly.""" + ranker = DefaultRanker() + + # Mock objects + op1 = MagicMock() + op1.throttling_disabled.return_value = False + op2 = MagicMock() + op2.throttling_disabled.return_value = True + topology = {} + resource_manager = MagicMock() + resource_manager.get_op_usage.return_value = MagicMock() + resource_manager.get_op_usage.return_value.object_store_memory = 1024 + + # Test rank_operator for first op + rank1 = ranker.rank_operator(op1, topology, resource_manager) + assert rank1 == (1, 1024) # throttling_disabled=False -> 1, memory=1024 + + # Test rank_operator for second op + rank2 = ranker.rank_operator(op2, topology, resource_manager) + assert rank2 == (0, 1024) # throttling_disabled=True -> 0, memory=1024 + + # Test rank_operators with both ops + ops = [op1, op2] + ranks = ranker.rank_operators(ops, topology, resource_manager) + assert ranks == [(1, 1024), (0, 1024)] + + +class IntRanker(Ranker[int]): + """Ranker that returns integer rankings.""" + + def rank_operator( + self, + op: PhysicalOperator, + topology: "Topology", + resource_manager: ResourceManager, + ) -> int: + """Return integer ranking.""" + return resource_manager.get_op_usage(op).object_store_memory + + +def test_generic_types(): + """Test that specific generic types work correctly.""" + # Test integer ranker + int_ranker = IntRanker() + op1 = MagicMock() + op2 = MagicMock() + topology = {} + resource_manager = MagicMock() + resource_manager.get_op_usage.return_value = MagicMock() + resource_manager.get_op_usage.return_value.object_store_memory = 1024 + + # Test rank_operator for first op + rank1 = int_ranker.rank_operator(op1, topology, resource_manager) + assert rank1 == 1024 + + # Test rank_operator for second op + rank2 = int_ranker.rank_operator(op2, topology, resource_manager) + assert rank2 == 1024 + + # Test rank_operators with both ops + ops = [op1, op2] + ranks = int_ranker.rank_operators(ops, topology, resource_manager) + assert ranks == [1024, 1024] + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index a6c2629b09d9..c48bb80a4abf 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -40,6 +40,7 @@ BlockMapTransformFn, MapTransformer, ) +from ray.data._internal.execution.ranker import DefaultRanker from ray.data._internal.execution.resource_manager import ResourceManager from ray.data._internal.execution.streaming_executor import ( StreamingExecutor, @@ -49,7 +50,6 @@ from ray.data._internal.execution.streaming_executor_state import ( OpBufferQueue, OpState, - _rank_operators, build_streaming_topology, get_eligible_operators, process_completed_tasks, @@ -380,9 +380,10 @@ def _get_op_usage_mocked(op): resource_manager.get_op_usage.side_effect = _get_op_usage_mocked - ranks = _rank_operators([o1, o2, o3, o4], resource_manager) + ranker = DefaultRanker() + ranks = ranker.rank_operators([o1, o2, o3, o4], {}, resource_manager) - assert [(True, 1024), (True, 2048), (True, 4096), (False, 8092)] == ranks + assert [(1, 1024), (1, 2048), (1, 4096), (0, 8092)] == ranks def test_select_ops_to_run(ray_start_regular_shared): @@ -428,7 +429,11 @@ def _get_op_usage_mocked(op): topo = build_streaming_topology(o4, opts) selected = select_operator_to_run( - topo, resource_manager, [], ensure_liveness=ensure_liveness + topo, + resource_manager, + [], + ensure_liveness=ensure_liveness, + ranker=DefaultRanker(), ) assert selected is o4 @@ -439,7 +444,11 @@ def _get_op_usage_mocked(op): topo = build_streaming_topology(o3, opts) selected = select_operator_to_run( - topo, resource_manager, [], ensure_liveness=ensure_liveness + topo, + resource_manager, + [], + ensure_liveness=ensure_liveness, + ranker=DefaultRanker(), ) assert selected is o1