88 changes: 44 additions & 44 deletions python/ray/data/_internal/stats.py
@@ -1202,42 +1202,33 @@ def to_string(
out += "\nDataset memory:\n"
out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)

# For throughput, we compute both an observed Ray Data dataset throughput
@dancingactor (Contributor Author) commented on Nov 26, 2025:

These comments were moved to https://github.com/ray-project/ray/pull/58693/files#diff-4dba40d789c60bfba4ae769f109b39979aa7d6977390329e7e2bb0e666569009R1221-R1226

The comment for "estimated single node" was removed since we removed this part from class Dataset.
# and an estimated single node dataset throughput.

# The observed dataset throughput is computed by dividing the total number
# of rows produced by the total wall time of the dataset (i.e. how long the
# dataset took to be processed from start to finish). With the recursive
# nature of the DatasetStatsSummary, we use get_total_wall_time to determine
# the total wall time (this finds the difference between the earliest start
# and latest end for any block in any operator).

# The estimated single node dataset throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all blocks
# of all operators. This assumes that on a single node the work done would
# be equivalent, with no concurrency.
output_num_rows = self.operators_stats[-1].output_num_rows
total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
wall_time = self.get_total_wall_time()
total_time_all_blocks = self.get_total_time_all_blocks()
if total_num_out_rows and wall_time and total_time_all_blocks:
if self.num_rows_per_s:
out += "\n"
out += "Dataset throughput:\n"
out += (
"\t* Ray Data throughput:"
f" {total_num_out_rows / wall_time} "
"rows/s\n"
)
out += (
"\t* Estimated single node throughput:"
f" {total_num_out_rows / total_time_all_blocks} "
"rows/s\n"
)
out += "\t* Ray Data throughput:" f" {self.num_rows_per_s} " "rows/s\n"
if verbose_stats_logs and add_global_stats:
out += "\n" + self.runtime_metrics()

return out

@property
def num_rows_per_s(self) -> float:
"""Calculates the throughput in rows per second for the entire dataset."""
# The observed dataset throughput is computed by dividing the total number
# of rows produced by the total wall time of the dataset (i.e. how long the
# dataset took to be processed from start to finish). With the recursive
# nature of the DatasetStatsSummary, we use get_total_wall_time to determine
# the total wall time (this finds the difference between the earliest start
# and latest end for any block in any operator).
output_num_rows = (
self.operators_stats[-1].output_num_rows if self.operators_stats else 0
)
total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
wall_time = self.get_total_wall_time()
if not total_num_out_rows or not wall_time:
return 0.0
return total_num_out_rows / wall_time

@staticmethod
def _collect_dataset_stats_summaries(
curr: "DatasetStatsSummary",
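To make the observed dataset throughput concrete, here is a minimal sketch of the arithmetic behind the new num_rows_per_s property; the row count and wall time below are made-up stand-ins, not values from this PR.

# Sketch of the observed-throughput arithmetic (hypothetical numbers; the real
# property reads operators_stats[-1].output_num_rows and get_total_wall_time()).
total_num_out_rows = 1_000_000  # rows emitted by the final operator
wall_time = 50.0                # seconds, earliest block start to latest block end

if not total_num_out_rows or not wall_time:
    num_rows_per_s = 0.0
else:
    num_rows_per_s = total_num_out_rows / wall_time

print(f"Ray Data throughput: {num_rows_per_s} rows/s")  # 20000.0 rows/s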
@@ -1386,6 +1377,26 @@ class OperatorStatsSummary:
node_count: Optional[Dict[str, float]] = None
task_rows: Optional[Dict[str, float]] = None

@property
def num_rows_per_s(self) -> float:
# The observed Ray Data operator throughput is computed by dividing the
# total number of rows produced by the wall time of the operator,
# time_total_s.
if not self.output_num_rows or not self.time_total_s:
return 0.0
return self.output_num_rows["sum"] / self.time_total_s

@property
def num_rows_per_task_s(self) -> float:
"""Calculates the estimated single-task throughput in rows per second."""
# The estimated single task operator throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all
# blocks of the operator. This assumes that on a single task the work done
# would be equivalent, with no concurrency.
if not self.output_num_rows or not self.wall_time or not self.wall_time["sum"]:
return 0.0
return self.output_num_rows["sum"] / self.wall_time["sum"]

@classmethod
def from_block_metadata(
cls,
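The two properties above measure different things: num_rows_per_s divides the operator's output rows by its elapsed wall time (time_total_s, during which tasks overlap), while num_rows_per_task_s divides by the per-block wall times summed as if a single task had done all the work serially. A small sketch with hypothetical numbers:

# Hypothetical operator stats (illustrative only): four concurrent tasks,
# each busy about 10 s, so the operator finishes in roughly 10 s of elapsed time.
output_num_rows = {"sum": 400_000}  # total rows the operator produced
time_total_s = 10.0                 # operator wall time; tasks ran concurrently
wall_time = {"sum": 40.0}           # per-block wall times summed across the operator

rows_per_s = output_num_rows["sum"] / time_total_s           # 40000.0 rows/s observed
rows_per_task_s = output_num_rows["sum"] / wall_time["sum"]  # 10000.0 rows/s single-task estimate
print(rows_per_s, rows_per_task_s)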
@@ -1630,18 +1641,7 @@ def __str__(self) -> str:
node_count_stats["mean"],
node_count_stats["count"],
)
if output_num_rows_stats and self.time_total_s and wall_time_stats:
# For throughput, we compute both an observed Ray Data operator throughput
# and an estimated single node operator throughput.

# The observed Ray Data operator throughput is computed by dividing the
# total number of rows produced by the wall time of the operator,
# time_total_s.

# The estimated single node operator throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all
# blocks of the operator. This assumes that on a single node the work done
# would be equivalent, with no concurrency.
if self.num_rows_per_s and self.num_rows_per_task_s:
total_num_in_rows = (
self.total_input_num_rows if self.total_input_num_rows else 0
)
@@ -1656,12 +1656,12 @@ def __str__(self) -> str:
)
out += (
indent + "\t* Ray Data throughput:"
f" {total_num_out_rows / self.time_total_s} "
f" {self.num_rows_per_s} "
"rows/s\n"
)
out += (
indent + "\t* Estimated single node throughput:"
f" {total_num_out_rows / wall_time_stats['sum']} "
indent + "\t* Estimated single task throughput:"
f" {self.num_rows_per_task_s} "
"rows/s\n"
)
return out
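For reference, a sketch of the two lines the updated branch appends to an operator's summary; the indent and throughput values are illustrative stand-ins for what the properties above would return.

indent = "    "
num_rows_per_s = 40000.0       # stand-in for self.num_rows_per_s
num_rows_per_task_s = 10000.0  # stand-in for self.num_rows_per_task_s

out = ""
out += indent + "\t* Ray Data throughput:" f" {num_rows_per_s} " "rows/s\n"
out += indent + "\t* Estimated single task throughput:" f" {num_rows_per_task_s} " "rows/s\n"
print(out)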