Skip to content

Conversation

@iamjustinhsu
Copy link
Contributor

@iamjustinhsu iamjustinhsu commented Oct 1, 2025

Before:
https://github.com/user-attachments/assets/9db00f37-0c37-4e99-874a-a14481878e4a
In before, the progress bar won't update until the first tasks finishes.

~~After:
https://github.com/user-attachments/assets/99877a3f-7b52-4293-aae5-7702edfaabec

In After, the progress bar won't update until the first task generates output. If a task generates 10 blocks, we will update the progress bar while it's generating blocks, even if the task hasn't finished. Once the task finishes, we default back to the way it was before.

This is better because the very 1st progress bar update will occur sooner, and won't feel abrupt to the user.

Refractoring the progress bar estimates using known metrics.

Why are these changes needed?

Currently we use number of finished tasks. This is OK, but since we use streaming geneator, 1 task = thousands of blocks. This is troublesome for additional split factor (split blocks) in read parquet

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run pre-commit jobs to lint the changes in this PR. (pre-commit setup)
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

object to use injestion.
input_data: The list of bundles to output from this operator.
input_data_factory: The factory to get input data, if input_data is None.
num_output_blocks: The number of output blocks. If not specified, progress
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not being used anywhere

@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Oct 1, 2025
@iamjustinhsu iamjustinhsu marked this pull request as ready for review October 9, 2025 21:19
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner October 9, 2025 21:19
cursor[bot]

This comment was marked as outdated.

Signed-off-by: iamjustinhsu <[email protected]>
@iamjustinhsu iamjustinhsu marked this pull request as draft October 9, 2025 21:45
Signed-off-by: iamjustinhsu <[email protected]>
@iamjustinhsu iamjustinhsu marked this pull request as ready for review October 9, 2025 21:58
cursor[bot]

This comment was marked as outdated.

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Oct 10, 2025
Signed-off-by: iamjustinhsu <[email protected]>
Signed-off-by: iamjustinhsu <[email protected]>
cursor[bot]

This comment was marked as outdated.

