Conversation

@400Ping
Contributor

@400Ping 400Ping commented Nov 17, 2025

Description

One of the PyTorch examples intermittently fails with an error like this:

AssertionError: Expected Internal Input Queue for MapBatches(ResnetModel) to be empty, but found 1 bundles

This assertion can fail because of two bugs:

  1. We don't call done_adding_bundles before clearing the block ref bundler
  2. We don't clear the actor pool's bundle queue

This PR fixes those bugs and deflakes the test.

Related issues

Closes #58546

Additional information

@400Ping 400Ping requested a review from a team as a code owner November 17, 2025 04:31
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request addresses a flaky test by fixing a race condition in ActorPoolMapOperator. The change adds a call to _dispatch_tasks() in all_inputs_done() to ensure any queued bundles are processed when no more inputs are expected. This is a solid fix for the described problem. I've added one minor suggestion to clean up some redundant code in the same method for improved maintainability.
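
For reference, a minimal sketch of the change described here, with the base class stubbed so the snippet runs standalone. The method names (all_inputs_done, _dispatch_tasks) are taken from this comment; this is not the exact diff.

    # Sketch only, not the actual Ray Data code; MapOperator is a stand-in stub.
    class MapOperator:
        def all_inputs_done(self):
            pass

        def _dispatch_tasks(self):
            pass


    class ActorPoolMapOperator(MapOperator):
        def all_inputs_done(self):
            super().all_inputs_done()
            # No more inputs will arrive, so flush any queued bundles now and
            # leave the internal input queue empty when the operator finishes.
            self._dispatch_tasks()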

@400Ping
Contributor Author

400Ping commented Nov 17, 2025

cc @owenowenisme

@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Nov 17, 2025
@iamjustinhsu
Contributor

Hi @400Ping, thanks for your contribution! We really appreciate the effort.

I think @owenowenisme might be right here; all_inputs_done should already be called. My hunch is that the BlockRefBundler class is not correctly finalizing its outputs. From here:

            if (
                output_buffer_size < self._min_rows_per_bundle
                or output_buffer_size == 0
            ):

it looks like we should be doing

            if (
                output_buffer_size < self._min_rows_per_bundle
                or output_buffer_size == 0
                or self._finalized
            ):

so that there are no more remainders. Can you try that and report back?
cc: @bveeramani
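
For context, here is a rough reconstruction of the surrounding loop from the fragments quoted in this thread (the helper name, output_buffer, and the return value are assumed; this is not the actual BlockRefBundler source). It shows how the suggested self._finalized condition, together with the break discussed further down, would keep trailing bundles out of the remainder once the bundler is finalized:

    # Reconstruction from fragments in this thread, not the real source.
    def _split_bundle_buffer(self):  # hypothetical helper name
        remainder = []
        output_buffer = []
        output_buffer_size = 0
        for idx, bundle in enumerate(self._bundle_buffer):
            bundle_size = bundle.num_rows()
            if (
                output_buffer_size < self._min_rows_per_bundle
                or output_buffer_size == 0
                or self._finalized  # suggested: once finalized, hold nothing back
            ):
                output_buffer.append(bundle)
                output_buffer_size += bundle_size
            else:
                # Bundles from idx onward stay buffered for the next output.
                remainder = self._bundle_buffer[idx:]
                break  # suggested: stop so `remainder` is not reassigned
        return output_buffer, remainder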

@bveeramani
Member

Hey @400Ping, would you mind helping me understand the root cause of the assertion error?

Also, did the repro script fail before the changes?

@400Ping
Contributor Author

400Ping commented Nov 19, 2025

> Hey @400Ping, would you mind helping me understand the root cause of the assertion error?
>
> Also, did the repro script fail before the changes?

Ok, will try to find it.

@iamjustinhsu iamjustinhsu self-requested a review November 19, 2025 19:06
@bveeramani bveeramani self-assigned this Nov 19, 2025
                output_buffer_size += bundle_size
            else:
                remainder = self._bundle_buffer[idx:]
                break
Contributor


@bveeramani it feels like this break should have been there from the beginning, right?

@400Ping 400Ping requested a review from iamjustinhsu November 22, 2025 03:36
@400Ping
Contributor Author

400Ping commented Nov 23, 2025

cc @bveeramani PTAL

@400Ping
Contributor Author

400Ping commented Nov 23, 2025

Not very sure if this is the right way to solve this.

            if (
                output_buffer_size < self._min_rows_per_bundle
                or output_buffer_size == 0
                or self._finalized
Contributor

@iamjustinhsu iamjustinhsu Nov 24, 2025


@bveeramani my impression is that when self._finalized=True (i.e., when an operator is completed()), it is possible for this for loop to enter the else branch below, populating remainder with non-empty ref bundles.

I also think the break statement is necessary; otherwise, remainder keeps getting reassigned.

@bveeramani
Member

@400Ping @iamjustinhsu I was able to create a minimal repro of this issue (at least, I think this is the same issue)

import ray


class Fn:

    def __call__(self, batch):
        return batch


ds = ray.data.range(100, override_num_blocks=100).map_batches(Fn, batch_size=10).limit(1)
for _ in ds.iter_internal_ref_bundles():
    pass
Traceback (most recent call last):                                                                                                                                      
  File "/Users/balaji/ray/1.py", line 12, in <module>
    for _ in ds.iter_internal_ref_bundles():                                                                                                                            
  File "/Users/balaji/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
    return self.get_next()
  File "/Users/balaji/ray/python/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 786, in get_next
    bundle = state.get_output_blocking(output_split_idx)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in get_output_blocking
    raise self._exception
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 356, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 529, in _scheduling_loop_step
    self._validate_operator_queues_empty(op, state)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 571, in _validate_operator_queues_empty
    assert op.internal_input_queue_num_blocks() == 0, error_msg.format(
AssertionError: Expected Internal Input Queue for MapBatches(Fn) to be empty, but found 8 bundles

@iamjustinhsu would a reasonable fix be to just clear the internal queues when the operator is manually marked finished?

    def mark_execution_finished(self):
        # Discard remaining bundles in the internal bundle queue.
        self._bundle_queue.clear()

        # Discard remaining bundles in the block ref bundler.
        self._block_ref_bundler.done_adding_bundles()
        while self._block_ref_bundler.has_bundle():
            self._block_ref_bundler.get_next_bundle()

        super().mark_execution_finished()

@iamjustinhsu
Contributor

@bveeramani oh, I see now. We actually do this for all InternalQueueOperatorMixin operators. However, I implemented this for MapOperator, thinking ActorPoolMapOperator uses the same underlying implementation. It actually doesn't; it contains an additional queue called _bundle_queue. Here is my PR for reference: #58441. What we should do is also clear self._bundle_queue in ActorPoolMapOperator. This can be done by having separate implementations for TaskPoolMapOperator and ActorPoolMapOperator.

@bveeramani
Member

Ah, okay.

@400Ping would you mind refactoring the PR or opening a new PR to do the following:

  • Delete MapOperator.clear_internal_output_queue and MapOperator.clear_internal_input_queue
  • Add TaskPoolMapOperator.clear_internal_output_queue and TaskPoolMapOperator.clear_internal_input_queue
  • Add ActorPoolMapOperator.clear_internal_output_queue and ActorPoolMapOperator.clear_internal_input_queue

I think this is all that we need to do to solve the flaky issue.
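
For illustration, a rough sketch of the per-operator split (class, attribute, and method names come from this thread, the base classes are elided, and only the input-queue side shown above is sketched; the real implementations may differ):

    # Sketch only, not the actual diff; base classes are elided.
    class TaskPoolMapOperator:
        def clear_internal_input_queue(self) -> None:
            # Task-pool operators only buffer inputs in the block ref bundler.
            self._block_ref_bundler.done_adding_bundles()
            while self._block_ref_bundler.has_bundle():
                self._block_ref_bundler.get_next_bundle()


    class ActorPoolMapOperator:
        def clear_internal_input_queue(self) -> None:
            # Actor-pool operators also keep their own _bundle_queue, which
            # must be drained as well.
            self._bundle_queue.clear()
            self._block_ref_bundler.done_adding_bundles()
            while self._block_ref_bundler.has_bundle():
                self._block_ref_bundler.get_next_bundle()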

Member

@bveeramani bveeramani left a comment


The implementations for clear_internal_input_queue and clear_internal_output_queue LGTM.

@400Ping to keep the git history clear, would you mind reverting all of the changes unrelated to those methods (e.g., formatting, removing _inputs_done, type annotations)? Once we've reverted the unrelated changes, I'll approve the PR

@400Ping
Contributor Author

400Ping commented Nov 26, 2025

> The implementations for clear_internal_input_queue and clear_internal_output_queue LGTM.
>
> @400Ping to keep the git history clear, would you mind reverting all of the changes unrelated to those methods (e.g., formatting, removing _inputs_done, type annotations)? Once we've reverted the unrelated changes, I'll approve the PR

Ok.

Signed-off-by: 400Ping <[email protected]>
@400Ping 400Ping force-pushed the data/fix-pytorch_resnet_batch_prediction-flaky branch from f47460e to 04c6b1d on November 26, 2025 06:39
@400Ping 400Ping requested a review from bveeramani November 26, 2025 06:41
Member

@bveeramani bveeramani left a comment


LGTM! 🚢

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
@bveeramani bveeramani added the go add ONLY when ready to merge, run all tests label Nov 26, 2025
Signed-off-by: Balaji Veeramani <[email protected]>
@400Ping
Contributor Author

400Ping commented Nov 26, 2025

Thanks for the fix!

@bveeramani bveeramani merged commit 6a14c93 into ray-project:master Nov 26, 2025
6 checks passed
@bveeramani
Member

@400Ping Just merged! ty for the contribution!

@400Ping
Contributor Author

400Ping commented Nov 26, 2025

> @400Ping Just merged! ty for the contribution!

Thank you as well; I'm a newbie in this area 😓.

@bveeramani bveeramani changed the title [Data][Flaky] pytorch_resnet_batch_prediction is flaky [Data][Flaky] Ensure ActorPoolMapOperator clears all queues on completion Nov 26, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
…8694)

## Description
The test fails intermittently with an assertion error indicating that
the internal input queue for a MapBatches operator is not empty when
it's expected to be. This suggests a race condition or timing issue in
the streaming executor's queue management.

## Related issues
Closes ray-project#58546 

## Additional information

---------

Signed-off-by: 400Ping <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>