diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index cbf6447142b0..8661de4744ec 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -1202,42 +1202,33 @@ def to_string(
             out += "\nDataset memory:\n"
             out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)
 
-        # For throughput, we compute both an observed Ray Data dataset throughput
-        # and an estimated single node dataset throughput.
-
-        # The observed dataset throughput is computed by dividing the total number
-        # of rows produced by the total wall time of the dataset (i.e. from start to
-        # finish how long did the dataset take to be processed). With the recursive
-        # nature of the DatasetStatsSummary, we use get_total_wall_time to determine
-        # the total wall time (this finds the difference between the earliest start
-        # and latest end for any block in any operator).
-
-        # The estimated single node dataset throughput is computed by dividing the
-        # total number of rows produced the sum of the wall times across all blocks
-        # of all operators. This assumes that on a single node the work done would
-        # be equivalent, with no concurrency.
-        output_num_rows = self.operators_stats[-1].output_num_rows
-        total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
-        wall_time = self.get_total_wall_time()
-        total_time_all_blocks = self.get_total_time_all_blocks()
-        if total_num_out_rows and wall_time and total_time_all_blocks:
+        if self.num_rows_per_s:
             out += "\n"
             out += "Dataset throughput:\n"
-            out += (
-                "\t* Ray Data throughput:"
-                f" {total_num_out_rows / wall_time} "
-                "rows/s\n"
-            )
-            out += (
-                "\t* Estimated single node throughput:"
-                f" {total_num_out_rows / total_time_all_blocks} "
-                "rows/s\n"
-            )
+            out += f"\t* Ray Data throughput: {self.num_rows_per_s} rows/s\n"
 
         if verbose_stats_logs and add_global_stats:
             out += "\n" + self.runtime_metrics()
 
         return out
 
+    @property
+    def num_rows_per_s(self) -> float:
+        """Calculates the throughput in rows per second for the entire dataset."""
+        # The observed dataset throughput is computed by dividing the total number
+        # of rows produced by the total wall time of the dataset (i.e. from start to
+        # finish how long did the dataset take to be processed). With the recursive
+        # nature of the DatasetStatsSummary, we use get_total_wall_time to determine
+        # the total wall time (this finds the difference between the earliest start
+        # and latest end for any block in any operator).
+        output_num_rows = (
+            self.operators_stats[-1].output_num_rows if self.operators_stats else 0
+        )
+        total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
+        wall_time = self.get_total_wall_time()
+        if not total_num_out_rows or not wall_time:
+            return 0.0
+        return total_num_out_rows / wall_time
+
     @staticmethod
     def _collect_dataset_stats_summaries(
         curr: "DatasetStatsSummary",
@@ -1386,6 +1377,26 @@ class OperatorStatsSummary:
     node_count: Optional[Dict[str, float]] = None
     task_rows: Optional[Dict[str, float]] = None
 
+    @property
+    def num_rows_per_s(self) -> float:
+        # The observed Ray Data operator throughput is computed by dividing the
+        # total number of rows produced by the wall time of the operator,
+        # time_total_s.
+        if not self.output_num_rows or not self.time_total_s:
+            return 0.0
+        return self.output_num_rows["sum"] / self.time_total_s
+
+    @property
+    def num_rows_per_task_s(self) -> float:
+        """Calculates the estimated single-task throughput in rows per second."""
+        # The estimated single task operator throughput is computed by dividing the
+        # total number of rows produced by the sum of the wall times across all
+        # blocks of the operator. This assumes that on a single task the work done
+        # would be equivalent, with no concurrency.
+        if not self.output_num_rows or not self.wall_time or not self.wall_time["sum"]:
+            return 0.0
+        return self.output_num_rows["sum"] / self.wall_time["sum"]
+
     @classmethod
     def from_block_metadata(
         cls,
@@ -1630,18 +1641,7 @@ def __str__(self) -> str:
                 node_count_stats["mean"],
                 node_count_stats["count"],
             )
-        if output_num_rows_stats and self.time_total_s and wall_time_stats:
-            # For throughput, we compute both an observed Ray Data operator throughput
-            # and an estimated single node operator throughput.
-
-            # The observed Ray Data operator throughput is computed by dividing the
-            # total number of rows produced by the wall time of the operator,
-            # time_total_s.
-
-            # The estimated single node operator throughput is computed by dividing the
-            # total number of rows produced by the sum of the wall times across all
-            # blocks of the operator. This assumes that on a single node the work done
-            # would be equivalent, with no concurrency.
+        if self.num_rows_per_s and self.num_rows_per_task_s:
             total_num_in_rows = (
                 self.total_input_num_rows if self.total_input_num_rows else 0
             )
@@ -1656,12 +1656,12 @@ def __str__(self) -> str:
             )
             out += (
                 indent + "\t* Ray Data throughput:"
-                f" {total_num_out_rows / self.time_total_s} "
+                f" {self.num_rows_per_s} "
                 "rows/s\n"
             )
             out += (
-                indent + "\t* Estimated single node throughput:"
-                f" {total_num_out_rows / wall_time_stats['sum']} "
+                indent + "\t* Estimated single task throughput:"
+                f" {self.num_rows_per_task_s} "
                 "rows/s\n"
            )
         return out
diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py
index 491880137b68..edc10eb63c47 100644
--- a/python/ray/data/tests/test_stats.py
+++ b/python/ray/data/tests/test_stats.py
@@ -33,10 +33,12 @@ from ray.data._internal.stats import (
     DatasetStats,
     NodeMetrics,
+    OperatorStatsSummary,
     _StatsActor,
     get_or_create_stats_actor,
 )
 from ray.data._internal.util import MemoryProfiler
+from ray.data.block import BlockExecStats, BlockStats
 from ray.data.context import DataContext
 from ray.data.tests.util import column_udf
 from ray.tests.conftest import *  # noqa
@@ -406,7 +408,7 @@ def test_streaming_split_stats(ray_start_regular_shared, restore_data_context):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s
     * Extra metrics: {extra_metrics_1}

Operator N split(N, equal=False): \n"""
@@ -464,12 +466,11 @@ def test_dataset_stats_basic(
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"{gen_extra_metrics_str(STANDARD_EXTRA_METRICS_TASK_BACKPRESSURE, verbose_stats_logs)}"  # noqa: E501
         f"\n"
         f"Dataset throughput:\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
         f"{gen_runtime_metrics_str(['ReadRange->MapBatches(dummy_map_batches)'], verbose_stats_logs)}"  # noqa: E501
     )
@@ -491,12 +492,11 @@ def test_dataset_stats_basic(
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"{gen_extra_metrics_str(STANDARD_EXTRA_METRICS_TASK_BACKPRESSURE, verbose_stats_logs)}"  # noqa: E501
         f"\n"
         f"Dataset throughput:\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
         f"{gen_runtime_metrics_str(['ReadRange->MapBatches(dummy_map_batches)','Map(dummy_map_batches)'], verbose_stats_logs)}"  # noqa: E501
     )
@@ -523,7 +523,7 @@ def test_dataset_stats_basic(
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"{extra_metrics}\n"
         f"Operator N Map(dummy_map_batches): {EXECUTION_STRING}\n"
         f"* Remote wall time: T min, T max, T mean, T total\n"
@@ -538,7 +538,7 @@ def test_dataset_stats_basic(
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"{extra_metrics}\n"
         f"Dataset iterator time breakdown:\n"
         f"* Total time overall: T\n"
@@ -554,7 +554,6 @@ def test_dataset_stats_basic(
         f"\n"
         f"Dataset throughput:\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
         f"{gen_runtime_metrics_str(['ReadRange->MapBatches(dummy_map_batches)','Map(dummy_map_batches)'], verbose_stats_logs)}"  # noqa: E501
     )
@@ -583,7 +582,7 @@ def test_block_location_nums(ray_start_regular_shared, restore_data_context):
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"\n"
         f"Dataset iterator time breakdown:\n"
         f"* Total time overall: T\n"
@@ -603,7 +602,6 @@ def test_block_location_nums(ray_start_regular_shared, restore_data_context):
         f"\n"
         f"Dataset throughput:\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
     )
@@ -1017,7 +1015,7 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Suboperator N RandomShuffleReduce: N tasks executed, N blocks produced
 * Remote wall time: T min, T max, T mean, T total
@@ -1032,7 +1030,7 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Operator N Repartition: executed in T

@@ -1049,7 +1047,7 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Suboperator N RepartitionReduce: N tasks executed, N blocks produced
 * Remote wall time: T min, T max, T mean, T total
@@ -1064,11 +1062,10 @@ def test_dataset_stats_shuffle(ray_start_regular_shared):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Dataset throughput:
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
 """
     )
@@ -1125,11 +1122,10 @@ def test_dataset_stats_range(ray_start_regular_shared, tmp_path):
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"\n"
         f"Dataset throughput:\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
     )
@@ -1159,7 +1155,7 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path, restore_data_co
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"\n"
         f"Operator N Split: {EXECUTION_STRING}\n"
         f"* Remote wall time: T min, T max, T mean, T total\n"
@@ -1174,7 +1170,7 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path, restore_data_co
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"\n"
         f"Operator N Map(): {EXECUTION_STRING}\n"
         f"* Remote wall time: T min, T max, T mean, T total\n"
@@ -1189,11 +1185,10 @@ def test_dataset_split_stats(ray_start_regular_shared, tmp_path, restore_data_co
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"\n"
         f"Dataset throughput:\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
     )
@@ -1391,7 +1386,7 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Dataset iterator time breakdown:
 * Total time overall: T
@@ -1407,7 +1402,6 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context):

Dataset throughput:
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
 """
     )
@@ -1432,11 +1426,10 @@ def test_write_ds_stats(ray_start_regular_shared, tmp_path):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Dataset throughput:
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
 """
     )
@@ -1465,7 +1458,7 @@ def test_write_ds_stats(ray_start_regular_shared, tmp_path):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Operator N Write: {EXECUTION_STRING}
 * Remote wall time: T min, T max, T mean, T total
@@ -1480,11 +1473,10 @@ def test_write_ds_stats(ray_start_regular_shared, tmp_path):
     * Total input num rows: N rows
     * Total output num rows: N rows
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
+    * Estimated single task throughput: N rows/s

Dataset throughput:
     * Ray Data throughput: N rows/s
-    * Estimated single node throughput: N rows/s
 """
     )
@@ -1661,35 +1653,69 @@ def test_task_duration_stats():
     )  # ddof=1 for sample standard deviation


-# NOTE: All tests above share a Ray cluster, while the tests below do not. These
-# tests should only be carefully reordered to retain this invariant!
+def test_dataset_throughput_calculation(ray_start_regular_shared):
+    """Test throughput calculations using mock block stats."""
+    def create_block_stats(start_time, end_time, num_rows):
+        exec_stats = BlockExecStats()
+        exec_stats.start_time_s = start_time
+        exec_stats.end_time_s = end_time
+        exec_stats.wall_time_s = end_time - start_time
+        exec_stats.cpu_time_s = exec_stats.wall_time_s
+        return BlockStats(num_rows=num_rows, size_bytes=None, exec_stats=exec_stats)

-def test_dataset_throughput(shutdown_only):
-    ray.shutdown()
-    ray.init(num_cpus=2)
+    blocks_stats = [
+        create_block_stats(0.0, 2.0, 100),
+        create_block_stats(0.5, 2.5, 100),
+        create_block_stats(1.0, 3.0, 100),
+    ]

-    f = dummy_map_batches_sleep(0.01)
-    ds = ray.data.range(100).map(f).materialize().map(f).materialize()
+    stats = DatasetStats(metadata={"Map": blocks_stats}, parent=None)
+    summary = stats.to_summary()
+
+    # Throughput: total rows / total execution duration
+    # Total rows = 300
+    # Duration = max end_time - min start_time = 3.0s
+    # 300 rows / 3s = 100 rows/s
+    assert summary.num_rows_per_s == 100

-    operator_pattern = re.compile(
-        r"Operator (\d+).*?\* Operator throughput:\s*.*?\* Ray Data throughput: (\d+\.\d+) rows/s.*?\* Estimated single node throughput: (\d+\.\d+) rows/s",
-        re.DOTALL,
-    )
-    # Ray data throughput should always be better than single node throughput for
-    # multi-cpu case.
-    for match in operator_pattern.findall(ds.stats()):
-        assert float(match[1]) >= float(match[2])

+def test_operator_throughput_calculation(ray_start_regular_shared):
+    """Test operator throughput calculations using mock BlockStats."""
-    # Pattern to match dataset throughput
-    dataset_pattern = re.compile(
-        r"Dataset throughput:.*?Ray Data throughput: (\d+\.\d+) rows/s.*?Estimated single node throughput: (\d+\.\d+) rows/s",  # noqa: E501
-        re.DOTALL,
+    def create_block_stats(start_time, end_time, num_rows, task_idx):
+        exec_stats = BlockExecStats()
+        exec_stats.start_time_s = start_time
+        exec_stats.end_time_s = end_time
+        exec_stats.wall_time_s = end_time - start_time
+        exec_stats.cpu_time_s = exec_stats.wall_time_s
+        exec_stats.task_idx = task_idx
+
+        return BlockStats(num_rows=num_rows, size_bytes=None, exec_stats=exec_stats)
+
+    blocks_stats = [
+        create_block_stats(0.0, 2.0, 100, 1),
+        create_block_stats(0.0, 2.0, 100, 2),
+    ]
+
+    summary = OperatorStatsSummary.from_block_metadata(
+        operator_name="MockOperator",
+        block_stats=blocks_stats,
+        is_sub_operator=False,
     )
-    dataset_match = dataset_pattern.search(ds.stats())
-    assert float(dataset_match[1]) >= float(dataset_match[2])
+    # Total rows = 200
+    # Total operator wall time (from earliest start to latest end) = 2.0s
+    # Sum of individual task wall times = 2.0s + 2.0s = 4.0s
+    # Overall throughput: Total rows / Total operator wall time
+    assert summary.num_rows_per_s == 200 / (2.0 - 0.0)
+
+    # Estimated single task throughput: Total rows / Sum of individual task wall times
+    assert summary.num_rows_per_task_s == 200 / (2.0 + 2.0)
+
+
+# NOTE: All tests above share a Ray cluster, while the tests below do not. These
+# tests should only be carefully reordered to retain this invariant!


 def test_individual_operator_num_rows(shutdown_only):
@@ -1787,7 +1813,7 @@ def test_spilled_stats(shutdown_only, verbose_stats_logs, restore_data_context):
         f"    * Total input num rows: N rows\n"
         f"    * Total output num rows: N rows\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
+        f"    * Estimated single task throughput: N rows/s\n"
         f"{extra_metrics}\n"
         f"Cluster memory:\n"
         f"* Spilled to disk: M\n"
@@ -1795,7 +1821,6 @@ def test_spilled_stats(shutdown_only, verbose_stats_logs, restore_data_context):
         f"\n"
         f"Dataset throughput:\n"
         f"    * Ray Data throughput: N rows/s\n"
-        f"    * Estimated single node throughput: N rows/s\n"
         f"{gen_runtime_metrics_str(['ReadRange->MapBatches()'], verbose_stats_logs)}"  # noqa: E501
     )
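Not part of the patch: a minimal sketch of how the new dataset-level `num_rows_per_s` property behaves, built the same way the new tests build mock block stats. The helper name `make_block_stats` and the block timings/row counts below are hypothetical, and it assumes a Ray session is initialized, as the new tests do via their shared-cluster fixture.

from ray.data._internal.stats import DatasetStats
from ray.data.block import BlockExecStats, BlockStats


def make_block_stats(start_s, end_s, num_rows):
    # Populate only the timing fields the throughput math reads.
    exec_stats = BlockExecStats()
    exec_stats.start_time_s = start_s
    exec_stats.end_time_s = end_s
    exec_stats.wall_time_s = end_s - start_s
    exec_stats.cpu_time_s = exec_stats.wall_time_s
    return BlockStats(num_rows=num_rows, size_bytes=None, exec_stats=exec_stats)


# Two overlapping blocks: 50 rows over 0.0-1.0s and 150 rows over 0.5-2.5s.
summary = DatasetStats(
    metadata={"Map": [make_block_stats(0.0, 1.0, 50), make_block_stats(0.5, 2.5, 150)]},
    parent=None,
).to_summary()

# 200 rows over a 2.5s span (earliest start 0.0s to latest end 2.5s) -> 80 rows/s.
assert summary.num_rows_per_s == 200 / 2.5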