Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data._internal.memory_tracing import trace_allocation
from ray.data.block import BlockMetadata
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.physical_operator import (
Expand Down Expand Up @@ -684,7 +685,11 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]:
if bytes_per_output is None:
if context.target_max_block_size is None:
return None
bytes_per_output = context.target_max_block_size
else:
# Block size can be up to MAX_SAFE_BLOCK_SIZE_FACTOR larger before being sliced.
bytes_per_output = (
context.target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR
)

num_pending_outputs = context._max_num_blocks_in_streaming_gen_buffer
if self.average_num_outputs_per_task is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.output_splitter import OutputSplitter
from ray.data._internal.execution.util import make_ref_bundles
from ray.data.context import DataContext
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext
from ray.data.tests.conftest import * # noqa
from ray.data.tests.test_operators import _mul2_map_data_prcessor
from ray.data.tests.util import run_op_tasks_sync
Expand Down Expand Up @@ -312,6 +312,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co
inc_obj_store_mem = (
data_context._max_num_blocks_in_streaming_gen_buffer
* data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
)
min_resource_usage, _ = op.min_max_resource_requirements()
assert min_resource_usage == ExecutionResources(
Expand Down Expand Up @@ -426,6 +427,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
inc_obj_store_mem = (
data_context._max_num_blocks_in_streaming_gen_buffer
* data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
)
min_resource_usage, _ = op.min_max_resource_requirements()
assert min_resource_usage == ExecutionResources(
Expand Down
125 changes: 125 additions & 0 deletions python/ray/data/tests/test_op_runtime_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
)
from ray.data._internal.util import KiB
from ray.data.block import BlockExecStats, BlockMetadata
from ray.data.context import (
MAX_SAFE_BLOCK_SIZE_FACTOR,
DataContext,
)


def test_average_max_uss_per_task():
Expand Down Expand Up @@ -213,6 +217,127 @@ def create_bundle_with_rows(num_rows):
metrics.block_size_rows._bucket_counts[expected_bucket] = 0


@pytest.fixture
def metrics_config_no_sample_with_target(restore_data_context): # noqa: F811
"""Fixture for no-sample scenario with target_max_block_size set."""
ctx = DataContext.get_current()
ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB
ctx._max_num_blocks_in_streaming_gen_buffer = 2

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
return metrics


@pytest.fixture
def metrics_config_no_sample_with_none(restore_data_context): # noqa: F811
"""Fixture for no-sample scenario with target_max_block_size=None."""
ctx = DataContext.get_current()
ctx.target_max_block_size = None
ctx._max_num_blocks_in_streaming_gen_buffer = 1

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
return metrics


@pytest.fixture
def metrics_config_with_sample(restore_data_context): # noqa: F811
"""Fixture for scenario with average_bytes_per_output available."""
ctx = DataContext.get_current()
ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB
ctx._max_num_blocks_in_streaming_gen_buffer = 1

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)

# Simulate having samples: set bytes_task_outputs_generated and
# num_task_outputs_generated to make average_bytes_per_output available
actual_block_size = 150 * 1024 * 1024 # 150MB
metrics.bytes_task_outputs_generated = actual_block_size
metrics.num_task_outputs_generated = 1

return metrics


@pytest.fixture
def metrics_config_pending_outputs_no_sample(
restore_data_context, # noqa: F811
):
"""Fixture for pending outputs during no-sample with target set."""
ctx = DataContext.get_current()
ctx.target_max_block_size = 64 * 1024 * 1024 # 64MB
ctx._max_num_blocks_in_streaming_gen_buffer = 2

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
metrics.num_tasks_running = 3
return metrics


@pytest.fixture
def metrics_config_pending_outputs_none(restore_data_context): # noqa: F811
"""Fixture for pending outputs during no-sample with target=None."""
ctx = DataContext.get_current()
ctx.target_max_block_size = None
ctx._max_num_blocks_in_streaming_gen_buffer = 1

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
metrics.num_tasks_running = 2
return metrics


