Skip to content

Conversation

@dancingactor
Copy link
Contributor

@dancingactor dancingactor commented Nov 17, 2025

This PR makes three improvements to Ray Data's throughput statistics:

  1. Makes test_dataset_throughput deterministic: The original test was flaky because it relied on actual task
    execution timing. This PR rewrites it as unit tests (test_dataset_throughput_calculation and
    test_operator_throughput_calculation) using mocked BlockStats objects, making the tests fast and reliable.

  2. Removes "Estimated single node throughput" from Dataset-level stats: This metric was misleading at the
    dataset level since it summed wall times across all operators, which doesn't accurately represent single-node
    performance. The "Ray Data throughput" metric (total rows / total wall time) remains and provides the meaningful
    dataset-level throughput.

  3. Renames "Estimated single node throughput" to "Estimated single task throughput": At the operator level,
    this metric divides total rows by the sum of task wall times. The new name more accurately reflects what it
    measures—the throughput if all work were done by a single task serially.

@dancingactor dancingactor requested a review from a team as a code owner November 17, 2025 04:10
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request aims to make test_dataset_throughput deterministic by increasing the workload and introducing a tolerance for the throughput assertions. The changes look good and should help improve the test's stability. I've added a couple of minor style suggestions to align a new variable name with PEP 8 conventions.

@dancingactor dancingactor force-pushed the master branch 2 times, most recently from 08f8c78 to 7f86e78 Compare November 17, 2025 05:17
ray.init(num_cpus=2)

f = dummy_map_batches_sleep(0.01)
f = dummy_map_batches_sleep(0.03)
Copy link
Contributor Author

@dancingactor dancingactor Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A shorter sleep time is better because it reduces the execution time. However, we choose 0.03 instead of 0.02 because using 0.02 resulting in 1 failure during 20 test runs

@dancingactor
Copy link
Contributor Author

@owenowenisme @bveeramani PTAL, thanks!

Copy link
Member

@bveeramani bveeramani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR decreases the likeliness that this test fails, but ultimately, the test still relies on nondeterministic timing. It's also brittle because it uses regex that can break with minor formatting changes.

Rather than adjusting the parameters, could you rewrite this test as a unit test?

Separately, I realized that the "per node" throughputs actually represent the per-task throughput. Based on this, I think we should:

  1. Remove the "per node throughput" for the "Dataset throughput" section, because the average per-task throughput across all operators isn't really useful, and
  2. Rename "per node throughput" to "per task throughput" in the "Operator throughput" sections

The tests could look something like this:

def test_dataset_throughput_calculation():
    """Test throughput calculations using mock block stats."""
    from ray.data._internal.stats import DatasetStats
    from ray.data.block import BlockStats, BlockExecStats

    # Helper to create minimal block stats with only timing fields
    def create_block_stats(start_time, end_time, num_rows):
        exec_stats = BlockExecStats()
        exec_stats.start_time_s = start_time
        exec_stats.end_time_s = end_time
        exec_stats.wall_time_s = end_time - start_time

        return BlockStats(
            num_rows=num_rows,
            size_bytes=None,
            exec_stats=exec_stats
        )

    # Simulate 3 overlapping blocks
    blocks = [
        create_block_stats(0.0, 2.0, 100),
        create_block_stats(0.5, 2.5, 100),
        create_block_stats(1.0, 3.0, 100),
    ]

    stats = DatasetStats(metadata={"Map": blocks}, parent=None)
    summary = stats.to_summary()

    # Throughput: total rows / total execution duration
    # Total rows = 300
    # Duration = max end_time - min start_time = 3.0s
    # 300 rows / 3s = 100 rows/s
    # TODO: You'll need to expose this as a property so that it's testable.
    assert summary.num_rows_per_s == 100

def test_operator_throughput_calculation():
    ...  # A similar unit test. You might need to do some refactoring.

    # summary is a OperatorStatsSummary here, not DatasetStatsSummary
    # TODO: You'll need to similarly expose this property.
    assert summary.num_rows_per_s == 100
    assert summary.num_rows_per_task_s == 100

@bveeramani
Copy link
Member

@dancingactor lemme know if you have any questions.

@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Nov 17, 2025
@dancingactor
Copy link
Contributor Author

dancingactor commented Nov 17, 2025

Thanks for your detailed feedback! I have two questions:

1.

