diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
index 486842181127..86e077d9f48b 100644
--- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
+++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
@@ -401,6 +401,18 @@ def all_inputs_done(self):
                 "configuring `override_num_blocks` earlier in the pipeline."
             )
 
+    def clear_internal_input_queue(self) -> None:
+        """Clear internal input queues for the actor-pool map operator.
+
+        Delegates to the parent class first, then drains the local bundle queue
+        that stages input bundles for actors, reporting each dequeue to metrics.
+        """
+        super().clear_internal_input_queue()
+        # Pop until the queue is empty; each removed bundle is passed to metrics.
+        while self._bundle_queue.has_next():
+            bundle = self._bundle_queue.get_next()
+            self._metrics.on_input_dequeued(bundle)
+
     def _do_shutdown(self, force: bool = False):
         self._actor_pool.shutdown(force=force)
         # NOTE: It's critical for Actor Pool to release actors before calling into
diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py
index d2c628dbcad6..21604d948340 100644
--- a/python/ray/data/_internal/execution/operators/map_operator.py
+++ b/python/ray/data/_internal/execution/operators/map_operator.py
@@ -208,6 +208,7 @@ def internal_output_queue_num_bytes(self) -> int:
 
     def clear_internal_input_queue(self) -> None:
         """Clear internal input queue (block ref bundler)."""
+        self._block_ref_bundler.done_adding_bundles()
         while self._block_ref_bundler.has_bundle():
             (input_bundles, _) = self._block_ref_bundler.get_next_bundle()
             for input_bundle in input_bundles:
diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py
index 53da5248784e..daab96975441 100644
--- a/python/ray/data/tests/test_actor_pool_map_operator.py
+++ 
b/python/ray/data/tests/test_actor_pool_map_operator.py
@@ -8,6 +8,7 @@
 from typing import Any, Callable, Dict, Optional, Tuple
 from unittest.mock import MagicMock
 
+import pyarrow as pa
 import pytest
 from freezegun import freeze_time
 
@@ -39,7 +40,7 @@
     update_operator_states,
 )
 from ray.data._internal.execution.util import make_ref_bundles
-from ray.data.block import Block, BlockMetadata
+from ray.data.block import Block, BlockAccessor, BlockMetadata
 from ray.tests.conftest import *  # noqa
 from ray.types import ObjectRef
 
@@ -627,6 +628,38 @@ def test_setting_initial_size_for_actor_pool():
     ray.shutdown()
 
 
+def _create_bundle_with_single_row(row: Dict[str, Any]) -> RefBundle:
+    block = pa.Table.from_pylist([row])  # one-row Arrow table built from `row`
+    block_ref = ray.put(block)  # bundle holds a reference, not the table itself
+    metadata = BlockAccessor.for_block(block).get_metadata()
+    schema = BlockAccessor.for_block(block).schema()
+    return RefBundle([(block_ref, metadata)], owns_blocks=False, schema=schema)
+
+
+@pytest.mark.parametrize("min_rows_per_bundle", [2, None])
+def test_internal_input_queue_is_empty_after_early_completion(
+    ray_start_regular_shared, min_rows_per_bundle
+):
+    data_context = ray.data.DataContext.get_current()
+    op = ActorPoolMapOperator(
+        map_transformer=MagicMock(),
+        input_op=InputDataBuffer(data_context, input_data=MagicMock()),
+        data_context=data_context,
+        compute_strategy=ray.data.ActorPoolStrategy(size=1),
+        min_rows_per_bundle=min_rows_per_bundle,
+    )
+    op.start(ExecutionOptions())
+    # Add a single one-row bundle (below the threshold when min_rows_per_bundle=2).
+    ref_bundle = _create_bundle_with_single_row({"id": 0})
+    op.add_input(ref_bundle, 0)
+    # End execution early, right after the single input was added.
+    op.mark_execution_finished()
+    # No blocks may be left in the operator's internal input queue afterwards.
+    assert (
+        op.internal_input_queue_num_blocks() == 0
+    ), op.internal_input_queue_num_blocks()
+
+
 def test_min_max_resource_requirements(restore_data_context):
     data_context = ray.data.DataContext.get_current()
     op = ActorPoolMapOperator(