
Commit ec254d0

[Data] Make test_dataset_throughput deterministic and refactor throughput stats (#58693)
This PR makes three improvements to Ray Data's throughput statistics:

1. **Makes `test_dataset_throughput` deterministic**: The original test was flaky because it relied on actual task execution timing. This PR rewrites it as unit tests (`test_dataset_throughput_calculation` and `test_operator_throughput_calculation`) using mocked `BlockStats` objects, making the tests fast and reliable.
2. **Removes "Estimated single node throughput" from Dataset-level stats**: This metric was misleading at the dataset level since it summed wall times across all operators, which doesn't accurately represent single-node performance. The "Ray Data throughput" metric (total rows / total wall time) remains and provides the meaningful dataset-level throughput.
3. **Renames "Estimated single node throughput" to "Estimated single task throughput"**: At the operator level, this metric divides total rows by the sum of task wall times. The new name more accurately reflects what it measures: the throughput if all work were done by a single task serially.

---------

Signed-off-by: dancingactor <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
1 parent 88bc8ac commit ec254d0
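As a quick illustration of why points 2 and 3 above matter, here is a toy calculation of the two operator-level metrics. All names and numbers below are made up for illustration; they are not Ray Data's actual fields:

```python
# Toy numbers showing how the two operator-level throughput metrics differ.
output_rows = 1_000_000                   # total rows the operator produced
task_wall_times = [2.0, 2.0, 2.0, 2.0]    # per-task wall times (s); tasks ran concurrently
operator_wall_time = 2.5                  # observed start-to-finish operator time (s)

# "Ray Data throughput": rows divided by the observed wall time,
# so it reflects the speedup from running tasks concurrently.
ray_data_throughput = output_rows / operator_wall_time       # 400,000 rows/s

# "Estimated single task throughput": rows divided by the summed task wall
# times, i.e. the rate if one task had done all the work serially.
single_task_throughput = output_rows / sum(task_wall_times)  # 125,000 rows/s

print(f"Ray Data throughput: {ray_data_throughput:,.0f} rows/s")
print(f"Estimated single task throughput: {single_task_throughput:,.0f} rows/s")
```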

File tree: 2 files changed (+120, -95 lines)

python/ray/data/_internal/stats.py

Lines changed: 44 additions & 44 deletions
```diff
@@ -1202,42 +1202,33 @@ def to_string(
             out += "\nDataset memory:\n"
             out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)
 
-        # For throughput, we compute both an observed Ray Data dataset throughput
-        # and an estimated single node dataset throughput.
-
-        # The observed dataset throughput is computed by dividing the total number
-        # of rows produced by the total wall time of the dataset (i.e. from start to
-        # finish how long did the dataset take to be processed). With the recursive
-        # nature of the DatasetStatsSummary, we use get_total_wall_time to determine
-        # the total wall time (this finds the difference between the earliest start
-        # and latest end for any block in any operator).
-
-        # The estimated single node dataset throughput is computed by dividing the
-        # total number of rows produced the sum of the wall times across all blocks
-        # of all operators. This assumes that on a single node the work done would
-        # be equivalent, with no concurrency.
-        output_num_rows = self.operators_stats[-1].output_num_rows
-        total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
-        wall_time = self.get_total_wall_time()
-        total_time_all_blocks = self.get_total_time_all_blocks()
-        if total_num_out_rows and wall_time and total_time_all_blocks:
+        if self.num_rows_per_s:
             out += "\n"
             out += "Dataset throughput:\n"
-            out += (
-                "\t* Ray Data throughput:"
-                f" {total_num_out_rows / wall_time} "
-                "rows/s\n"
-            )
-            out += (
-                "\t* Estimated single node throughput:"
-                f" {total_num_out_rows / total_time_all_blocks} "
-                "rows/s\n"
-            )
+            out += f"\t* Ray Data throughput: {self.num_rows_per_s} rows/s\n"
         if verbose_stats_logs and add_global_stats:
             out += "\n" + self.runtime_metrics()
 
         return out
 
+    @property
+    def num_rows_per_s(self) -> float:
+        """Calculates the throughput in rows per second for the entire dataset."""
+        # The observed dataset throughput is computed by dividing the total number
+        # of rows produced by the total wall time of the dataset (i.e. from start to
+        # finish how long did the dataset take to be processed). With the recursive
+        # nature of the DatasetStatsSummary, we use get_total_wall_time to determine
+        # the total wall time (this finds the difference between the earliest start
+        # and latest end for any block in any operator).
+        output_num_rows = (
+            self.operators_stats[-1].output_num_rows if self.operators_stats else 0
+        )
+        total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
+        wall_time = self.get_total_wall_time()
+        if not total_num_out_rows or not wall_time:
+            return 0.0
+        return total_num_out_rows / wall_time
+
     @staticmethod
     def _collect_dataset_stats_summaries(
         curr: "DatasetStatsSummary",
@@ -1386,6 +1377,26 @@ class OperatorStatsSummary:
     node_count: Optional[Dict[str, float]] = None
     task_rows: Optional[Dict[str, float]] = None
 
+    @property
+    def num_rows_per_s(self) -> float:
+        # The observed Ray Data operator throughput is computed by dividing the
+        # total number of rows produced by the wall time of the operator,
+        # time_total_s.
+        if not self.output_num_rows or not self.time_total_s:
+            return 0.0
+        return self.output_num_rows["sum"] / self.time_total_s
+
+    @property
+    def num_rows_per_task_s(self) -> float:
+        """Calculates the estimated single-task throughput in rows per second."""
+        # The estimated single task operator throughput is computed by dividing the
+        # total number of rows produced by the sum of the wall times across all
+        # blocks of the operator. This assumes that on a single task the work done
+        # would be equivalent, with no concurrency.
+        if not self.output_num_rows or not self.wall_time or not self.wall_time["sum"]:
+            return 0.0
+        return self.output_num_rows["sum"] / self.wall_time["sum"]
+
     @classmethod
     def from_block_metadata(
         cls,
@@ -1630,18 +1641,7 @@ def __str__(self) -> str:
                 node_count_stats["mean"],
                 node_count_stats["count"],
             )
-        if output_num_rows_stats and self.time_total_s and wall_time_stats:
-            # For throughput, we compute both an observed Ray Data operator throughput
-            # and an estimated single node operator throughput.
-
-            # The observed Ray Data operator throughput is computed by dividing the
-            # total number of rows produced by the wall time of the operator,
-            # time_total_s.
-
-            # The estimated single node operator throughput is computed by dividing the
-            # total number of rows produced by the sum of the wall times across all
-            # blocks of the operator. This assumes that on a single node the work done
-            # would be equivalent, with no concurrency.
+        if self.num_rows_per_s and self.num_rows_per_task_s:
             total_num_in_rows = (
                 self.total_input_num_rows if self.total_input_num_rows else 0
             )
@@ -1656,12 +1656,12 @@ def __str__(self) -> str:
             )
             out += (
                 indent + "\t* Ray Data throughput:"
-                f" {total_num_out_rows / self.time_total_s} "
+                f" {self.num_rows_per_s} "
                 "rows/s\n"
             )
             out += (
-                indent + "\t* Estimated single node throughput:"
-                f" {total_num_out_rows / wall_time_stats['sum']} "
+                indent + "\t* Estimated single task throughput:"
+                f" {self.num_rows_per_task_s} "
                 "rows/s\n"
             )
         return out
```
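For reference, here is a minimal sketch of the deterministic test style the PR describes: feed fixed numbers into the same formulas the new properties implement instead of timing real tasks. The standalone functions below mirror the property logic from the diff above; they are stand-ins for the `OperatorStatsSummary` properties, not Ray's actual test code.

```python
from typing import Dict, Optional


def num_rows_per_s(
    output_num_rows: Optional[Dict[str, float]], time_total_s: float
) -> float:
    """Observed operator throughput: total rows / observed operator wall time."""
    if not output_num_rows or not time_total_s:
        return 0.0
    return output_num_rows["sum"] / time_total_s


def num_rows_per_task_s(
    output_num_rows: Optional[Dict[str, float]],
    wall_time: Optional[Dict[str, float]],
) -> float:
    """Estimated single-task throughput: total rows / summed per-block wall times."""
    if not output_num_rows or not wall_time or not wall_time["sum"]:
        return 0.0
    return output_num_rows["sum"] / wall_time["sum"]


def test_operator_throughput_calculation():
    rows = {"sum": 100}
    assert num_rows_per_s(rows, 4.0) == 25.0                 # 4 s observed wall time
    assert num_rows_per_task_s(rows, {"sum": 10.0}) == 10.0  # 10 s summed task time
    assert num_rows_per_s(None, 4.0) == 0.0                  # missing stats -> 0.0
    assert num_rows_per_task_s(rows, {"sum": 0.0}) == 0.0    # zero wall time -> 0.0


test_operator_throughput_calculation()
```

Because the inputs are fixed values rather than measured timings, a test written this way cannot flake on scheduler or machine load, which is the point of the refactor.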
