@@ -16,6 +16,10 @@
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data._internal.memory_tracing import trace_allocation
from ray.data.block import BlockMetadata
from ray.data.context import (
DEFAULT_TARGET_MAX_BLOCK_SIZE,
MAX_SAFE_BLOCK_SIZE_FACTOR,
)

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.physical_operator import (
@@ -679,12 +683,18 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]:
return None

bytes_per_output = self.average_bytes_per_output
# If we don’t have a sample yet and the limit is “unlimited”, we can’t
# estimate – just bail out.
if bytes_per_output is None:
# If we don't have a sample yet, use target_max_block_size, but
# account for the fact that blocks can be up to
# MAX_SAFE_BLOCK_SIZE_FACTOR larger before being sliced.
if context.target_max_block_size is None:
return None
bytes_per_output = context.target_max_block_size
bytes_per_output = (
DEFAULT_TARGET_MAX_BLOCK_SIZE * MAX_SAFE_BLOCK_SIZE_FACTOR
)
else:
bytes_per_output = (
context.target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR
)
Comment on lines 685 to 692
Severity: medium

The logic for determining bytes_per_output when a sample is not available can be simplified to reduce code duplication. Both branches of the if/else statement perform the same multiplication with MAX_SAFE_BLOCK_SIZE_FACTOR. You can determine the target_block_size first and then perform the multiplication once.

            target_block_size = context.target_max_block_size
            if target_block_size is None:
                target_block_size = DEFAULT_TARGET_MAX_BLOCK_SIZE
            bytes_per_output = target_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR

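If the suggested simplification were applied, the no-sample branch might read roughly as follows (a sketch only, assuming the surrounding method body shown in the diff above is otherwise unchanged):

        bytes_per_output = self.average_bytes_per_output
        if bytes_per_output is None:
            # No sample yet: fall back to the configured target block size
            # (or the default when unset), inflated by
            # MAX_SAFE_BLOCK_SIZE_FACTOR because a block can grow that much
            # larger before it is sliced.
            target_block_size = context.target_max_block_size
            if target_block_size is None:
                target_block_size = DEFAULT_TARGET_MAX_BLOCK_SIZE
            bytes_per_output = target_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR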

num_pending_outputs = context._max_num_blocks_in_streaming_gen_buffer
if self.average_num_outputs_per_task is not None:
@@ -9,7 +9,7 @@
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.output_splitter import OutputSplitter
from ray.data._internal.execution.util import make_ref_bundles
from ray.data.context import DataContext
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext
from ray.data.tests.conftest import * # noqa
from ray.data.tests.test_operators import _mul2_map_data_prcessor
from ray.data.tests.util import run_op_tasks_sync
@@ -312,6 +312,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co
inc_obj_store_mem = (
data_context._max_num_blocks_in_streaming_gen_buffer
* data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
)
min_resource_usage, _ = op.min_max_resource_requirements()
assert min_resource_usage == ExecutionResources(
@@ -426,6 +427,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
inc_obj_store_mem = (
data_context._max_num_blocks_in_streaming_gen_buffer
* data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
)
min_resource_usage, _ = op.min_max_resource_requirements()
assert min_resource_usage == ExecutionResources(
145 changes: 145 additions & 0 deletions python/ray/data/tests/test_op_runtime_metrics.py
@@ -11,6 +11,11 @@
)
from ray.data._internal.util import KiB
from ray.data.block import BlockExecStats, BlockMetadata
from ray.data.context import (
DEFAULT_TARGET_MAX_BLOCK_SIZE,
MAX_SAFE_BLOCK_SIZE_FACTOR,
DataContext,
)


def test_average_max_uss_per_task():
@@ -213,6 +218,146 @@ def create_bundle_with_rows(num_rows):
metrics.block_size_rows._bucket_counts[expected_bucket] = 0


@pytest.fixture
def metrics_config_no_sample_with_target(restore_data_context): # noqa: F811
"""Fixture for no-sample scenario with target_max_block_size set."""
ctx = DataContext.get_current()
ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB
ctx._max_num_blocks_in_streaming_gen_buffer = 2

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
return metrics


@pytest.fixture
def metrics_config_no_sample_with_none(restore_data_context): # noqa: F811
"""Fixture for no-sample scenario with target_max_block_size=None."""
ctx = DataContext.get_current()
ctx.target_max_block_size = None
ctx._max_num_blocks_in_streaming_gen_buffer = 1

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
return metrics


@pytest.fixture
def metrics_config_with_sample(restore_data_context): # noqa: F811
"""Fixture for scenario with average_bytes_per_output available."""
ctx = DataContext.get_current()
ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB
ctx._max_num_blocks_in_streaming_gen_buffer = 1

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)

# Simulate having samples: set bytes_task_outputs_generated and
# num_task_outputs_generated to make average_bytes_per_output available
actual_block_size = 150 * 1024 * 1024 # 150MB
metrics.bytes_task_outputs_generated = actual_block_size
metrics.num_task_outputs_generated = 1

return metrics


@pytest.fixture
def metrics_config_pending_outputs_no_sample(
restore_data_context, # noqa: F811
):
"""Fixture for pending outputs during no-sample with target set."""
ctx = DataContext.get_current()
ctx.target_max_block_size = 64 * 1024 * 1024 # 64MB
ctx._max_num_blocks_in_streaming_gen_buffer = 2

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
metrics.num_tasks_running = 3
return metrics


@pytest.fixture
def metrics_config_pending_outputs_none(restore_data_context): # noqa: F811
"""Fixture for pending outputs during no-sample with target=None."""
ctx = DataContext.get_current()
ctx.target_max_block_size = None
ctx._max_num_blocks_in_streaming_gen_buffer = 1

op = MagicMock()
op.data_context = ctx
metrics = OpRuntimeMetrics(op)
metrics.num_tasks_running = 2
return metrics


@pytest.mark.parametrize(
"metrics_fixture,test_property,expected_calculator",
[
(
"metrics_config_no_sample_with_target",
"obj_store_mem_max_pending_output_per_task",
lambda m: (
m._op.data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
(
"metrics_config_no_sample_with_none",
"obj_store_mem_max_pending_output_per_task",
lambda m: (
DEFAULT_TARGET_MAX_BLOCK_SIZE
* MAX_SAFE_BLOCK_SIZE_FACTOR
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
(
"metrics_config_with_sample",
"obj_store_mem_max_pending_output_per_task",
lambda m: (
m.average_bytes_per_output
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
(
"metrics_config_pending_outputs_no_sample",
"obj_store_mem_pending_task_outputs",
lambda m: (
m.num_tasks_running
* m._op.data_context.target_max_block_size
* MAX_SAFE_BLOCK_SIZE_FACTOR
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
(
"metrics_config_pending_outputs_none",
"obj_store_mem_pending_task_outputs",
lambda m: (
m.num_tasks_running
* DEFAULT_TARGET_MAX_BLOCK_SIZE
* MAX_SAFE_BLOCK_SIZE_FACTOR
* m._op.data_context._max_num_blocks_in_streaming_gen_buffer
),
),
],
)
def test_obj_store_mem_estimation(
request, metrics_fixture, test_property, expected_calculator
):
"""Test object store memory estimation for various scenarios."""
metrics = request.getfixturevalue(metrics_fixture)
actual = getattr(metrics, test_property)
expected = expected_calculator(metrics)

assert (
actual == expected
), f"Expected {test_property} to be {expected}, got {actual}"


if __name__ == "__main__":
import sys

27 changes: 18 additions & 9 deletions python/ray/data/tests/test_resource_manager.py
@@ -25,7 +25,7 @@
build_streaming_topology,
)
from ray.data._internal.execution.util import make_ref_bundles
from ray.data.context import DataContext
from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext
from ray.data.tests.conftest import * # noqa


@@ -333,14 +333,19 @@ def test_object_store_usage(self, restore_data_context):
assert resource_manager.get_op_usage(o2).object_store_memory == 0
assert resource_manager.get_op_usage(o3).object_store_memory == 0

# Operators estimate pending task outputs using the target max block size.
# In this case, the target max block size is 2 and there is at most 1 block
# in the streaming generator buffer, so the estimated usage is 2.
# Operators estimate pending task outputs using the target max block size
# multiplied by MAX_SAFE_BLOCK_SIZE_FACTOR (1.5) during the no-sample phase.
# In this case, the target max block size is 2, MAX_SAFE_BLOCK_SIZE_FACTOR
# is 1.5, and there is at most 1 block in the streaming generator buffer,
# so the estimated usage is 2 * 1.5 * 1 = 3.
o2.metrics.on_input_dequeued(input)
o2.metrics.on_task_submitted(0, input)
resource_manager.update_usages()
# target_max_block_size * factor * max_blocks
expected_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1
assert resource_manager.get_op_usage(o1).object_store_memory == 0
assert resource_manager.get_op_usage(o2).object_store_memory == 2
op2_usage = resource_manager.get_op_usage(o2).object_store_memory
assert op2_usage == expected_usage
assert resource_manager.get_op_usage(o3).object_store_memory == 0

# When the task finishes, we move the data from the streaming generator to the
@@ -367,15 +372,19 @@ def test_object_store_usage(self, restore_data_context):
assert resource_manager.get_op_usage(o2).object_store_memory == 1
assert resource_manager.get_op_usage(o3).object_store_memory == 0

# Task inputs count toward the previous operator's object store memory usage,
# and task outputs count toward the current operator's object store memory
# usage.
# Task inputs count toward the previous operator's object store memory
# usage, and task outputs count toward the current operator's object
# store memory usage. During the no-sample phase, pending outputs are
# estimated using target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR.
o3.metrics.on_input_dequeued(input)
o3.metrics.on_task_submitted(0, input)
resource_manager.update_usages()
assert resource_manager.get_op_usage(o1).object_store_memory == 0
assert resource_manager.get_op_usage(o2).object_store_memory == 1
assert resource_manager.get_op_usage(o3).object_store_memory == 2
# target_max_block_size (2) * factor (1.5) * max_blocks (1) = 3
expected_o3_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1
op3_usage = resource_manager.get_op_usage(o3).object_store_memory
assert op3_usage == expected_o3_usage

# Task inputs no longer count once the task is finished.
o3.metrics.on_output_queued(input)