Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions python/ray/data/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ py_test_module_list(
],
)

py_test_module_list(
size = "small",
files = glob(["tests/anyscale/test_*.py"]),
tags = [
"exclusive",
"team:data",
],
deps = [
":conftest",
"//:ray_lib",
],
)

py_test(
name = "test_formats",
size = "medium",
Expand Down Expand Up @@ -1255,6 +1268,20 @@ py_test(
],
)

py_test(
name = "test_ranker",
size = "small",
srcs = ["tests/test_ranker.py"],
tags = [
"exclusive",
"team:data",
],
deps = [
":conftest",
"//:ray_lib",
],
)

py_test(
name = "test_raydp",
size = "medium",
Expand Down
8 changes: 8 additions & 0 deletions python/ray/data/_internal/execution/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .ranker import DefaultRanker, Ranker


def create_ranker() -> Ranker:
"""Create a ranker instance based on environment and configuration."""
from ray._private.ray_constants import env_bool

return DefaultRanker()
100 changes: 100 additions & 0 deletions python/ray/data/_internal/execution/ranker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Ranker component for operator selection in streaming executor."""

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Generic, List, Protocol, Tuple, TypeVar

from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.execution.resource_manager import ResourceManager

if TYPE_CHECKING:
from ray.data._internal.execution.streaming_executor_state import Topology

# Protocol for comparable ranking values
class Comparable(Protocol):
"""Protocol for types that can be compared for ranking."""

def __lt__(self, other: "Comparable") -> bool:
...

def __le__(self, other: "Comparable") -> bool:
...

def __gt__(self, other: "Comparable") -> bool:
...

def __ge__(self, other: "Comparable") -> bool:
...

def __eq__(self, other: "Comparable") -> bool:
...


# Generic type for comparable ranking values
RankingValue = TypeVar("RankingValue", bound=Comparable)


class Ranker(ABC, Generic[RankingValue]):
"""Abstract base class for operator ranking strategies."""

@abstractmethod
def rank_operator(
self,
op: PhysicalOperator,
topology: "Topology",
resource_manager: ResourceManager,
) -> RankingValue:
"""Rank operator for execution priority.

Operator to run next is selected as the one with the *smallest* value
of the lexicographically ordered ranks composed of (in order):

Args:
ops: Operator to rank
topology: Current execution topology
resource_manager: Resource manager for usage information

Returns:
Rank (tuple) for operator
"""
pass

def rank_operators(
self,
ops: List[PhysicalOperator],
topology: "Topology",
resource_manager: ResourceManager,
) -> List[RankingValue]:

assert len(ops) > 0
return [self.rank_operator(op, topology, resource_manager) for op in ops]


class DefaultRanker(Ranker[Tuple[int, int]]):
"""Ranker implementation."""

def rank_operator(
self,
op: PhysicalOperator,
topology: "Topology",
resource_manager: ResourceManager,
) -> Tuple[int, int]:
"""Computes rank for op. *Lower means better rank*

1. Whether operator's could be throttled (int)
2. Operators' object store utilization

Args:
op: Operators to rank
topology: Current execution topology
resource_manager: Resource manager for usage information

Returns:
Rank (tuple) for operator
"""

throttling_disabled = 0 if op.throttling_disabled() else 1

return (
throttling_disabled,
resource_manager.get_op_usage(op).object_store_memory,
)
3 changes: 3 additions & 0 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
create_actor_autoscaler,
)
from ray.data._internal.cluster_autoscaler import create_cluster_autoscaler
from ray.data._internal.execution import create_ranker
from ray.data._internal.execution.backpressure_policy import (
BackpressurePolicy,
get_backpressure_policies,
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
dataset_id: str = "unknown_dataset",
):
self._data_context = data_context
self._ranker = create_ranker()
self._start_time: Optional[float] = None
self._initial_stats: Optional[DatasetStats] = None
self._final_stats: Optional[DatasetStats] = None
Expand Down Expand Up @@ -479,6 +481,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
# If consumer is idling (there's nothing for it to consume)
# enforce liveness, ie that at least a single task gets scheduled
ensure_liveness=self._consumer_idling(),
ranker=self._ranker,
)

if op is None:
Expand Down
46 changes: 3 additions & 43 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
HashShuffleProgressBarMixin,
)
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.ranker import Ranker
from ray.data._internal.execution.resource_manager import (
ResourceManager,
)
Expand Down Expand Up @@ -746,6 +747,7 @@ def select_operator_to_run(
resource_manager: ResourceManager,
backpressure_policies: List[BackpressurePolicy],
ensure_liveness: bool,
ranker: "Ranker",
) -> Optional[PhysicalOperator]:
"""Select next operator to launch new tasks.

Expand All @@ -769,7 +771,7 @@ def select_operator_to_run(
if not eligible_ops:
return None

ranks = _rank_operators(eligible_ops, resource_manager)
ranks = ranker.rank_operators(eligible_ops, topology, resource_manager)

assert len(eligible_ops) == len(ranks), (eligible_ops, ranks)

Expand All @@ -778,48 +780,6 @@ def select_operator_to_run(
return next_op


def _rank_operators(
ops: List[PhysicalOperator], resource_manager: ResourceManager
) -> List[Tuple]:
"""Picks operator to run according to the following semantic:

Operator to run next is selected as the one with the *smallest* value
of the lexicographically ordered ranks composed of (in order):

1. Whether operator's could be throttled (bool)
2. Operators' object store utilization

Consider following examples:

Example 1:

Operator 1 with rank (True, 1024 bytes)
Operator 2 with rank (False, 2048 bytes)

In that case Operator 2 will be selected.

Example 2:

Operator 1 with rank (True, 1024 bytes)
Operator 2 with rank (True, 2048 bytes)

In that case Operator 1 will be selected.
"""

assert len(ops) > 0, ops

def _ranker(op):
# Rank composition:
# 1. Whether throttling is enabled
# 2. Estimated Object Store usage
return (
not op.throttling_disabled(),
resource_manager.get_op_usage(op).object_store_memory,
)

return [_ranker(op) for op in ops]


def _actor_info_summary_str(info: _ActorPoolInfo) -> str:
total = info.running + info.pending + info.restarting
base = f"Actors: {total}"
Expand Down
82 changes: 82 additions & 0 deletions python/ray/data/tests/test_ranker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Comprehensive tests for the generic ranker type system."""

from unittest.mock import MagicMock

import pytest

from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.execution.ranker import DefaultRanker, Ranker
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor_state import Topology


def test_default_ranker():
"""Test that the ranker interface works correctly."""
ranker = DefaultRanker()

# Mock objects
op1 = MagicMock()
op1.throttling_disabled.return_value = False
op2 = MagicMock()
op2.throttling_disabled.return_value = True
topology = {}
resource_manager = MagicMock()
resource_manager.get_op_usage.return_value = MagicMock()
resource_manager.get_op_usage.return_value.object_store_memory = 1024

# Test rank_operator for first op
rank1 = ranker.rank_operator(op1, topology, resource_manager)
assert rank1 == (1, 1024) # throttling_disabled=False -> 1, memory=1024

# Test rank_operator for second op
rank2 = ranker.rank_operator(op2, topology, resource_manager)
assert rank2 == (0, 1024) # throttling_disabled=True -> 0, memory=1024

# Test rank_operators with both ops
ops = [op1, op2]
ranks = ranker.rank_operators(ops, topology, resource_manager)
assert ranks == [(1, 1024), (0, 1024)]


class IntRanker(Ranker[int]):
"""Ranker that returns integer rankings."""

def rank_operator(
self,
op: PhysicalOperator,
topology: "Topology",
resource_manager: ResourceManager,
) -> int:
"""Return integer ranking."""
return resource_manager.get_op_usage(op).object_store_memory


def test_generic_types():
"""Test that specific generic types work correctly."""
# Test integer ranker
int_ranker = IntRanker()
op1 = MagicMock()
op2 = MagicMock()
topology = {}
resource_manager = MagicMock()
resource_manager.get_op_usage.return_value = MagicMock()
resource_manager.get_op_usage.return_value.object_store_memory = 1024

# Test rank_operator for first op
rank1 = int_ranker.rank_operator(op1, topology, resource_manager)
assert rank1 == 1024

# Test rank_operator for second op
rank2 = int_ranker.rank_operator(op2, topology, resource_manager)
assert rank2 == 1024

# Test rank_operators with both ops
ops = [op1, op2]
ranks = int_ranker.rank_operators(ops, topology, resource_manager)
assert ranks == [1024, 1024]


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
20 changes: 15 additions & 5 deletions python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
BlockMapTransformFn,
MapTransformer,
)
from ray.data._internal.execution.ranker import DefaultRanker
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor import (
StreamingExecutor,
Expand All @@ -49,7 +50,6 @@
from ray.data._internal.execution.streaming_executor_state import (
OpBufferQueue,
OpState,
_rank_operators,
build_streaming_topology,
get_eligible_operators,
process_completed_tasks,
Expand Down Expand Up @@ -380,9 +380,10 @@ def _get_op_usage_mocked(op):

resource_manager.get_op_usage.side_effect = _get_op_usage_mocked

ranks = _rank_operators([o1, o2, o3, o4], resource_manager)
ranker = DefaultRanker()
ranks = ranker.rank_operators([o1, o2, o3, o4], {}, resource_manager)

assert [(True, 1024), (True, 2048), (True, 4096), (False, 8092)] == ranks
assert [(1, 1024), (1, 2048), (1, 4096), (0, 8092)] == ranks


def test_select_ops_to_run(ray_start_regular_shared):
Expand Down Expand Up @@ -428,7 +429,11 @@ def _get_op_usage_mocked(op):
topo = build_streaming_topology(o4, opts)

selected = select_operator_to_run(
topo, resource_manager, [], ensure_liveness=ensure_liveness
topo,
resource_manager,
[],
ensure_liveness=ensure_liveness,
ranker=DefaultRanker(),
)

assert selected is o4
Expand All @@ -439,7 +444,11 @@ def _get_op_usage_mocked(op):
topo = build_streaming_topology(o3, opts)

selected = select_operator_to_run(
topo, resource_manager, [], ensure_liveness=ensure_liveness
topo,
resource_manager,
[],
ensure_liveness=ensure_liveness,
ranker=DefaultRanker(),
)

assert selected is o1
Expand Down Expand Up @@ -808,6 +817,7 @@ def test_callbacks_initialized_once():
mock_init.assert_not_called()


@pytest.mark.skip("Fails on RayTurbo because different read logical ops")
def test_execution_callbacks_executor_arg(tmp_path, restore_data_context):
"""Test the executor arg in ExecutionCallback."""

Expand Down