estimated_output_num_rows = round(
estimated_num_tasks
* metrics.rows_task_outputs_generated
* metrics.rows_outputs_of_finished_tasks
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by

Copy link
Member

@bveeramani bveeramani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iamjustinhsu I felt confused after reading the PR description about what the change is, and why it's an improvement. Could you update the description to make it clearer?

Is this understanding correct?

  • Before: Ray Data doesn't render a progress bar if no tasks have finished because it can't estimate how many rows each task will produce, and therefore can't estimate the total number of rows the operator will produce.
  • After: Ray Data uses the number of rows already outputted as an estimate of the total number of rows that will be produced by the operator.

And this is better because it appears smoother (?)

@bveeramani
Copy link
Member

@iamjustinhsu what's the code you used for the example recordings?

@iamjustinhsu
Copy link
Contributor Author

iamjustinhsu commented Oct 16, 2025

The code is above the sample recordings:

import time

def random_blocks(x):
    bits_to_allocate = 1 * 1024 * 1024
    for i in range(5):
        time.sleep(1)
        yield {'item': [0] * bits_to_allocate} # After the first yield, it the progress bar will update

ray.data.DataContext.get_current().target_max_block_size = 1 * 1024 * 1024
ray.data.range(100, override_num_blocks=2).map_batches(random_blocks, concurrency=1).materialize()

@bveeramani
Copy link
Member

Gotcha.

I'm worried that some users might feel confused if they see the progress bar stuck at 100% for a while before it abruptly updates to <100%. Might be clearer to show no progress bar than an inaccurate one, though I can understand the argument that always showing a progress bar might look nicer.

@alexeykudinkin what's your take?

@iamjustinhsu
Copy link
Contributor Author

@bveeramani for more context, this was mainly annoying because of ReadParquet->SplitBlocks(N), because you don't get any updates until after the 1st task finishes. If N is large (which I have seen to be in the several hundreds), nothing is updating since SplitBlocks is in a single task.

@bveeramani
Copy link
Member

@bveeramani for more context, this was mainly annoying because of ReadParquet->SplitBlocks(N), because you don't get any updates until after the 1st task finishes. If N is large (which I have seen to be in the several hundreds), nothing is updating since SplitBlocks is in a single task.

Yeah, this can happen whenever tasks produce multiple outputs, which can be especially common with something like split blocks

Comment on lines 420 to 425
self._estimated_num_output_bundles = (
self._metrics.num_task_outputs_generated
)
self._estimated_output_num_rows = (
self._metrics.rows_task_outputs_generated
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hold on, this is what we estimate total # of rows/bundles to be not what has been gen'd so far, right?

Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_estimated_num_output_bundles and _estimated_output_num_rows estimate the total number. But as a crude heuristic, I wanted these to update sooner, rather than later. These variables are mainly used for user facing progress indicators (like in progress bar). After the 1st task finishes, the progress bar will behave as before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But these values don't make sense, right?

As reader of the code i'm scratching my head as we're now messing up their semantic

Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i understand, if the name of variable is named estimation, then semantics imply that it's not exact but a guess. Whether that guess is wrong, is also intrinsic for the current estimation. IMO a crude estimation is still better than None. Incidentally, I had originally thought to do something like this:

if self._metrics.num_tasks_finished == 0:
      estimated_num_tasks = (
            self.upstream_op_num_outputs
            / metrics.num_inputs_received
            * num_tasks_submitted
      )
      ratio = estimated_num_tasks / self._metrics.num_tasks_running
      self._estimated_num_output_bundles = (
                self._metrics.num_task_outputs_generated * ratio
      )
      self._estimated_output_num_rows = (
            self._metrics.rows_task_outputs_generated * ratio
     )

but at this point the estimation complexity outweighs it's temporal use when num_tasks_finished = 0.

Regardless, I'm gonna change the intent of the PR to refractor stuff instead, because there are still stuff that needs to change in this PR.

Signed-off-by: iamjustinhsu <[email protected]>
Signed-off-by: iamjustinhsu <[email protected]>
@iamjustinhsu iamjustinhsu changed the title [data] Refine estimate for total_num_rows in progress bars [data] Refractor progress bar clearer metrics Oct 23, 2025
Comment on lines +836 to +843
upstream_op_num_outputs / metrics.average_num_inputs_per_task
)

estimated_num_output_bundles = round(
estimated_num_tasks
* metrics.num_outputs_of_finished_tasks
/ metrics.num_tasks_finished
estimated_num_tasks * metrics.average_num_outputs_per_task
)
estimated_output_num_rows = round(
estimated_num_tasks
* metrics.rows_task_outputs_generated
/ metrics.num_tasks_finished
estimated_num_tasks * metrics.average_rows_outputs_per_task
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these metrics are None, this code will raise a TypeError:

  • average_num_inputs_per_task
  • average_num_outputs_per_task
  • average_rows_outputs_per_task

This can't happen right now because we check num_tasks_finished > 0, and the three metrics aren't None when num_tasks_finished > 0. But, that's an implementation detail, and isn't guaranteed by their interface.

This code would be more robust if replaced this check:

        and metrics.num_inputs_received > 0
        and metrics.num_tasks_finished > 0

With explicit checks that the metrics aren't None.

Signed-off-by: iamjustinhsu <[email protected]>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/use-outputs-generated-for-pg branch from c85111d to 37bd144 Compare October 23, 2025 22:11
@alexeykudinkin alexeykudinkin merged commit 77a96d9 into ray-project:master Oct 24, 2025
6 checks passed
@iamjustinhsu iamjustinhsu deleted the jhsu/use-outputs-generated-for-pg branch October 24, 2025 19:40
xinyuangui2 pushed a commit to xinyuangui2/ray that referenced this pull request Oct 27, 2025
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

~~Before:~~

~~https://github.com/user-attachments/assets/9db00f37-0c37-4e99-874a-a14481878e4a~~
~~In before, the progress bar won't update until the first tasks
finishes.~~

~~After:

~~https://github.com/user-attachments/assets/99877a3f-7b52-4293-aae5-7702edfaabec~~

~~In After, the progress bar won't update until the first task generates
output. If a task generates 10 blocks, we will update the progress bar
while it's generating blocks, even if the task hasn't finished. Once the
task finishes, we default back to the way it was before.~~

~~This is better because the very 1st progress bar update will occur
sooner, and won't feel abrupt to the user.~~

Refractoring the progress bar estimates using known metrics.

## Why are these changes needed?
Currently we use number of finished tasks. This is OK, but since we use
streaming geneator, 1 task = thousands of blocks. This is troublesome
for additional split factor (split blocks) in read parquet
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR.
([pre-commit
setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: iamjustinhsu <[email protected]>
Signed-off-by: xgui <[email protected]>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

~~Before:~~

~~https://github.com/user-attachments/assets/9db00f37-0c37-4e99-874a-a14481878e4a~~
~~In before, the progress bar won't update until the first tasks
finishes.~~

~~After:

~~https://github.com/user-attachments/assets/99877a3f-7b52-4293-aae5-7702edfaabec~~

~~In After, the progress bar won't update until the first task generates
output. If a task generates 10 blocks, we will update the progress bar
while it's generating blocks, even if the task hasn't finished. Once the
task finishes, we default back to the way it was before.~~

~~This is better because the very 1st progress bar update will occur
sooner, and won't feel abrupt to the user.~~

Refractoring the progress bar estimates using known metrics.

## Why are these changes needed?
Currently we use number of finished tasks. This is OK, but since we use
streaming geneator, 1 task = thousands of blocks. This is troublesome
for additional split factor (split blocks) in read parquet
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR.
([pre-commit
setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: iamjustinhsu <[email protected]>
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

~~Before:~~

~~https://github.com/user-attachments/assets/9db00f37-0c37-4e99-874a-a14481878e4a~~
~~In before, the progress bar won't update until the first tasks
finishes.~~

~~After:

~~https://github.com/user-attachments/assets/99877a3f-7b52-4293-aae5-7702edfaabec~~

~~In After, the progress bar won't update until the first task generates
output. If a task generates 10 blocks, we will update the progress bar
while it's generating blocks, even if the task hasn't finished. Once the
task finishes, we default back to the way it was before.~~

~~This is better because the very 1st progress bar update will occur
sooner, and won't feel abrupt to the user.~~

Refractoring the progress bar estimates using known metrics.

## Why are these changes needed?
Currently we use number of finished tasks. This is OK, but since we use
streaming geneator, 1 task = thousands of blocks. This is troublesome
for additional split factor (split blocks) in read parquet
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR.
([pre-commit
setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: iamjustinhsu <[email protected]>
Signed-off-by: Aydin Abiar <[email protected]>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

~~Before:~~

~~https://github.com/user-attachments/assets/9db00f37-0c37-4e99-874a-a14481878e4a~~
~~In before, the progress bar won't update until the first tasks
finishes.~~

~~After:

~~https://github.com/user-attachments/assets/99877a3f-7b52-4293-aae5-7702edfaabec~~

~~In After, the progress bar won't update until the first task generates
output. If a task generates 10 blocks, we will update the progress bar
while it's generating blocks, even if the task hasn't finished. Once the
task finishes, we default back to the way it was before.~~

~~This is better because the very 1st progress bar update will occur
sooner, and won't feel abrupt to the user.~~

Refractoring the progress bar estimates using known metrics.

## Why are these changes needed?
Currently we use number of finished tasks. This is OK, but since we use
streaming geneator, 1 task = thousands of blocks. This is troublesome
for additional split factor (split blocks) in read parquet
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR.
([pre-commit
setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: iamjustinhsu <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants