diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index cf1f881c826a..939dbdb79192 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -16,6 +16,7 @@ from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.data._internal.memory_tracing import trace_allocation from ray.data.block import BlockMetadata +from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR if TYPE_CHECKING: from ray.data._internal.execution.interfaces.physical_operator import ( @@ -684,7 +685,11 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]: if bytes_per_output is None: if context.target_max_block_size is None: return None - bytes_per_output = context.target_max_block_size + else: + # Block size can be up to MAX_SAFE_BLOCK_SIZE_FACTOR larger before being sliced. + bytes_per_output = ( + context.target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR + ) num_pending_outputs = context._max_num_blocks_in_streaming_gen_buffer if self.average_num_outputs_per_task is not None: diff --git a/python/ray/data/tests/test_executor_resource_management.py b/python/ray/data/tests/test_executor_resource_management.py index f6abd4603b86..91f68092d235 100644 --- a/python/ray/data/tests/test_executor_resource_management.py +++ b/python/ray/data/tests/test_executor_resource_management.py @@ -9,7 +9,7 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.output_splitter import OutputSplitter from ray.data._internal.execution.util import make_ref_bundles -from ray.data.context import DataContext +from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext from ray.data.tests.conftest import * # noqa from ray.data.tests.test_operators import _mul2_map_data_prcessor from ray.data.tests.util import run_op_tasks_sync @@ -312,6 +312,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co inc_obj_store_mem = ( data_context._max_num_blocks_in_streaming_gen_buffer * data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR ) min_resource_usage, _ = op.min_max_resource_requirements() assert min_resource_usage == ExecutionResources( @@ -426,6 +427,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared): inc_obj_store_mem = ( data_context._max_num_blocks_in_streaming_gen_buffer * data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR ) min_resource_usage, _ = op.min_max_resource_requirements() assert min_resource_usage == ExecutionResources( diff --git a/python/ray/data/tests/test_op_runtime_metrics.py b/python/ray/data/tests/test_op_runtime_metrics.py index ecef3aa8afe5..7f35d8608b3c 100644 --- a/python/ray/data/tests/test_op_runtime_metrics.py +++ b/python/ray/data/tests/test_op_runtime_metrics.py @@ -11,6 +11,10 @@ ) from ray.data._internal.util import KiB from ray.data.block import BlockExecStats, BlockMetadata +from ray.data.context import ( + MAX_SAFE_BLOCK_SIZE_FACTOR, + DataContext, +) def test_average_max_uss_per_task(): @@ -213,6 +217,127 @@ def create_bundle_with_rows(num_rows): metrics.block_size_rows._bucket_counts[expected_bucket] = 0 +@pytest.fixture +def metrics_config_no_sample_with_target(restore_data_context): # noqa: F811 + """Fixture for no-sample scenario with target_max_block_size set.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB + ctx._max_num_blocks_in_streaming_gen_buffer = 2 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + return metrics + + +@pytest.fixture +def metrics_config_no_sample_with_none(restore_data_context): # noqa: F811 + """Fixture for no-sample scenario with target_max_block_size=None.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = None + ctx._max_num_blocks_in_streaming_gen_buffer = 1 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + return metrics + + +@pytest.fixture +def metrics_config_with_sample(restore_data_context): # noqa: F811 + """Fixture for scenario with average_bytes_per_output available.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB + ctx._max_num_blocks_in_streaming_gen_buffer = 1 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + + # Simulate having samples: set bytes_task_outputs_generated and + # num_task_outputs_generated to make average_bytes_per_output available + actual_block_size = 150 * 1024 * 1024 # 150MB + metrics.bytes_task_outputs_generated = actual_block_size + metrics.num_task_outputs_generated = 1 + + return metrics + + +@pytest.fixture +def metrics_config_pending_outputs_no_sample( + restore_data_context, # noqa: F811 +): + """Fixture for pending outputs during no-sample with target set.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = 64 * 1024 * 1024 # 64MB + ctx._max_num_blocks_in_streaming_gen_buffer = 2 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + metrics.num_tasks_running = 3 + return metrics + + +@pytest.fixture +def metrics_config_pending_outputs_none(restore_data_context): # noqa: F811 + """Fixture for pending outputs during no-sample with target=None.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = None + ctx._max_num_blocks_in_streaming_gen_buffer = 1 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + metrics.num_tasks_running = 2 + return metrics + + +@pytest.mark.parametrize( + "metrics_fixture,test_property,expected_calculator", + [ + ( + "metrics_config_no_sample_with_target", + "obj_store_mem_max_pending_output_per_task", + lambda m: ( + m._op.data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ( + "metrics_config_with_sample", + "obj_store_mem_max_pending_output_per_task", + lambda m: ( + m.average_bytes_per_output + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ( + "metrics_config_pending_outputs_no_sample", + "obj_store_mem_pending_task_outputs", + lambda m: ( + m.num_tasks_running + * m._op.data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ], +) +def test_obj_store_mem_estimation( + request, metrics_fixture, test_property, expected_calculator +): + """Test object store memory estimation for various scenarios.""" + metrics = request.getfixturevalue(metrics_fixture) + actual = getattr(metrics, test_property) + expected = expected_calculator(metrics) + + assert ( + actual == expected + ), f"Expected {test_property} to be {expected}, got {actual}" + + if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index d63dec3b2f10..168773a5032c 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -25,7 +25,7 @@ build_streaming_topology, ) from ray.data._internal.execution.util import make_ref_bundles -from ray.data.context import DataContext +from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext from ray.data.tests.conftest import * # noqa @@ -333,14 +333,19 @@ def test_object_store_usage(self, restore_data_context): assert resource_manager.get_op_usage(o2).object_store_memory == 0 assert resource_manager.get_op_usage(o3).object_store_memory == 0 - # Operators estimate pending task outputs using the target max block size. - # In this case, the target max block size is 2 and there is at most 1 block - # in the streaming generator buffer, so the estimated usage is 2. + # Operators estimate pending task outputs using the target max block size + # multiplied by MAX_SAFE_BLOCK_SIZE_FACTOR (1.5) during no-sample phase. + # In this case, the target max block size is 2, MAX_SAFE_BLOCK_SIZE_FACTOR + # is 1.5, and there is at most 1 block in the streaming generator buffer, + # so the estimated usage is 2 * 1.5 * 1 = 3. o2.metrics.on_input_dequeued(input) o2.metrics.on_task_submitted(0, input) resource_manager.update_usages() + # target_max_block_size * factor * max_blocks + expected_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1 assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 2 + op2_usage = resource_manager.get_op_usage(o2).object_store_memory + assert op2_usage == expected_usage assert resource_manager.get_op_usage(o3).object_store_memory == 0 # When the task finishes, we move the data from the streaming generator to the @@ -367,15 +372,19 @@ def test_object_store_usage(self, restore_data_context): assert resource_manager.get_op_usage(o2).object_store_memory == 1 assert resource_manager.get_op_usage(o3).object_store_memory == 0 - # Task inputs count toward the previous operator's object store memory usage, - # and task outputs count toward the current operator's object store memory - # usage. + # Task inputs count toward the previous operator's object store memory + # usage, and task outputs count toward the current operator's object + # store memory usage. During no-sample phase, pending outputs are + # estimated using target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR. o3.metrics.on_input_dequeued(input) o3.metrics.on_task_submitted(0, input) resource_manager.update_usages() assert resource_manager.get_op_usage(o1).object_store_memory == 0 assert resource_manager.get_op_usage(o2).object_store_memory == 1 - assert resource_manager.get_op_usage(o3).object_store_memory == 2 + # target_max_block_size (2) * factor (1.5) * max_blocks (1) = 3 + expected_o3_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1 + op3_usage = resource_manager.get_op_usage(o3).object_store_memory + assert op3_usage == expected_o3_usage # Task inputs no longer count once the task is finished. o3.metrics.on_output_queued(input)