@pytest.mark.parametrize(
"metrics_fixture,test_property,expected_calculator",
[
(
"metrics_config_no_sample_with_target",
"obj_store_mem_max_pending_output_per_task",
lambda m: (
m._op.data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
(
"metrics_config_with_sample",
"obj_store_mem_max_pending_output_per_task",
lambda m: (
m.average_bytes_per_output
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
(
"metrics_config_pending_outputs_no_sample",
"obj_store_mem_pending_task_outputs",
lambda m: (
m.num_tasks_running
* m._op.data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
],
)
def test_obj_store_mem_estimation(
request, metrics_fixture, test_property, expected_calculator
):
"""Test object store memory estimation for various scenarios."""
metrics = request.getfixturevalue(metrics_fixture)
actual = getattr(metrics, test_property)
expected = expected_calculator(metrics)

assert (
actual == expected
), f"Expected {test_property} to be {expected}, got {actual}"


if __name__ == "__main__":
import sys

Expand Down
27 changes: 18 additions & 9 deletions python/ray/data/tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
build_streaming_topology,
)
from ray.data._internal.execution.util import make_ref_bundles
from ray.data.context import DataContext
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext
from ray.data.tests.conftest import * # noqa


Expand Down Expand Up @@ -333,14 +333,19 @@ def test_object_store_usage(self, restore_data_context):
assert resource_manager.get_op_usage(o2).object_store_memory == 0
assert resource_manager.get_op_usage(o3).object_store_memory == 0

# Operators estimate pending task outputs using the target max block size.
# In this case, the target max block size is 2 and there is at most 1 block
# in the streaming generator buffer, so the estimated usage is 2.
# Operators estimate pending task outputs using the target max block size
# multiplied by MAX_SAFE_BLOCK_SIZE_FACTOR (1.5) during no-sample phase.
# In this case, the target max block size is 2, MAX_SAFE_BLOCK_SIZE_FACTOR
# is 1.5, and there is at most 1 block in the streaming generator buffer,
# so the estimated usage is 2 * 1.5 * 1 = 3.
o2.metrics.on_input_dequeued(input)
o2.metrics.on_task_submitted(0, input)
resource_manager.update_usages()
# target_max_block_size * factor * max_blocks
expected_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1
assert resource_manager.get_op_usage(o1).object_store_memory == 0
assert resource_manager.get_op_usage(o2).object_store_memory == 2
op2_usage = resource_manager.get_op_usage(o2).object_store_memory
assert op2_usage == expected_usage
assert resource_manager.get_op_usage(o3).object_store_memory == 0

# When the task finishes, we move the data from the streaming generator to the
Expand All @@ -367,15 +372,19 @@ def test_object_store_usage(self, restore_data_context):
assert resource_manager.get_op_usage(o2).object_store_memory == 1
assert resource_manager.get_op_usage(o3).object_store_memory == 0

# Task inputs count toward the previous operator's object store memory usage,
# and task outputs count toward the current operator's object store memory
# usage.
# Task inputs count toward the previous operator's object store memory
# usage, and task outputs count toward the current operator's object
# store memory usage. During no-sample phase, pending outputs are
# estimated using target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR.
o3.metrics.on_input_dequeued(input)
o3.metrics.on_task_submitted(0, input)
resource_manager.update_usages()
assert resource_manager.get_op_usage(o1).object_store_memory == 0
assert resource_manager.get_op_usage(o2).object_store_memory == 1
assert resource_manager.get_op_usage(o3).object_store_memory == 2
# target_max_block_size (2) * factor (1.5) * max_blocks (1) = 3
expected_o3_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1
op3_usage = resource_manager.get_op_usage(o3).object_store_memory
assert op3_usage == expected_o3_usage

# Task inputs no longer count once the task is finished.
o3.metrics.on_output_queued(input)
Expand Down