Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,18 @@ def all_inputs_done(self):
"configuring `override_num_blocks` earlier in the pipeline."
)

def clear_internal_input_queue(self) -> None:
"""Clear internal input queues for the actor-pool map operator.

In addition to clearing the base class' internal queues, this method clears
the local bundle queue used to stage input bundles for actors.
"""
super().clear_internal_input_queue()

while self._bundle_queue.has_next():
bundle = self._bundle_queue.get_next()
self._metrics.on_input_dequeued(bundle)

def _do_shutdown(self, force: bool = False):
self._actor_pool.shutdown(force=force)
# NOTE: It's critical for Actor Pool to release actors before calling into
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def internal_output_queue_num_bytes(self) -> int:

def clear_internal_input_queue(self) -> None:
"""Clear internal input queue (block ref bundler)."""
self._block_ref_bundler.done_adding_bundles()
while self._block_ref_bundler.has_bundle():
(input_bundles, _) = self._block_ref_bundler.get_next_bundle()
for input_bundle in input_bundles:
Expand Down
35 changes: 34 additions & 1 deletion python/ray/data/tests/test_actor_pool_map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Any, Callable, Dict, Optional, Tuple
from unittest.mock import MagicMock

import pyarrow as pa
import pytest
from freezegun import freeze_time

Expand Down Expand Up @@ -39,7 +40,7 @@
update_operator_states,
)
from ray.data._internal.execution.util import make_ref_bundles
from ray.data.block import Block, BlockMetadata
from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.tests.conftest import * # noqa
from ray.types import ObjectRef

Expand Down Expand Up @@ -627,6 +628,38 @@ def test_setting_initial_size_for_actor_pool():
ray.shutdown()


def _create_bundle_with_single_row(row):
block = pa.Table.from_pylist([row])
block_ref = ray.put(block)
metadata = BlockAccessor.for_block(block).get_metadata()
schema = BlockAccessor.for_block(block).schema()
return RefBundle([(block_ref, metadata)], owns_blocks=False, schema=schema)


@pytest.mark.parametrize("min_rows_per_bundle", [2, None])
def test_internal_input_queue_is_empty_after_early_completion(
ray_start_regular_shared, min_rows_per_bundle
):
data_context = ray.data.DataContext.get_current()
op = ActorPoolMapOperator(
map_transformer=MagicMock(),
input_op=InputDataBuffer(data_context, input_data=MagicMock()),
data_context=data_context,
compute_strategy=ray.data.ActorPoolStrategy(size=1),
min_rows_per_bundle=min_rows_per_bundle,
)
op.start(ExecutionOptions())

ref_bundle = _create_bundle_with_single_row({"id": 0})
op.add_input(ref_bundle, 0)

op.mark_execution_finished()

assert (
op.internal_input_queue_num_blocks() == 0
), op.internal_input_queue_num_blocks()


def test_min_max_resource_requirements(restore_data_context):
data_context = ray.data.DataContext.get_current()
op = ActorPoolMapOperator(
Expand Down