My understanding is that we should remove the original test_dataset_throughput performance test, and instead add two unit tests, test_dataset_throughput_calculation and test_operator_throughput_calculation, to verify the correctness of the dataset and operator throughput calculations.

2.

Separately, I realized that the "per node" throughputs actually represent the per-task throughput. Based on this, I think we should:

  1. Remove the "per node throughput" for the "Dataset throughput" section, because the average per-task throughput across all operators isn't really useful, and
  2. Rename "per node throughput" to "per task throughput" in the "Operator throughput" sections

May I confirm that this means we should modify the current ds.stats() output as follow

Operator 1 Map(f): 4 tasks executed, 4 blocks produced in 2.23s                                                                 
 ...                                                                                                                             
* Operator throughput:                                                                                                          
        * Total input num rows: 0 rows                                                                                          
        * Total output num rows: 100 rows                                                                                       
        * Ray Data throughput: 44.8881328745759 rows/s                                               
        * Estimated single node throughput: 32.05117589203472 rows/s    <-- change node to task

Dataset throughput:
        * Ray Data throughput: 24.66899248263124 rows/s
        * Estimated single node throughput: 16.076964501040045 rows/s.  <-- remove this line

@bveeramani
Copy link
Member

@dancingactor that's right!

@dancingactor
Copy link
Contributor Author

dancingactor commented Nov 18, 2025

Just to confirm, I need to do following things in this PR

  1. Remove test_dataset_throughput test, add test_dataset_throughput_calculation and test_operator_throughput_calculation tests
  2. Modify stats.py
  3. Modify other tests in test_stats.py that are related to the change in stats.py

@bveeramani
Copy link
Member

Just to confirm, I need to do following things in this PR

  1. Remove test_dataset_throughput test, add test_dataset_throughput_calculation and test_operator_throughput_calculation tests
  2. Modify stats.py
  3. Modify other tests in test_stats.py that are related to the change in stats.py

That sounds right.

One note of warning -- test_stats.py is extremely brittle!

@bveeramani
Copy link
Member

Hey @dancingactor, just following up here. Lemme know if I can provide any info or help to move this along!

@dancingactor
Copy link
Contributor Author

Hi @bveeramani, since I am new to ray, I spend some time understanding the context and the codebase. I almost completed the test_dataset_throughput and test_operator_throughput_calculation. and will work on ds.stats() output very soon.

@dancingactor dancingactor force-pushed the master branch 2 times, most recently from 6f974a2 to fabe20e Compare November 20, 2025 14:17
@dancingactor
Copy link
Contributor Author

dancingactor commented Nov 20, 2025

Hi @bveeramani, could you please advise on how to correctly test the new test_stats.py? 🙏

Currently I try to directly execute pytest /ray/python/ray/data/tests/test_stats.py I run into an error during environment setup. The error message is like

2025-11-21 00:50:44,720 INFO worker.py:2023 -- Started a local Ray instance.
                                                                                                                        6% ▋         

―――――――――――――――――――――――――――――――――――― ERROR at setup of test_large_args_scheduling_strategy[True] ――――――――――――――――――――――――――――――――――――

request = <SubRequest 'ray_start_regular_shared' for <Function test_streaming_split_stats>>

    @pytest.fixture(scope="module")
    def ray_start_regular_shared(request):
        param = getattr(request, "param", {})
>       with _ray_start(**param) as res:

python/ray/tests/conftest.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/contextlib.py:117: in __enter__
    return next(self.gen)
python/ray/tests/conftest.py:547: in _ray_start
    address_info = ray.init("local", **init_kwargs)
python/ray/_private/client_mode_hook.py:104: in wrapper
    return func(*args, **kwargs)
