From 206470a124a811fc8317ecb6c2f50013a056007e Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Thu, 20 Nov 2025 23:31:44 +0000 Subject: [PATCH 1/3] [Data] Fix obj_store_mem_max_pending_output_per_task reporting Signed-off-by: Srinath Krishnamachari --- .../interfaces/op_runtime_metrics.py | 18 ++- .../ray/data/tests/test_op_runtime_metrics.py | 145 ++++++++++++++++++ .../ray/data/tests/test_resource_manager.py | 27 ++-- 3 files changed, 177 insertions(+), 13 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index cf1f881c826a..cee5005a011a 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -16,6 +16,10 @@ from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.data._internal.memory_tracing import trace_allocation from ray.data.block import BlockMetadata +from ray.data.context import ( + DEFAULT_TARGET_MAX_BLOCK_SIZE, + MAX_SAFE_BLOCK_SIZE_FACTOR, +) if TYPE_CHECKING: from ray.data._internal.execution.interfaces.physical_operator import ( @@ -679,12 +683,18 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]: return None bytes_per_output = self.average_bytes_per_output - # If we don’t have a sample yet and the limit is “unlimited”, we can’t - # estimate – just bail out. if bytes_per_output is None: + # If we don't have a sample yet, use target_max_block_size, but + # account for the fact that blocks can be up to + # MAX_SAFE_BLOCK_SIZE_FACTOR larger before being sliced. if context.target_max_block_size is None: - return None - bytes_per_output = context.target_max_block_size + bytes_per_output = ( + DEFAULT_TARGET_MAX_BLOCK_SIZE * MAX_SAFE_BLOCK_SIZE_FACTOR + ) + else: + bytes_per_output = ( + context.target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR + ) num_pending_outputs = context._max_num_blocks_in_streaming_gen_buffer if self.average_num_outputs_per_task is not None: diff --git a/python/ray/data/tests/test_op_runtime_metrics.py b/python/ray/data/tests/test_op_runtime_metrics.py index ecef3aa8afe5..85a7cf3d30f8 100644 --- a/python/ray/data/tests/test_op_runtime_metrics.py +++ b/python/ray/data/tests/test_op_runtime_metrics.py @@ -11,6 +11,11 @@ ) from ray.data._internal.util import KiB from ray.data.block import BlockExecStats, BlockMetadata +from ray.data.context import ( + DEFAULT_TARGET_MAX_BLOCK_SIZE, + MAX_SAFE_BLOCK_SIZE_FACTOR, + DataContext, +) def test_average_max_uss_per_task(): @@ -213,6 +218,146 @@ def create_bundle_with_rows(num_rows): metrics.block_size_rows._bucket_counts[expected_bucket] = 0 +@pytest.fixture +def metrics_config_no_sample_with_target(restore_data_context): # noqa: F811 + """Fixture for no-sample scenario with target_max_block_size set.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB + ctx._max_num_blocks_in_streaming_gen_buffer = 2 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + return metrics + + +@pytest.fixture +def metrics_config_no_sample_with_none(restore_data_context): # noqa: F811 + """Fixture for no-sample scenario with target_max_block_size=None.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = None + ctx._max_num_blocks_in_streaming_gen_buffer = 1 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + return metrics + + +@pytest.fixture +def metrics_config_with_sample(restore_data_context): # noqa: F811 + """Fixture for scenario with average_bytes_per_output available.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = 128 * 1024 * 1024 # 128MB + ctx._max_num_blocks_in_streaming_gen_buffer = 1 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + + # Simulate having samples: set bytes_task_outputs_generated and + # num_task_outputs_generated to make average_bytes_per_output available + actual_block_size = 150 * 1024 * 1024 # 150MB + metrics.bytes_task_outputs_generated = actual_block_size + metrics.num_task_outputs_generated = 1 + + return metrics + + +@pytest.fixture +def metrics_config_pending_outputs_no_sample( + restore_data_context, # noqa: F811 +): + """Fixture for pending outputs during no-sample with target set.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = 64 * 1024 * 1024 # 64MB + ctx._max_num_blocks_in_streaming_gen_buffer = 2 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + metrics.num_tasks_running = 3 + return metrics + + +@pytest.fixture +def metrics_config_pending_outputs_none(restore_data_context): # noqa: F811 + """Fixture for pending outputs during no-sample with target=None.""" + ctx = DataContext.get_current() + ctx.target_max_block_size = None + ctx._max_num_blocks_in_streaming_gen_buffer = 1 + + op = MagicMock() + op.data_context = ctx + metrics = OpRuntimeMetrics(op) + metrics.num_tasks_running = 2 + return metrics + + +@pytest.mark.parametrize( + "metrics_fixture,test_property,expected_calculator", + [ + ( + "metrics_config_no_sample_with_target", + "obj_store_mem_max_pending_output_per_task", + lambda m: ( + m._op.data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ( + "metrics_config_no_sample_with_none", + "obj_store_mem_max_pending_output_per_task", + lambda m: ( + DEFAULT_TARGET_MAX_BLOCK_SIZE + * MAX_SAFE_BLOCK_SIZE_FACTOR + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ( + "metrics_config_with_sample", + "obj_store_mem_max_pending_output_per_task", + lambda m: ( + m.average_bytes_per_output + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ( + "metrics_config_pending_outputs_no_sample", + "obj_store_mem_pending_task_outputs", + lambda m: ( + m.num_tasks_running + * m._op.data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ( + "metrics_config_pending_outputs_none", + "obj_store_mem_pending_task_outputs", + lambda m: ( + m.num_tasks_running + * DEFAULT_TARGET_MAX_BLOCK_SIZE + * MAX_SAFE_BLOCK_SIZE_FACTOR + * m._op.data_context._max_num_blocks_in_streaming_gen_buffer + ), + ), + ], +) +def test_obj_store_mem_estimation( + request, metrics_fixture, test_property, expected_calculator +): + """Test object store memory estimation for various scenarios.""" + metrics = request.getfixturevalue(metrics_fixture) + actual = getattr(metrics, test_property) + expected = expected_calculator(metrics) + + assert ( + actual == expected + ), f"Expected {test_property} to be {expected}, got {actual}" + + if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index d63dec3b2f10..168773a5032c 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -25,7 +25,7 @@ build_streaming_topology, ) from ray.data._internal.execution.util import make_ref_bundles -from ray.data.context import DataContext +from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext from ray.data.tests.conftest import * # noqa @@ -333,14 +333,19 @@ def test_object_store_usage(self, restore_data_context): assert resource_manager.get_op_usage(o2).object_store_memory == 0 assert resource_manager.get_op_usage(o3).object_store_memory == 0 - # Operators estimate pending task outputs using the target max block size. - # In this case, the target max block size is 2 and there is at most 1 block - # in the streaming generator buffer, so the estimated usage is 2. + # Operators estimate pending task outputs using the target max block size + # multiplied by MAX_SAFE_BLOCK_SIZE_FACTOR (1.5) during no-sample phase. + # In this case, the target max block size is 2, MAX_SAFE_BLOCK_SIZE_FACTOR + # is 1.5, and there is at most 1 block in the streaming generator buffer, + # so the estimated usage is 2 * 1.5 * 1 = 3. o2.metrics.on_input_dequeued(input) o2.metrics.on_task_submitted(0, input) resource_manager.update_usages() + # target_max_block_size * factor * max_blocks + expected_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1 assert resource_manager.get_op_usage(o1).object_store_memory == 0 - assert resource_manager.get_op_usage(o2).object_store_memory == 2 + op2_usage = resource_manager.get_op_usage(o2).object_store_memory + assert op2_usage == expected_usage assert resource_manager.get_op_usage(o3).object_store_memory == 0 # When the task finishes, we move the data from the streaming generator to the @@ -367,15 +372,19 @@ def test_object_store_usage(self, restore_data_context): assert resource_manager.get_op_usage(o2).object_store_memory == 1 assert resource_manager.get_op_usage(o3).object_store_memory == 0 - # Task inputs count toward the previous operator's object store memory usage, - # and task outputs count toward the current operator's object store memory - # usage. + # Task inputs count toward the previous operator's object store memory + # usage, and task outputs count toward the current operator's object + # store memory usage. During no-sample phase, pending outputs are + # estimated using target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR. o3.metrics.on_input_dequeued(input) o3.metrics.on_task_submitted(0, input) resource_manager.update_usages() assert resource_manager.get_op_usage(o1).object_store_memory == 0 assert resource_manager.get_op_usage(o2).object_store_memory == 1 - assert resource_manager.get_op_usage(o3).object_store_memory == 2 + # target_max_block_size (2) * factor (1.5) * max_blocks (1) = 3 + expected_o3_usage = 2 * MAX_SAFE_BLOCK_SIZE_FACTOR * 1 + op3_usage = resource_manager.get_op_usage(o3).object_store_memory + assert op3_usage == expected_o3_usage # Task inputs no longer count once the task is finished. o3.metrics.on_output_queued(input) From 97e57dd62fdbe4d448a17d57fddf0c4392abc46e Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Fri, 21 Nov 2025 01:44:58 +0000 Subject: [PATCH 2/3] Fixes Signed-off-by: Srinath Krishnamachari --- python/ray/data/tests/test_executor_resource_management.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_executor_resource_management.py b/python/ray/data/tests/test_executor_resource_management.py index f6abd4603b86..91f68092d235 100644 --- a/python/ray/data/tests/test_executor_resource_management.py +++ b/python/ray/data/tests/test_executor_resource_management.py @@ -9,7 +9,7 @@ from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.output_splitter import OutputSplitter from ray.data._internal.execution.util import make_ref_bundles -from ray.data.context import DataContext +from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext from ray.data.tests.conftest import * # noqa from ray.data.tests.test_operators import _mul2_map_data_prcessor from ray.data.tests.util import run_op_tasks_sync @@ -312,6 +312,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co inc_obj_store_mem = ( data_context._max_num_blocks_in_streaming_gen_buffer * data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR ) min_resource_usage, _ = op.min_max_resource_requirements() assert min_resource_usage == ExecutionResources( @@ -426,6 +427,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared): inc_obj_store_mem = ( data_context._max_num_blocks_in_streaming_gen_buffer * data_context.target_max_block_size + * MAX_SAFE_BLOCK_SIZE_FACTOR ) min_resource_usage, _ = op.min_max_resource_requirements() assert min_resource_usage == ExecutionResources( From 523bb08f7f23aee39aa47c6d9b2d21dfede01d59 Mon Sep 17 00:00:00 2001 From: Srinath Krishnamachari Date: Mon, 24 Nov 2025 01:05:57 +0000 Subject: [PATCH 3/3] Fixes Signed-off-by: Srinath Krishnamachari --- .../interfaces/op_runtime_metrics.py | 15 +++++--------- .../ray/data/tests/test_op_runtime_metrics.py | 20 ------------------- 2 files changed, 5 insertions(+), 30 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index cee5005a011a..939dbdb79192 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -16,10 +16,7 @@ from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.data._internal.memory_tracing import trace_allocation from ray.data.block import BlockMetadata -from ray.data.context import ( - DEFAULT_TARGET_MAX_BLOCK_SIZE, - MAX_SAFE_BLOCK_SIZE_FACTOR, -) +from ray.data.context import MAX_SAFE_BLOCK_SIZE_FACTOR if TYPE_CHECKING: from ray.data._internal.execution.interfaces.physical_operator import ( @@ -683,15 +680,13 @@ def obj_store_mem_max_pending_output_per_task(self) -> Optional[float]: return None bytes_per_output = self.average_bytes_per_output + # If we don’t have a sample yet and the limit is “unlimited”, we can’t + # estimate – just bail out. if bytes_per_output is None: - # If we don't have a sample yet, use target_max_block_size, but - # account for the fact that blocks can be up to - # MAX_SAFE_BLOCK_SIZE_FACTOR larger before being sliced. if context.target_max_block_size is None: - bytes_per_output = ( - DEFAULT_TARGET_MAX_BLOCK_SIZE * MAX_SAFE_BLOCK_SIZE_FACTOR - ) + return None else: + # Block size can be up to MAX_SAFE_BLOCK_SIZE_FACTOR larger before being sliced. bytes_per_output = ( context.target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR ) diff --git a/python/ray/data/tests/test_op_runtime_metrics.py b/python/ray/data/tests/test_op_runtime_metrics.py index 85a7cf3d30f8..7f35d8608b3c 100644 --- a/python/ray/data/tests/test_op_runtime_metrics.py +++ b/python/ray/data/tests/test_op_runtime_metrics.py @@ -12,7 +12,6 @@ from ray.data._internal.util import KiB from ray.data.block import BlockExecStats, BlockMetadata from ray.data.context import ( - DEFAULT_TARGET_MAX_BLOCK_SIZE, MAX_SAFE_BLOCK_SIZE_FACTOR, DataContext, ) @@ -306,15 +305,6 @@ def metrics_config_pending_outputs_none(restore_data_context): # noqa: F811 * m._op.data_context._max_num_blocks_in_streaming_gen_buffer ), ), - ( - "metrics_config_no_sample_with_none", - "obj_store_mem_max_pending_output_per_task", - lambda m: ( - DEFAULT_TARGET_MAX_BLOCK_SIZE - * MAX_SAFE_BLOCK_SIZE_FACTOR - * m._op.data_context._max_num_blocks_in_streaming_gen_buffer - ), - ), ( "metrics_config_with_sample", "obj_store_mem_max_pending_output_per_task", @@ -333,16 +323,6 @@ def metrics_config_pending_outputs_none(restore_data_context): # noqa: F811 * m._op.data_context._max_num_blocks_in_streaming_gen_buffer ), ), - ( - "metrics_config_pending_outputs_none", - "obj_store_mem_pending_task_outputs", - lambda m: ( - m.num_tasks_running - * DEFAULT_TARGET_MAX_BLOCK_SIZE - * MAX_SAFE_BLOCK_SIZE_FACTOR - * m._op.data_context._max_num_blocks_in_streaming_gen_buffer - ), - ), ], ) def test_obj_store_mem_estimation(