From 04c6b1df0c09f8dc3a57d5e1c703842a21df11c4 Mon Sep 17 00:00:00 2001 From: 400Ping Date: Wed, 26 Nov 2025 14:23:39 +0800 Subject: [PATCH 1/4] fix Signed-off-by: 400Ping --- .../operators/actor_pool_map_operator.py | 22 +++++++++++++++++++ .../execution/operators/map_operator.py | 13 ----------- .../operators/task_pool_map_operator.py | 13 +++++++++++ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 486842181127..1700fee5c920 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -401,6 +401,28 @@ def all_inputs_done(self): "configuring `override_num_blocks` earlier in the pipeline." ) + def clear_internal_input_queue(self) -> None: + """Clear internal input queues for the actor-pool map operator. + + This includes: + * The local bundle queue used to stage input bundles for actors. + * The shared block ref bundler inherited from MapOperator. + """ + while self._bundle_queue.has_next(): + bundle = self._bundle_queue.get_next() + self._metrics.on_input_dequeued(bundle) + + while self._block_ref_bundler.has_bundle(): + input_bundles, _ = self._block_ref_bundler.get_next_bundle() + for input_bundle in input_bundles: + self._metrics.on_input_dequeued(input_bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue for the actor-pool map operator.""" + while self._output_queue.has_next(): + bundle = self._output_queue.get_next() + self._metrics.on_output_dequeued(bundle) + def _do_shutdown(self, force: bool = False): self._actor_pool.shutdown(force=force) # NOTE: It's critical for Actor Pool to release actors before calling into diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index d2c628dbcad6..d12d7f7a611c 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -206,19 +206,6 @@ def internal_output_queue_num_blocks(self) -> int: def internal_output_queue_num_bytes(self) -> int: return self._output_queue.size_bytes() - def clear_internal_input_queue(self) -> None: - """Clear internal input queue (block ref bundler).""" - while self._block_ref_bundler.has_bundle(): - (input_bundles, _) = self._block_ref_bundler.get_next_bundle() - for input_bundle in input_bundles: - self._metrics.on_input_dequeued(input_bundle) - - def clear_internal_output_queue(self) -> None: - """Clear internal output queue.""" - while self._output_queue.has_next(): - bundle = self._output_queue.get_next() - self._metrics.on_output_dequeued(bundle) - @property def name(self) -> str: name = super().name diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index 623e4b859ba2..ccafc07b4ac6 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -175,3 +175,16 @@ def all_inputs_done(self): "to increase the number of concurrent tasks by configuring " "`override_num_blocks` earlier in the pipeline." ) + + def clear_internal_input_queue(self) -> None: + """Clear internal input queue (block ref bundler).""" + while self._block_ref_bundler.has_bundle(): + input_bundles, _ = self._block_ref_bundler.get_next_bundle() + for input_bundle in input_bundles: + self._metrics.on_input_dequeued(input_bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue.""" + while self._output_queue.has_next(): + bundle = self._output_queue.get_next() + self._metrics.on_output_dequeued(bundle) From e3a8274f086463d8876dee72655c7895f1d1192f Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 25 Nov 2025 23:14:45 -0800 Subject: [PATCH 2/4] Fix bug Signed-off-by: Balaji Veeramani --- .../operators/actor_pool_map_operator.py | 18 +++------- .../execution/operators/map_operator.py | 14 ++++++++ .../operators/task_pool_map_operator.py | 13 -------- .../tests/test_actor_pool_map_operator.py | 33 ++++++++++++++++++- 4 files changed, 50 insertions(+), 28 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 1700fee5c920..86e077d9f48b 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -404,25 +404,15 @@ def all_inputs_done(self): def clear_internal_input_queue(self) -> None: """Clear internal input queues for the actor-pool map operator. - This includes: - * The local bundle queue used to stage input bundles for actors. - * The shared block ref bundler inherited from MapOperator. + In addition to clearing the base class' internal queues, this method clears + the local bundle queue used to stage input bundles for actors. """ + super().clear_internal_input_queue() + while self._bundle_queue.has_next(): bundle = self._bundle_queue.get_next() self._metrics.on_input_dequeued(bundle) - while self._block_ref_bundler.has_bundle(): - input_bundles, _ = self._block_ref_bundler.get_next_bundle() - for input_bundle in input_bundles: - self._metrics.on_input_dequeued(input_bundle) - - def clear_internal_output_queue(self) -> None: - """Clear internal output queue for the actor-pool map operator.""" - while self._output_queue.has_next(): - bundle = self._output_queue.get_next() - self._metrics.on_output_dequeued(bundle) - def _do_shutdown(self, force: bool = False): self._actor_pool.shutdown(force=force) # NOTE: It's critical for Actor Pool to release actors before calling into diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index d12d7f7a611c..d590d9fece6e 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -213,6 +213,20 @@ def name(self) -> str: name += f"->SplitBlocks({self._additional_split_factor})" return name + def clear_internal_input_queue(self) -> None: + """Clear internal input queue (block ref bundler).""" + self._block_ref_bundler.done_adding_bundles() + while self._block_ref_bundler.has_bundle(): + (input_bundles, _) = self._block_ref_bundler.get_next_bundle() + for input_bundle in input_bundles: + self._metrics.on_input_dequeued(input_bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue.""" + while self._output_queue.has_next(): + bundle = self._output_queue.get_next() + self._metrics.on_output_dequeued(bundle) + @classmethod def create( cls, diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index ccafc07b4ac6..623e4b859ba2 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -175,16 +175,3 @@ def all_inputs_done(self): "to increase the number of concurrent tasks by configuring " "`override_num_blocks` earlier in the pipeline." ) - - def clear_internal_input_queue(self) -> None: - """Clear internal input queue (block ref bundler).""" - while self._block_ref_bundler.has_bundle(): - input_bundles, _ = self._block_ref_bundler.get_next_bundle() - for input_bundle in input_bundles: - self._metrics.on_input_dequeued(input_bundle) - - def clear_internal_output_queue(self) -> None: - """Clear internal output queue.""" - while self._output_queue.has_next(): - bundle = self._output_queue.get_next() - self._metrics.on_output_dequeued(bundle) diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py index 53da5248784e..c451d60fb042 100644 --- a/python/ray/data/tests/test_actor_pool_map_operator.py +++ b/python/ray/data/tests/test_actor_pool_map_operator.py @@ -8,6 +8,7 @@ from typing import Any, Callable, Dict, Optional, Tuple from unittest.mock import MagicMock +import pyarrow as pa import pytest from freezegun import freeze_time @@ -39,7 +40,7 @@ update_operator_states, ) from ray.data._internal.execution.util import make_ref_bundles -from ray.data.block import Block, BlockMetadata +from ray.data.block import Block, BlockAccessor, BlockMetadata from ray.tests.conftest import * # noqa from ray.types import ObjectRef @@ -627,6 +628,36 @@ def test_setting_initial_size_for_actor_pool(): ray.shutdown() +def _create_bundle_with_single_row(row): + block = pa.Table.from_pylist([row]) + block_ref = ray.put(block) + metadata = BlockAccessor.for_block(block).get_metadata() + schema = BlockAccessor.for_block(block).schema() + return RefBundle([(block_ref, metadata)], owns_blocks=False, schema=schema) + + +@pytest.mark.parametrize("min_rows_per_bundle", [2, None]) +def test_internal_input_queue_is_empty_after_early_completion(min_rows_per_bundle): + data_context = ray.data.DataContext.get_current() + op = ActorPoolMapOperator( + map_transformer=MagicMock(), + input_op=InputDataBuffer(data_context, input_data=MagicMock()), + data_context=data_context, + compute_strategy=ray.data.ActorPoolStrategy(size=1), + min_rows_per_bundle=min_rows_per_bundle, + ) + op.start(ExecutionOptions()) + + ref_bundle = _create_bundle_with_single_row({"id": 0}) + op.add_input(ref_bundle, 0) + + op.mark_execution_finished() + + assert ( + op.internal_input_queue_num_blocks() == 0 + ), op.internal_input_queue_num_blocks() + + def test_min_max_resource_requirements(restore_data_context): data_context = ray.data.DataContext.get_current() op = ActorPoolMapOperator( From a1fbd0d63f2d4a5693ae9fbf8324cda17df3f68b Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 25 Nov 2025 23:16:02 -0800 Subject: [PATCH 3/4] Fix unexpected diff Signed-off-by: Balaji Veeramani --- .../_internal/execution/operators/map_operator.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index d590d9fece6e..21604d948340 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -206,13 +206,6 @@ def internal_output_queue_num_blocks(self) -> int: def internal_output_queue_num_bytes(self) -> int: return self._output_queue.size_bytes() - @property - def name(self) -> str: - name = super().name - if self._additional_split_factor is not None: - name += f"->SplitBlocks({self._additional_split_factor})" - return name - def clear_internal_input_queue(self) -> None: """Clear internal input queue (block ref bundler).""" self._block_ref_bundler.done_adding_bundles() @@ -227,6 +220,13 @@ def clear_internal_output_queue(self) -> None: bundle = self._output_queue.get_next() self._metrics.on_output_dequeued(bundle) + @property + def name(self) -> str: + name = super().name + if self._additional_split_factor is not None: + name += f"->SplitBlocks({self._additional_split_factor})" + return name + @classmethod def create( cls, From c2c27df8c4855baa8d3924edb8e0250632229fdc Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 26 Nov 2025 00:26:31 -0800 Subject: [PATCH 4/4] Fix test Signed-off-by: Balaji Veeramani --- python/ray/data/tests/test_actor_pool_map_operator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py index c451d60fb042..daab96975441 100644 --- a/python/ray/data/tests/test_actor_pool_map_operator.py +++ b/python/ray/data/tests/test_actor_pool_map_operator.py @@ -637,7 +637,9 @@ def _create_bundle_with_single_row(row): @pytest.mark.parametrize("min_rows_per_bundle", [2, None]) -def test_internal_input_queue_is_empty_after_early_completion(min_rows_per_bundle): +def test_internal_input_queue_is_empty_after_early_completion( + ray_start_regular_shared, min_rows_per_bundle +): data_context = ray.data.DataContext.get_current() op = ActorPoolMapOperator( map_transformer=MagicMock(),