python/ray/_private/worker.py:2025: in init
    connect(
python/ray/_private/worker.py:1163: in wrapper
    return func(*args, **kwargs)
python/ray/_private/worker.py:2662: in connect
    worker.core_worker = ray._raylet.CoreWorker(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   self._gc_thread = PythonGCThread()
E   AttributeError: module 'ray._private.ray_constants' has no attribute 'RAY_GC_MIN_COLLECT_INTERVAL'

python/ray/_raylet.pyx:2709: AttributeError

@bveeramani
Copy link
Member

Ah, this sounds like your Ray Core version is out-of-date.

Are you building Ray Core from source, or using the setup-dev.py script? I think you might need to either rebuild Ray (if building from source) or reinstasll the Ray nightly wheel (if using setup-dev.py)

@dancingactor
Copy link
Contributor Author

Thanks! I will try the setup-dev.py approach

@bveeramani
Copy link
Member

Thanks! I will try the setup-dev.py approach

Awesome! Lemme know if you run into any problems. Happy to help you out

@dancingactor
Copy link
Contributor Author

dancingactor commented Nov 23, 2025

Hi @bveeramani,
I have tested the modified code works, PTAL

tests % pytest /Users/ryanchen/github/ray/python/ray/data/tests/test_stats.py         
Test session starts (platform: darwin, Python 3.10.19, pytest 7.4.4, pytest-sugar 0.9.5)
rootdir: /Users/ryanchen/github/ray
configfile: pytest.ini
plugins: docker-tools-3.1.3, sphinx-0.5.1.dev0, forked-1.4.0, anyio-4.11.0, asyncio-0.17.2, sugar-0.9.5, timeout-2.1.0, shutil-1.8.1, lazy-fixtures-1.1.2, rerunfailures-11.1.2, pytest_httpserver-1.1.3, virtualenv-1.8.1, mock-3.14.0, aiohttp-1.1.0
asyncio: mode=auto
timeout: 180.0s
timeout method: signal
timeout func_only: False
collecting ... 
 python/ray/data/tests/test_stats.py ss✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓s✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓                                                                100% ██████████

Results (729.00s):
      44 passed
       3 skipped

@dancingactor dancingactor force-pushed the master branch 2 times, most recently from 7f7199b to 7efd4f6 Compare November 24, 2025 04:30
@dancingactor dancingactor force-pushed the master branch 3 times, most recently from 963260c to da9e36b Compare November 24, 2025 06:58
Copy link
Member

@bveeramani bveeramani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Left a few comments

out += "\nDataset memory:\n"
out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)

# For throughput, we compute both an observed Ray Data dataset throughput
Copy link
Contributor Author

@dancingactor dancingactor Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comments were moved to https://github.com/ray-project/ray/pull/58693/files#diff-4dba40d789c60bfba4ae769f109b39979aa7d6977390329e7e2bb0e666569009R1221-R1226

the comment for "estimated single node" was removed since we removed this part from class Dataset

node_count_stats["count"],
)
if output_num_rows_stats and self.time_total_s and wall_time_stats:
# For throughput, we compute both an observed Ray Data operator throughput
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

…as unit tests

2. Remove the "per node throughput" for the "Dataset throughput" section
3. Rename "per node throughput" to "per task throughput" in the "Operator throughput" sections

Signed-off-by: dancingactor <[email protected]>
Copy link
Member

@bveeramani bveeramani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! 🚢

Signed-off-by: Balaji Veeramani <[email protected]>
@bveeramani bveeramani changed the title [Data] Make test_dataset_throughput deterministic by increasing workload and applying tolerance [Data] Make test_dataset_throughput deterministic and refactor throughput stats Nov 26, 2025
@bveeramani bveeramani enabled auto-merge (squash) November 26, 2025 07:24
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Nov 26, 2025
@dancingactor
Copy link
Contributor Author

Thanks! Really appreciate your time and guidance for this issue!

@bveeramani bveeramani merged commit ec254d0 into ray-project:master Nov 26, 2025
7 of 8 checks passed
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
…oughput stats (ray-project#58693)

This PR makes three improvements to Ray Data's throughput statistics:

1. **Makes `test_dataset_throughput` deterministic**: The original test
was flaky because it relied on actual task
execution timing. This PR rewrites it as unit tests
(`test_dataset_throughput_calculation` and
`test_operator_throughput_calculation`) using mocked `BlockStats`
objects, making the tests fast and reliable.

2. **Removes "Estimated single node throughput" from Dataset-level
stats**: This metric was misleading at the
dataset level since it summed wall times across all operators, which
doesn't accurately represent single-node
performance. The "Ray Data throughput" metric (total rows / total wall
time) remains and provides the meaningful
  dataset-level throughput.

3. **Renames "Estimated single node throughput" to "Estimated single
task throughput"**: At the operator level,
this metric divides total rows by the sum of task wall times. The new
name more accurately reflects what it
measures—the throughput if all work were done by a single task serially.

---------

Signed-off-by: dancingactor <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants