Skip to content

Commit 1b087b3

Browse files
committed
[Data] Make test_dataset_throughput deterministic by increasing workload and applying tolerance
Signed-off-by: dancingactor <[email protected]>
1 parent ffa560a commit 1b087b3

File tree

2 files changed

+85
-21
lines changed

2 files changed

+85
-21
lines changed

python/ray/data/_internal/stats.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,6 +1230,18 @@ def to_string(
12301230

12311231
return out
12321232

1233+
@property
def num_rows_per_s(self) -> float:
    """Return the throughput of the entire dataset in rows per second.

    The row count is the ``"sum"`` entry of ``output_num_rows`` on the last
    element of ``self.operators_stats``; the duration is whatever
    ``self.get_total_wall_time()`` reports.

    Returns:
        Total output rows divided by the total wall time, or ``0.0`` when
        there are no operator stats, the row count is missing or zero, or
        the wall time is missing or zero.
    """
    output_num_rows = (
        self.operators_stats[-1].output_num_rows if self.operators_stats else 0
    )
    total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
    wall_time = self.get_total_wall_time()
    # Guard both operands so an empty or zero-duration run reports 0.0
    # instead of raising ZeroDivisionError.
    if not total_num_out_rows or not wall_time:
        return 0.0
    return total_num_out_rows / wall_time
1244+
12331245
@staticmethod
12341246
def _collect_dataset_stats_summaries(
12351247
curr: "DatasetStatsSummary",
@@ -1378,6 +1390,20 @@ class OperatorStatsSummary:
13781390
node_count: Optional[Dict[str, float]] = None
13791391
task_rows: Optional[Dict[str, float]] = None
13801392

1393+
@property
def num_rows_per_s(self) -> float:
    """Return this operator's overall throughput in rows per second.

    Divides the ``"sum"`` entry of ``self.output_num_rows`` by
    ``self.time_total_s``.

    Returns:
        The rows-per-second figure, or ``0.0`` when ``output_num_rows`` is
        missing/empty or ``time_total_s`` is missing or zero.
    """
    # A falsy duration covers both "not recorded" and a zero-length run,
    # either of which would otherwise make the division fail.
    if not self.output_num_rows or not self.time_total_s:
        return 0.0
    return self.output_num_rows["sum"] / self.time_total_s
1399+
1400+
@property
def num_rows_per_task_s(self) -> float:
    """Return the estimated single-task throughput in rows per second.

    Divides the ``"sum"`` entry of ``self.output_num_rows`` by the ``"sum"``
    entry of ``self.wall_time``.

    Returns:
        The rows-per-second figure, or ``0.0`` when ``output_num_rows`` or
        ``wall_time`` is missing/empty, or when the summed wall time is
        zero.
    """
    # Check the summed wall time as well as the dict itself: a zero sum
    # would otherwise raise ZeroDivisionError.
    if (
        not self.output_num_rows
        or not self.wall_time
        or not self.wall_time["sum"]
    ):
        return 0.0
    return self.output_num_rows["sum"] / self.wall_time["sum"]
1406+
13811407
@classmethod
13821408
def from_block_metadata(
13831409
cls,

python/ray/data/tests/test_stats.py

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,35 +1744,73 @@ def test_task_duration_stats():
17441744
) # ddof=1 for sample standard deviation
17451745

17461746

1747-
# NOTE: All tests above share a Ray cluster, while the tests below do not. These
1748-
# tests should only be carefully reordered to retain this invariant!
1747+
def test_dataset_throughput_calculation(ray_start_regular_shared):
    """Test throughput calculations using mock block stats.

    Builds a ``DatasetStats`` from three hand-written ``BlockStats`` entries
    with fixed timestamps, so the expected throughput is a constant rather
    than a measurement of real execution.
    """
    from ray.data._internal.stats import DatasetStats
    from ray.data.block import BlockExecStats, BlockStats

    def create_block_stats(start_time, end_time, num_rows):
        # Fabricate the execution stats of one block with known timings.
        exec_stats = BlockExecStats()
        exec_stats.start_time_s = start_time
        exec_stats.end_time_s = end_time
        exec_stats.wall_time_s = end_time - start_time
        exec_stats.cpu_time_s = exec_stats.wall_time_s
        return BlockStats(num_rows=num_rows, size_bytes=None, exec_stats=exec_stats)

    # Three overlapping 2-second blocks of 100 rows each, spanning 0.0s-3.0s.
    blocks = [
        create_block_stats(0.0, 2.0, 100),
        create_block_stats(0.5, 2.5, 100),
        create_block_stats(1.0, 3.0, 100),
    ]

    stats = DatasetStats(metadata={"Map": blocks}, parent=None)
    summary = stats.to_summary()

    # Throughput: total rows / total execution duration
    # Total rows = 300
    # Duration = max end_time - min start_time = 3.0s
    # 300 rows / 3s = 100 rows/s
    assert summary.num_rows_per_s == 100
17541774

1755-
f = dummy_map_batches_sleep(0.01)
1756-
ds = ray.data.range(100).map(f).materialize().map(f).materialize()
17571775

1758-
operator_pattern = re.compile(
1759-
r"Operator (\d+).*?\* Operator throughput:\s*.*?\* Ray Data throughput: (\d+\.\d+) rows/s.*?\* Estimated single node throughput: (\d+\.\d+) rows/s",
1760-
re.DOTALL,
1761-
)
1776+
def test_operator_throughput_calculation(ray_start_regular_shared):
    """Test operator throughput calculations using mock BlockStats.

    Builds an ``OperatorStatsSummary`` from two hand-written ``BlockStats``
    entries with fixed timestamps and checks both the overall and the
    per-task throughput properties against constants.
    """
    from ray.data._internal.stats import OperatorStatsSummary
    from ray.data.block import BlockExecStats, BlockStats

    def create_block_stats(start_time, end_time, num_rows, task_idx):
        # Fabricate the execution stats of one block with known timings,
        # tagged with the task index it is attributed to.
        exec_stats = BlockExecStats()
        exec_stats.start_time_s = start_time
        exec_stats.end_time_s = end_time
        exec_stats.wall_time_s = end_time - start_time
        exec_stats.cpu_time_s = exec_stats.wall_time_s
        exec_stats.task_idx = task_idx

        return BlockStats(num_rows=num_rows, size_bytes=None, exec_stats=exec_stats)

    # Two fully overlapping 2-second blocks of 100 rows, one per task.
    blocks = [
        create_block_stats(0.0, 2.0, 100, 1),
        create_block_stats(0.0, 2.0, 100, 2),
    ]

    summary = OperatorStatsSummary.from_block_metadata(
        operator_name="MockOperator",
        block_stats=blocks,
        is_sub_operator=False,
    )

    # Total rows = 200
    # Total operator wall time (from earliest start to latest end) = 2.0s
    # Sum of individual task wall times = 2.0s + 2.0s = 4.0s
    # Overall throughput: Total rows / Total operator wall time
    assert summary.num_rows_per_s == 200 / (2.0 - 0.0)

    # Estimated single task throughput: Total rows / Sum of individual task wall times
    assert summary.num_rows_per_task_s == 200 / (2.0 + 2.0)
1810+
1811+
1812+
# NOTE: All tests above share a Ray cluster, while the tests below do not. These
1813+
# tests should only be carefully reordered to retain this invariant!
17761814

17771815

17781816
def test_individual_operator_num_rows(shutdown_only):

0 commit comments

Comments
 (0)