Skip to content

Commit fabe20e

Browse files
committed
[Data] Make test_dataset_throughput deterministic by increasing workload and applying tolerance
Signed-off-by: dancingactor <[email protected]>
1 parent f478f50 commit fabe20e

File tree

2 files changed

+79
-21
lines changed

2 files changed

+79
-21
lines changed

python/ray/data/_internal/stats.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,6 +1219,16 @@ def to_string(
12191219
out += "\n" + self.runtime_metrics()
12201220

12211221
return out
1222+
1223+
@property
1224+
def num_rows_per_s(self) -> float:
1225+
"""Calculates the throughput in rows per second for the entire dataset."""
1226+
output_num_rows = self.operators_stats[-1].output_num_rows
1227+
total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
1228+
wall_time = self.get_total_wall_time()
1229+
if not total_num_out_rows or not wall_time:
1230+
return 0.0
1231+
return total_num_out_rows / wall_time
12221232

12231233
@staticmethod
12241234
def _collect_dataset_stats_summaries(
@@ -1368,6 +1378,20 @@ class OperatorStatsSummary:
13681378
node_count: Optional[Dict[str, float]] = None
13691379
task_rows: Optional[Dict[str, float]] = None
13701380

1381+
@property
1382+
def num_rows_per_s(self) -> float:
1383+
"""Calculates the overall throughput in rows per second for this operator."""
1384+
if not self.output_num_rows or not self.time_total_s:
1385+
return 0.0
1386+
return self.output_num_rows["sum"] / self.time_total_s
1387+
1388+
@property
1389+
def num_rows_per_task_s(self) -> float:
1390+
"""Calculates the estimated single-task throughput in rows per second."""
1391+
if not self.output_num_rows or not self.wall_time:
1392+
return 0.0
1393+
return self.output_num_rows["sum"] / self.wall_time["sum"]
1394+
13711395
@classmethod
13721396
def from_block_metadata(
13731397
cls,

python/ray/data/tests/test_stats.py

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1743,37 +1743,71 @@ def test_task_duration_stats():
17431743
durations, ddof=1
17441744
) # ddof=1 for sample standard deviation
17451745

1746+
def test_dataset_throughput_calculation(ray_start_regular_shared):
1747+
"""Test throughput calculations using mock block stats."""
1748+
from ray.data._internal.stats import DatasetStats
1749+
from ray.data.block import BlockStats, BlockExecStats
1750+
1751+
def create_block_stats(start_time, end_time, num_rows):
1752+
exec_stats = BlockExecStats()
1753+
exec_stats.start_time_s = start_time
1754+
exec_stats.end_time_s = end_time
1755+
exec_stats.wall_time_s = end_time - start_time
1756+
exec_stats.cpu_time_s = exec_stats.wall_time_s
1757+
return BlockStats(num_rows=num_rows, size_bytes=None, exec_stats=exec_stats)
1758+
1759+
blocks = [
1760+
create_block_stats(0.0, 2.0, 100),
1761+
create_block_stats(0.5, 2.5, 100),
1762+
create_block_stats(1.0, 3.0, 100),
1763+
]
17461764

1747-
# NOTE: All tests above share a Ray cluster, while the tests below do not. These
1748-
# tests should only be carefully reordered to retain this invariant!
1765+
stats = DatasetStats(metadata={"Map": blocks}, parent=None)
1766+
summary = stats.to_summary()
17491767

1768+
# Throughput: total rows / total execution duration
1769+
# Total rows = 300
1770+
# Duration = max end_time - min start_time = 3.0s
1771+
# 300 rows / 3s = 100 rows/s
1772+
assert summary.num_rows_per_s == 100
17501773

1751-
def test_dataset_throughput(shutdown_only):
1752-
ray.shutdown()
1753-
ray.init(num_cpus=2)
17541774

1755-
f = dummy_map_batches_sleep(0.01)
1756-
ds = ray.data.range(100).map(f).materialize().map(f).materialize()
1775+
def test_operator_throughput_calculation(ray_start_regular_shared):
1776+
"""Test operator throughput calculations using mock BlockStats."""
1777+
from ray.data._internal.stats import OperatorStatsSummary
1778+
from ray.data.block import BlockStats, BlockExecStats
17571779

1758-
operator_pattern = re.compile(
1759-
r"Operator (\d+).*?\* Operator throughput:\s*.*?\* Ray Data throughput: (\d+\.\d+) rows/s.*?\* Estimated single node throughput: (\d+\.\d+) rows/s",
1760-
re.DOTALL,
1761-
)
1780+
def create_block_stats(start_time, end_time, num_rows):
1781+
exec_stats = BlockExecStats()
1782+
exec_stats.start_time_s = start_time
1783+
exec_stats.end_time_s = end_time
1784+
exec_stats.wall_time_s = end_time - start_time
1785+
exec_stats.cpu_time_s = exec_stats.wall_time_s
1786+
1787+
return BlockStats(num_rows=num_rows, size_bytes=None, exec_stats=exec_stats)
17621788

1763-
# Ray data throughput should always be better than single node throughput for
1764-
# multi-cpu case.
1765-
for match in operator_pattern.findall(ds.stats()):
1766-
assert float(match[1]) >= float(match[2])
1789+
blocks = [
1790+
create_block_stats(0.0, 2.0, 100),
1791+
create_block_stats(0.0, 2.0, 100),
1792+
]
17671793

1768-
# Pattern to match dataset throughput
1769-
dataset_pattern = re.compile(
1770-
r"Dataset throughput:.*?Ray Data throughput: (\d+\.\d+) rows/s.*?Estimated single node throughput: (\d+\.\d+) rows/s", # noqa: E501
1771-
re.DOTALL,
1794+
summary = OperatorStatsSummary.from_block_metadata(
1795+
operator_name="MockOperator",
1796+
block_stats=blocks,
1797+
is_sub_operator=False,
17721798
)
17731799

1774-
dataset_match = dataset_pattern.search(ds.stats())
1775-
assert float(dataset_match[1]) >= float(dataset_match[2])
1800+
# Total rows = 200
1801+
# Total operator wall time (from earliest start to latest end) = 2.0s
1802+
# Sum of individual task wall times = 2.0s + 2.0s = 4.0s
1803+
# Overall throughput: Total rows / Total operator wall time
1804+
assert summary.num_rows_per_s == 200 / (2.0 - 0.0)
17761805

1806+
# Estimated single task throughput: Total rows / Sum of individual task wall times`
1807+
assert summary.num_rows_per_task_s == 200 / (2.0 + 2.0)
1808+
1809+
# NOTE: All tests above share a Ray cluster, while the tests below do not. These
1810+
# tests should only be carefully reordered to retain this invariant!
17771811

17781812
def test_individual_operator_num_rows(shutdown_only):
17791813
# The input num rows of an individual operator should be the same as the output num rows of its parent operator.

0 commit comments

Comments
 (0)