Skip to content

Commit 6a14c93

Browse files
400Ping and bveeramani authored
[Data][Flaky] pytorch_resnet_batch_prediction is flaky (#58694)
## Description

The test fails intermittently with an assertion error indicating that the internal input queue for a MapBatches operator is not empty when it is expected to be. This suggests a race condition or timing issue in the streaming executor's queue management.

## Related issues

Closes #58546

## Additional information

---------

Signed-off-by: 400Ping <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
1 parent 2fbb0bd commit 6a14c93

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,18 @@ def all_inputs_done(self):
401401
"configuring `override_num_blocks` earlier in the pipeline."
402402
)
403403

404+
def clear_internal_input_queue(self) -> None:
    """Drain every internal input queue held by the actor-pool map operator.

    The base class' internal queues are cleared first. Afterwards the local
    bundle queue (used to stage input bundles for actors) is emptied, and
    each bundle taken out of it is reported to the operator metrics through
    ``on_input_dequeued``.
    """
    super().clear_internal_input_queue()

    # Pop staged bundles one at a time so that each removal is reflected
    # in the metrics.
    while self._bundle_queue.has_next():
        self._metrics.on_input_dequeued(self._bundle_queue.get_next())
415+
404416
def _do_shutdown(self, force: bool = False):
405417
self._actor_pool.shutdown(force=force)
406418
# NOTE: It's critical for Actor Pool to release actors before calling into

python/ray/data/_internal/execution/operators/map_operator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def internal_output_queue_num_bytes(self) -> int:
208208

209209
def clear_internal_input_queue(self) -> None:
210210
"""Clear internal input queue (block ref bundler)."""
211+
self._block_ref_bundler.done_adding_bundles()
211212
while self._block_ref_bundler.has_bundle():
212213
(input_bundles, _) = self._block_ref_bundler.get_next_bundle()
213214
for input_bundle in input_bundles:

python/ray/data/tests/test_actor_pool_map_operator.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import Any, Callable, Dict, Optional, Tuple
99
from unittest.mock import MagicMock
1010

11+
import pyarrow as pa
1112
import pytest
1213
from freezegun import freeze_time
1314

@@ -39,7 +40,7 @@
3940
update_operator_states,
4041
)
4142
from ray.data._internal.execution.util import make_ref_bundles
42-
from ray.data.block import Block, BlockMetadata
43+
from ray.data.block import Block, BlockAccessor, BlockMetadata
4344
from ray.tests.conftest import * # noqa
4445
from ray.types import ObjectRef
4546

@@ -627,6 +628,38 @@ def test_setting_initial_size_for_actor_pool():
627628
ray.shutdown()
628629

629630

631+
def _create_bundle_with_single_row(row):
    """Return a ``RefBundle`` wrapping one Arrow block that contains only ``row``.

    The block is built with ``pa.Table.from_pylist`` and stored via
    ``ray.put``; its metadata and schema are read through ``BlockAccessor``.
    The bundle is created with ``owns_blocks=False``.
    """
    table = pa.Table.from_pylist([row])
    table_ref = ray.put(table)
    accessor = BlockAccessor.for_block(table)
    return RefBundle(
        [(table_ref, accessor.get_metadata())],
        owns_blocks=False,
        schema=accessor.schema(),
    )
637+
638+
639+
@pytest.mark.parametrize("min_rows_per_bundle", [2, None])
def test_internal_input_queue_is_empty_after_early_completion(
    ray_start_regular_shared, min_rows_per_bundle
):
    """Check the internal input queue is empty once execution is marked finished.

    A single one-row bundle is added to a started ``ActorPoolMapOperator``
    and execution is then marked finished; the operator must report zero
    blocks in its internal input queue afterwards.
    """
    ctx = ray.data.DataContext.get_current()
    op = ActorPoolMapOperator(
        map_transformer=MagicMock(),
        input_op=InputDataBuffer(ctx, input_data=MagicMock()),
        data_context=ctx,
        compute_strategy=ray.data.ActorPoolStrategy(size=1),
        min_rows_per_bundle=min_rows_per_bundle,
    )
    op.start(ExecutionOptions())

    # Feed exactly one single-row bundle, then finish before it is consumed.
    op.add_input(_create_bundle_with_single_row({"id": 0}), 0)
    op.mark_execution_finished()

    remaining_blocks = op.internal_input_queue_num_blocks()
    assert remaining_blocks == 0, remaining_blocks
661+
662+
630663
def test_min_max_resource_requirements(restore_data_context):
631664
data_context = ray.data.DataContext.get_current()
632665
op = ActorPoolMapOperator(

0 commit comments

Comments
 (0)