
Optionally display metrics per task #307

Closed
jayshrivastava wants to merge 1 commit into main from js/per-task-metrics

Conversation

Collaborator

@jayshrivastava jayshrivastava commented Jan 22, 2026

This change adds a new enum DistributedMetricsFormat which is either Aggregated or PerTask.

Aggregated is the default. When we use DisplayableExecutionPlan::with_metrics, metrics are aggregated by name, so we leave the metrics as-is before displaying.

PerTask renames metrics like output_rows to output_rows_x, where x is the task id. That way DisplayableExecutionPlan::with_metrics can still aggregate by name while the per-task values are preserved. See the comment on annotate_metrics_set_with_task_id for why it is implemented this way.
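The renaming idea can be sketched with plain maps (a simplified illustration using ordinary `BTreeMap`s, not the actual DataFusion MetricsSet API):

```rust
use std::collections::BTreeMap;

/// Suffix every metric name with the task id, e.g. "output_rows" -> "output_rows_1",
/// so that later aggregation by name keeps one entry per task.
fn annotate_with_task_id(metrics: BTreeMap<String, u64>, task_id: usize) -> BTreeMap<String, u64> {
    metrics
        .into_iter()
        .map(|(name, value)| (format!("{name}_{task_id}"), value))
        .collect()
}

fn main() {
    let task0 = BTreeMap::from([("output_rows".to_string(), 2u64)]);
    let task1 = BTreeMap::from([("output_rows".to_string(), 4u64)]);

    let mut merged = annotate_with_task_id(task0, 0);
    merged.extend(annotate_with_task_id(task1, 1));

    // Aggregating by name no longer collapses the two tasks together.
    assert_eq!(merged.get("output_rows_0"), Some(&2));
    assert_eq!(merged.get("output_rows_1"), Some(&4));
}
```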

Sample output:

  1. DistributedMetricsFormat::Aggregated
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_rows=2, elapsed_compute=1.92µs, output_bytes=48.0 B, output_batches=1, expr_0_eval_time=208ns, expr_1_eval_time=125ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_rows=2, elapsed_compute=10.83µs, output_bytes=64.0 B, output_batches=1]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=10.80µs, output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_rows=2, elapsed_compute=5.09µs, output_bytes=32.1 KB, output_batches=2, expr_0_eval_time=462ns,
expr_1_eval_time=254ns, expr_2_eval_time=170ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=165.50µs, output_bytes=32.1 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_
rows=0, peak_mem_used=100.9 K, aggregate_arguments_time=9.38µs, aggregation_time=9.21µs, emitting_time=5.34µs, time_calculating_group_ids=18.75µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_rows=6, elapsed_compute=52.26µs, output_bytes=1152.0 KB, output_batches=6, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=10
.72ms, repartition_time=39.67µs, send_time=13.72µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=6, elapsed_compute=192.54µs, output_bytes=48.1 KB, output_batches=3, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, s
kipped_aggregation_rows=0, peak_mem_used=56.47 K, aggregate_arguments_time=10.42µs, aggregation_time=7.58µs, emitting_time=17.00µs, time_calculating_group_ids=81.88µs, reduction_factor=1.6% (6/366)]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[output_rows=366, elapsed_compute=3ns, output_bytes=8.2 KB, output_batches=3, file
s_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=3 total → 3 matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=0 total → 0 matched, batches_split=0, bytes_scanned=3.12 K, fil
e_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=6ns
, metadata_load_time=5.62ms, page_index_eval_time=6ns, row_pushdown_eval_time=6ns, statistics_eval_time=6ns, time_elapsed_opening=5.79ms, time_elapsed_processing=1.51ms, time_elapsed_scanning_total=4.77ms, time_elapsed_scanning_until_da
ta=4.54ms, scan_efficiency_ratio=2.3% (3.12 K/137.4 K)]
    └──────────────────────────────────────────────────
  2. DistributedMetricsFormat::PerTask
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_batches_0=1, output_bytes_0=48, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=1.58µs, expr_0_eva
l_time_0=167ns, expr_1_eval_time_0=83ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_batches_0=1, output_bytes_0=64, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=27.04µs]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_batches_0=0, output_batches_1=0, output_bytes_0=0, output_bytes_1=0, output_rows_0=1, output_rows_1=1, spill_count_0=0, spill_count_1=0, spill
ed_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=8.42µs, elapsed_compute_1=4.9
2µs]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1
, output_rows_1=1, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=1.83µs, elapsed_compute_1=1.96µs, expr_0_eval_time_0=168ns, expr_0_eval_time_1=
168ns, expr_1_eval_time_0=43ns, expr_1_eval_time_1=85ns, expr_2_eval_time_0=86ns, expr_2_eval_time_1=85ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1, output_rows_1=1, spi
ll_count_0=0, spill_count_1=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, peak_mem_used_0=50.47 K, peak_mem_used_1=50.47 K, start_timestamp_0=53073
54.2 T, start_timestamp_1=5307354.2 T, aggregate_arguments_time_0=3.17µs, aggregate_arguments_time_1=2.59µs, aggregation_time_0=3.13µs, aggregation_time_1=2.04µs, elapsed_compute_0=93.29µs, elapsed_compute_1=57.67µs, emitting_time_0=2.5
0µs, emitting_time_1=2.04µs, time_calculating_group_ids_0=7.75µs, time_calculating_group_ids_1=4.67µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_batches_0=2, output_batches_1=2, output_batches_2=2, output_bytes_0=393.2 K, output_bytes_1=393.2 K, output_bytes_2=393.2 K, output_rows_0=2
, output_rows_1=2, output_rows_2=2, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=10614708.4 T, end_time
stamp_1=10614708.4 T, end_timestamp_2=10614708.4 T, start_timestamp_0=10614708.4 T, start_timestamp_1=10614708.4 T, start_timestamp_2=10614708.4 T, elapsed_compute_0=10.42µs, elapsed_compute_1=11.29µs, elapsed_compute_2=19.63µs, fetch_t
ime_0=3.92ms, fetch_time_1=3.35ms, fetch_time_2=4.08ms, repartition_time_0=7.21µs, repartition_time_1=14.33µs, repartition_time_2=20.21µs, send_time_0=2.63µs, send_time_1=3.63µs, send_time_2=5.46µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_batches_2=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_bytes_2=16.42 K, ou
tput_rows_0=2, output_rows_1=2, output_rows_2=2, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_
bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, peak_mem_used_0=18.82 K, peak_mem_used_1=18.82 K, peak_mem_used_2=18.82 K, reduction
_factor_0=1, reduction_factor_1=1, reduction_factor_2=1, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, start_timestamp_2=1769118.1 T, aggregate_arguments_time_0=2.83µs, aggregate_arguments_time_1=5.17µs, aggregate_argume
nts_time_2=7.21µs, aggregation_time_0=2.00µs, aggregation_time_1=3.42µs, aggregation_time_2=5.12µs, elapsed_compute_0=54.71µs, elapsed_compute_1=71.75µs, elapsed_compute_2=85.38µs, emitting_time_0=3.38µs, emitting_time_1=6.83µs, emittin
g_time_2=8.33µs, time_calculating_group_ids_0=25.96µs, time_calculating_group_ids_1=30.79µs, time_calculating_group_ids_2=34.00µs]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=1.07 K,
bytes_scanned_1=1.01 K, bytes_scanned_2=1.03 K, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, files_ranges_pruned_statistics_matched_0=1, files_ranges
_pruned_statistics_matched_1=1, files_ranges_pruned_statistics_matched_2=1, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, output_batches_0=1, output_batches_1=1, output_batches_
2=1, output_bytes_0=2.81 K, output_bytes_1=2.79 K, output_bytes_2=2.80 K, output_rows_0=122, output_rows_1=122, output_rows_2=122, page_index_rows_pruned_matched_0=0, page_index_rows_pruned_matched_1=0, page_index_rows_pruned_matched_2=
0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, predicate_evaluation_errors_0=0, predicat
e_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, row_groups_pruned_b
loom_filter_matched_0=1, row_groups_pruned_bloom_filter_matched_1=1, row_groups_pruned_bloom_filter_matched_2=1, row_groups_pruned_statistics_matched_0=1, row_groups_pruned_statistics_matched_1=1, row_groups_pruned_statistics_matched_2=
1, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, scan_efficiency_ratio_0=2, scan_efficiency_ratio_1=2, scan_efficiency_ratio_2=2, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, sta
rt_timestamp_2=1769118.1 T, bloom_filter_eval_time_0=2ns, bloom_filter_eval_time_1=2ns, bloom_filter_eval_time_2=2ns, elapsed_compute_0=1ns, elapsed_compute_1=1ns, elapsed_compute_2=1ns, metadata_load_time_0=2.92ms, metadata_load_time_1
=2.77ms, metadata_load_time_2=3.29ms, page_index_eval_time_0=2ns, page_index_eval_time_1=2ns, page_index_eval_time_2=2ns, row_pushdown_eval_time_0=2ns, row_pushdown_eval_time_1=2ns, row_pushdown_eval_time_2=2ns, statistics_eval_time_0=2
ns, statistics_eval_time_1=2ns, statistics_eval_time_2=2ns, time_elapsed_opening_0=2.94ms, time_elapsed_opening_1=2.82ms, time_elapsed_opening_2=3.33ms, time_elapsed_processing_0=374.50µs, time_elapsed_processing_1=432.71µs, time_elapse
d_processing_2=485.12µs, time_elapsed_scanning_total_0=929.29µs, time_elapsed_scanning_total_1=444.71µs, time_elapsed_scanning_total_2=699.75µs, time_elapsed_scanning_until_data_0=866.62µs, time_elapsed_scanning_until_data_1=364.79µs, t
ime_elapsed_scanning_until_data_2=598.04µs]
    └──────────────────────────────────────────────────

Closes #304

@jayshrivastava jayshrivastava changed the title from "Js/per task metrics" to "Optionally display metrics per task" on Jan 22, 2026
AVG("MinTemp") as "MinTemp",
"RainTomorrow"
FROM weather
WHERE "RainToday" = 'yes'
Collaborator Author


We were outputting zero rows, which makes the test case trivial. I changed it so we output nonzero rows.

for data_source_index in 0..2 {
assert_metrics_equal::<DataSourceExec>(
["output_rows", "bytes_scanned"],
["output_rows", "output_bytes"],
Collaborator Author


Will explain below.

assert!(
    summed > 0,
    "Sum of metric values is 0. Either the metric {metric_name} is not present or the test is too trivial"
);
Collaborator Author

@jayshrivastava jayshrivastava Jan 22, 2026


Basically, the single-node plan has a lower bytes_scanned than the distributed plan. Note also how the distributed one is missing scan_efficiency_ratio, row_groups_pruned_statistics, and row_groups_pruned_bloom_filter. I wonder if we mess up the statistics when creating new file groups in the default task estimator. I filed #308 to investigate.


// single node
DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[Rainfall, RainToday], file_type=parquet, predicate=Rainfall@2 > 5, pruning_predicate=Rainfall_null_count@1 != row_count@2 AND Rainfall_max@0 > 5, required_guarantees=[], metrics=[output_rows=234, elapsed_compute=3ns, output_bytes=8.9 KB, output_batches=3, files_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=3 total → 3 matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=366 total → 234 matched, batches_split=0, bytes_scanned=4.90 K, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=37.71µs, metadata_load_time=552.13µs, page_index_eval_time=57.13µs, row_pushdown_eval_time=6ns, statistics_eval_time=40.17µs, time_elapsed_opening=993.33µs, time_elapsed_processing=1.54ms, time_elapsed_scanning_total=12.00ms, time_elapsed_scanning_until_data=11.93ms, scan_efficiency_ratio=11% (4.90 K/45.82 K)]

// distributed
DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[Rainfall, RainToday], file_type=parquet, predicate=Rainfall@2 > 5, pruning_predicate=Rainfall_null_count@1 != row_count@2 AND Rainfall_max@0 > 5, required_guarantees=[], metrics=[output_rows=234, elapsed_compute=3ns, batches_split=0, bytes_scanned=33.63 K, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=35.92µs, metadata_load_time=16.26ms, page_index_eval_time=53.13µs, row_pushdown_eval_time=6ns, statistics_eval_time=38.50µs, time_elapsed_opening=16.72ms, time_elapsed_processing=2.52ms, time_elapsed_scanning_total=8.38ms, time_elapsed_scanning_until_data=8.27ms]

This leads to the question: why was the test passing before?

I think it's because we would stop at the first metric that matched bytes_scanned. Technically, there may be many in the MetricsSet, even in single-node plans, because one plan node may push a bytes_scanned metric to the MetricsSet for each partition. I believe the old test code did not fail because it stopped at the first bytes_scanned, which happened to be the same for the distributed and non-distributed plans.

After I changed this code to sum, the test started failing on bytes_scanned due to the difference in pruning between distributed and non-distributed.

I've changed these tests to use output_bytes instead.
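The difference between the old first-match lookup and the new summing lookup can be illustrated with plain tuples (hypothetical helper names, not the actual test code):

```rust
// A MetricsSet may hold one `bytes_scanned` entry per partition, so taking
// the first matching entry and summing all matching entries give different answers.

/// Old behavior: stop at the first metric whose name matches.
fn first_metric(metrics: &[(&str, u64)], name: &str) -> Option<u64> {
    metrics.iter().find(|(n, _)| *n == name).map(|(_, v)| *v)
}

/// New behavior: sum every metric whose name matches.
fn sum_metric(metrics: &[(&str, u64)], name: &str) -> u64 {
    metrics.iter().filter(|(n, _)| *n == name).map(|(_, v)| *v).sum()
}

fn main() {
    // One `bytes_scanned` entry per partition.
    let metrics = [
        ("bytes_scanned", 1070),
        ("bytes_scanned", 1010),
        ("bytes_scanned", 1030),
    ];
    assert_eq!(first_metric(&metrics, "bytes_scanned"), Some(1070)); // old lookup
    assert_eq!(sum_metric(&metrics, "bytes_scanned"), 3110); // new lookup
}
```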

Collaborator


All of this makes sense 👍, let's use output_bytes then.

/// └── MetricsWrapperExec (wrapped: ProjectionExec) [output_rows = 2, elapsed_compute = 200]
///     └── NetworkShuffleExec
pub fn rewrite_local_plan_with_metrics(
    ctx: RewriteCtx,
Collaborator Author


I basically added RewriteCtx because this can be called on a distributed plan that has a task id, or on a plain single-node plan (although I don't know why anyone would do the latter).

Let me know if you have any cleaner ideas for RewriteCtx and DistributedMetricsFormat
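For illustration, one possible shape for such a context (the names and variants here are assumptions based on the discussion, not the PR's actual definition):

```rust
// Hypothetical sketch: the rewrite either runs inside a distributed task,
// where metrics should be tagged with the task id, or on a single-node plan.
#[derive(Clone, Copy, Debug)]
enum RewriteCtx {
    Distributed { task_id: usize },
    SingleNode,
}

/// Pick the displayed metric name based on the rewrite context.
fn metric_name(ctx: RewriteCtx, base: &str) -> String {
    match ctx {
        RewriteCtx::Distributed { task_id } => format!("{base}_{task_id}"),
        RewriteCtx::SingleNode => base.to_string(),
    }
}

fn main() {
    assert_eq!(
        metric_name(RewriteCtx::Distributed { task_id: 1 }, "output_rows"),
        "output_rows_1"
    );
    assert_eq!(metric_name(RewriteCtx::SingleNode, "output_rows"), "output_rows");
}
```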

#[test_case(DistributedMetricsFormat::Aggregated ; "aggregated_metrics")]
#[test_case(DistributedMetricsFormat::PerTask ; "per_task_metrics")]
#[tokio::test]
async fn test_metrics_collection_in_aggregation() -> Result<(), Box<dyn std::error::Error>> {
Collaborator Author


For reviewers, I would recommend running these tests and taking a look at the metrics.

@jayshrivastava jayshrivastava marked this pull request as ready for review January 22, 2026 22:16
Collaborator

@gabotechs gabotechs left a comment


💯 that was fast! all of this LGTM.

I'd double check the CI before shipping this though.

Comment on lines +239 to +243
let labels: Vec<Label> = metric
    .labels()
    .iter()
    .map(|l| Label::new(l.name().to_string(), l.value().to_string()))
    .collect();
Collaborator


I think here it should be fine to just do:

        let labels = metric.labels().to_vec();

That way we don't penalize labels whose names and values are Cow::Borrowed by allocating strings for them.
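For context, a standalone sketch of why keeping the Cow values avoids allocation (using std::borrow::Cow directly, not DataFusion's Label type):

```rust
use std::borrow::Cow;

/// True when the Cow still borrows its data (no heap allocation happened).
fn is_borrowed(c: &Cow<'_, str>) -> bool {
    matches!(c, Cow::Borrowed(_))
}

fn main() {
    let name: Cow<'static, str> = Cow::Borrowed("partition");

    // Cloning a Cow::Borrowed copies only the reference.
    let cloned = name.clone();
    assert!(is_borrowed(&cloned));

    // Converting name/value to String, as the original code did, forces an allocation.
    let owned: Cow<'static, str> = Cow::Owned(name.to_string());
    assert!(!is_borrowed(&owned));
}
```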

let new_name = Cow::Owned(format!("{base_name}_{task_id}"));

let new_metric_value = match metric.value() {
    MetricValue::OutputRows(count) => {
Collaborator

@gabotechs gabotechs Jan 23, 2026


It's a bit annoying that we need to manually clone the metric values here. Another drawback is that the clones allocate and modify atomic variables, which means we pay an unnecessary price for thread safety even though no operation will happen across threads.

I gave it a try, and I don't think we can avoid this with the current API. IMO it would be nice to make a contribution upstream doing whatever is necessary so that at some point in the future we can replace this with something like:

metric.value().clone().with_name("my new name")

Or something similar.

Also, MetricValue, Label, and pretty much everything that composes a Metric implements Clone, but for some reason Metric itself does not seem to implement Clone. That's weird.
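A minimal sketch of what such a wished-for upstream API could look like (every type and method here is hypothetical; none of this exists in DataFusion today):

```rust
use std::borrow::Cow;

// Hypothetical metric value that can be cloned and renamed in one call,
// instead of manually reconstructing each MetricValue variant.
#[derive(Clone, Debug, PartialEq)]
struct MetricValue {
    name: Cow<'static, str>,
    count: u64,
}

impl MetricValue {
    /// Builder-style rename, the ergonomics the comment wishes for.
    fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
        self.name = name.into();
        self
    }
}

fn main() {
    let metric = MetricValue { name: Cow::Borrowed("output_rows"), count: 2 };
    let renamed = metric.clone().with_name("output_rows_0");
    assert_eq!(renamed.name, "output_rows_0");
    assert_eq!(renamed.count, 2);
}
```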

Collaborator


The only thing we can do in this PR is probably open a contribution upstream and link the contributed PR here. If you are up for it that would be amazing, otherwise we can just put a comment here stating what would need to change in upstream's API so that we can do this more ergonomically.

tests/join.rs Outdated
let physical_plan = state.create_physical_plan(&logical_plan).await?;
let distributed_plan = display_plan_ascii(physical_plan.as_ref(), false);
println!("\n——————— DISTRIBUTED PLAN ———————\n\n{}", distributed_plan);
println!("\n——————— DISTRIBUTED PLAN ———————\n\n{distributed_plan}");
Collaborator


I think this was fixed by @JSOD11 already in #303


jayshrivastava added a commit that referenced this pull request Jan 23, 2026
This change adds a new enum `DistributedMetricsFormat` which is either `Aggregated` or `PerTask`.

`Aggregated` is the default. When we use `DisplayableExecutionPlan::with_metrics`, metrics are aggregated by name. So we leave metrics as is before displaying.

`PerTask` will configure the metrics re-writer to label metrics like `output_rows` with `__distributed_datafusion__task_id=x` at re-write time. At display time, we do 3 things to the `MetricSet` of a node:
1. Aggregate by name, task_id
2. Sort by name, task_id
3. Display metrics in the format `output_rows_x` where `x` is the task id.

If you look at what `DisplayableExecutionPlan::with_metrics` does, it does the exact same 3 steps above, except it only aggregates by name. The alternative approach is to rename metrics as in #307. I felt that this approach was more datafusion-native.

I also considered labelling metrics at collection-time, but I figured it's not worth the effort.
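The three display-time steps can be sketched with plain tuples standing in for DataFusion's `MetricSet` (a simplified illustration, not the PR's actual code):

```rust
use std::collections::BTreeMap;

/// 1. Aggregate by (name, task_id); 2. sort by (name, task_id);
/// 3. display as `name_taskid=value`.
fn display_per_task(metrics: &[(&str, usize, u64)]) -> String {
    // Step 1: aggregate by (name, task_id). A BTreeMap keeps its keys
    // sorted, which covers step 2 as well.
    let mut agg: BTreeMap<(String, usize), u64> = BTreeMap::new();
    for (name, task_id, value) in metrics {
        *agg.entry((name.to_string(), *task_id)).or_default() += value;
    }
    // Step 3: render each entry as `name_taskid=value`.
    agg.iter()
        .map(|((name, task_id), value)| format!("{name}_{task_id}={value}"))
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    // Two partitions of task 1 plus one partition of task 0.
    let metrics = [("output_rows", 1, 2), ("output_rows", 0, 1), ("output_rows", 1, 3)];
    assert_eq!(display_per_task(&metrics), "output_rows_0=1, output_rows_1=5");
}
```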

1, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, scan_efficiency_ratio_0=2, scan_efficiency_ratio_1=2, scan_efficiency_ratio_2=2, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, sta
rt_timestamp_2=1769118.1 T, bloom_filter_eval_time_0=2ns, bloom_filter_eval_time_1=2ns, bloom_filter_eval_time_2=2ns, elapsed_compute_0=1ns, elapsed_compute_1=1ns, elapsed_compute_2=1ns, metadata_load_time_0=2.92ms, metadata_load_time_1
=2.77ms, metadata_load_time_2=3.29ms, page_index_eval_time_0=2ns, page_index_eval_time_1=2ns, page_index_eval_time_2=2ns, row_pushdown_eval_time_0=2ns, row_pushdown_eval_time_1=2ns, row_pushdown_eval_time_2=2ns, statistics_eval_time_0=2
ns, statistics_eval_time_1=2ns, statistics_eval_time_2=2ns, time_elapsed_opening_0=2.94ms, time_elapsed_opening_1=2.82ms, time_elapsed_opening_2=3.33ms, time_elapsed_processing_0=374.50µs, time_elapsed_processing_1=432.71µs, time_elapse
d_processing_2=485.12µs, time_elapsed_scanning_total_0=929.29µs, time_elapsed_scanning_total_1=444.71µs, time_elapsed_scanning_total_2=699.75µs, time_elapsed_scanning_until_data_0=866.62µs, time_elapsed_scanning_until_data_1=364.79µs, t
ime_elapsed_scanning_until_data_2=598.04µs]
    └──────────────────────────────────────────────────
```
Closes #304
jayshrivastava added a commit that referenced this pull request Jan 23, 2026
This change adds a new enum `DistributedMetricsFormat` which is either `Aggregated` or `PerTask`.

`Aggregated` is the default. When we use `DisplayableExecutionPlan::with_metrics`, metrics are aggregated by name. So we leave metrics as is before displaying.

`PerTask` configures the metrics rewriter to label metrics like `output_rows` with `__distributed_datafusion__task_id=x` at rewrite time. At display time, we do three things to the `MetricSet` of a node:
1. Aggregate by (name, task_id)
2. Sort by (name, task_id)
3. Display metrics in the format `output_rows_x`, where `x` is the task id.

If you look at what `DisplayableExecutionPlan::with_metrics` does, it performs the exact same three steps, except that it aggregates by name only. The alternative approach is to rename the metrics themselves, as in #307. I felt that this approach was more DataFusion-native.

I also considered labeling metrics at collection time, but I figured it's not worth the effort.
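
The three display-time steps can be sketched with plain strings. This is a simplified model, not the real DataFusion `MetricSet` API; `TaskMetric`, `display_aggregated`, and `display_per_task` are hypothetical names used only for illustration:

```rust
use std::collections::BTreeMap;

/// One recorded metric value, labeled with the task that produced it.
struct TaskMetric {
    name: &'static str,
    task_id: usize,
    value: u64,
}

/// Aggregated format: sum values across tasks, keyed by metric name only.
fn display_aggregated(metrics: &[TaskMetric]) -> Vec<String> {
    let mut by_name: BTreeMap<&str, u64> = BTreeMap::new();
    for m in metrics {
        *by_name.entry(m.name).or_insert(0) += m.value;
    }
    // BTreeMap iteration is already sorted by name.
    by_name.iter().map(|(k, v)| format!("{k}={v}")).collect()
}

/// PerTask format: aggregate by (name, task_id), sort by (name, task_id),
/// then render each entry as `name_<task_id>`.
fn display_per_task(metrics: &[TaskMetric]) -> Vec<String> {
    let mut by_key: BTreeMap<(&str, usize), u64> = BTreeMap::new();
    for m in metrics {
        *by_key.entry((m.name, m.task_id)).or_insert(0) += m.value;
    }
    by_key
        .iter()
        .map(|((name, task), v)| format!("{name}_{task}={v}"))
        .collect()
}

fn main() {
    // Mirrors the DataSourceExec sample below: 122 rows per task, 3 tasks.
    let metrics = vec![
        TaskMetric { name: "output_rows", task_id: 0, value: 122 },
        TaskMetric { name: "output_rows", task_id: 1, value: 122 },
        TaskMetric { name: "output_rows", task_id: 2, value: 122 },
    ];
    println!("{:?}", display_aggregated(&metrics)); // ["output_rows=366"]
    println!("{:?}", display_per_task(&metrics));
    // ["output_rows_0=122", "output_rows_1=122", "output_rows_2=122"]
}
```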

Sample output:

1. `DistributedMetricsFormat::Aggregated`
```
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_rows=2, elapsed_compute=1.92µs, output_bytes=48.0 B, output_batches=1, expr_0_eval_time=208ns, expr_1_eval_time=125ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_rows=2, elapsed_compute=10.83µs, output_bytes=64.0 B, output_batches=1]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=10.80µs, output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_rows=2, elapsed_compute=5.09µs, output_bytes=32.1 KB, output_batches=2, expr_0_eval_time=462ns,
expr_1_eval_time=254ns, expr_2_eval_time=170ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=165.50µs, output_bytes=32.1 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_
rows=0, peak_mem_used=100.9 K, aggregate_arguments_time=9.38µs, aggregation_time=9.21µs, emitting_time=5.34µs, time_calculating_group_ids=18.75µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_rows=6, elapsed_compute=52.26µs, output_bytes=1152.0 KB, output_batches=6, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=10
.72ms, repartition_time=39.67µs, send_time=13.72µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=6, elapsed_compute=192.54µs, output_bytes=48.1 KB, output_batches=3, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, s
kipped_aggregation_rows=0, peak_mem_used=56.47 K, aggregate_arguments_time=10.42µs, aggregation_time=7.58µs, emitting_time=17.00µs, time_calculating_group_ids=81.88µs, reduction_factor=1.6% (6/366)]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[output_rows=366, elapsed_compute=3ns, output_bytes=8.2 KB, output_batches=3, file
s_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=3 total → 3 matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=0 total → 0 matched, batches_split=0, bytes_scanned=3.12 K, fil
e_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=6ns
, metadata_load_time=5.62ms, page_index_eval_time=6ns, row_pushdown_eval_time=6ns, statistics_eval_time=6ns, time_elapsed_opening=5.79ms, time_elapsed_processing=1.51ms, time_elapsed_scanning_total=4.77ms, time_elapsed_scanning_until_da
ta=4.54ms, scan_efficiency_ratio=2.3% (3.12 K/137.4 K)]
    └──────────────────────────────────────────────────
```

2. `DistributedMetricsFormat::PerTask`
```
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_batches_0=1, output_bytes_0=48, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=1.58µs, expr_0_eva
l_time_0=167ns, expr_1_eval_time_0=83ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_batches_0=1, output_bytes_0=64, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=27.04µs]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_batches_0=0, output_batches_1=0, output_bytes_0=0, output_bytes_1=0, output_rows_0=1, output_rows_1=1, spill_count_0=0, spill_count_1=0, spill
ed_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=8.42µs, elapsed_compute_1=4.9
2µs]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1
, output_rows_1=1, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=1.83µs, elapsed_compute_1=1.96µs, expr_0_eval_time_0=168ns, expr_0_eval_time_1=
168ns, expr_1_eval_time_0=43ns, expr_1_eval_time_1=85ns, expr_2_eval_time_0=86ns, expr_2_eval_time_1=85ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1, output_rows_1=1, spi
ll_count_0=0, spill_count_1=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, peak_mem_used_0=50.47 K, peak_mem_used_1=50.47 K, start_timestamp_0=53073
54.2 T, start_timestamp_1=5307354.2 T, aggregate_arguments_time_0=3.17µs, aggregate_arguments_time_1=2.59µs, aggregation_time_0=3.13µs, aggregation_time_1=2.04µs, elapsed_compute_0=93.29µs, elapsed_compute_1=57.67µs, emitting_time_0=2.5
0µs, emitting_time_1=2.04µs, time_calculating_group_ids_0=7.75µs, time_calculating_group_ids_1=4.67µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_batches_0=2, output_batches_1=2, output_batches_2=2, output_bytes_0=393.2 K, output_bytes_1=393.2 K, output_bytes_2=393.2 K, output_rows_0=2
, output_rows_1=2, output_rows_2=2, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=10614708.4 T, end_time
stamp_1=10614708.4 T, end_timestamp_2=10614708.4 T, start_timestamp_0=10614708.4 T, start_timestamp_1=10614708.4 T, start_timestamp_2=10614708.4 T, elapsed_compute_0=10.42µs, elapsed_compute_1=11.29µs, elapsed_compute_2=19.63µs, fetch_t
ime_0=3.92ms, fetch_time_1=3.35ms, fetch_time_2=4.08ms, repartition_time_0=7.21µs, repartition_time_1=14.33µs, repartition_time_2=20.21µs, send_time_0=2.63µs, send_time_1=3.63µs, send_time_2=5.46µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_batches_2=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_bytes_2=16.42 K, ou
tput_rows_0=2, output_rows_1=2, output_rows_2=2, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_
bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, peak_mem_used_0=18.82 K, peak_mem_used_1=18.82 K, peak_mem_used_2=18.82 K, reduction
_factor_0=1, reduction_factor_1=1, reduction_factor_2=1, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, start_timestamp_2=1769118.1 T, aggregate_arguments_time_0=2.83µs, aggregate_arguments_time_1=5.17µs, aggregate_argume
nts_time_2=7.21µs, aggregation_time_0=2.00µs, aggregation_time_1=3.42µs, aggregation_time_2=5.12µs, elapsed_compute_0=54.71µs, elapsed_compute_1=71.75µs, elapsed_compute_2=85.38µs, emitting_time_0=3.38µs, emitting_time_1=6.83µs, emittin
g_time_2=8.33µs, time_calculating_group_ids_0=25.96µs, time_calculating_group_ids_1=30.79µs, time_calculating_group_ids_2=34.00µs]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=1.07 K,
bytes_scanned_1=1.01 K, bytes_scanned_2=1.03 K, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, files_ranges_pruned_statistics_matched_0=1, files_ranges
_pruned_statistics_matched_1=1, files_ranges_pruned_statistics_matched_2=1, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, output_batches_0=1, output_batches_1=1, output_batches_
2=1, output_bytes_0=2.81 K, output_bytes_1=2.79 K, output_bytes_2=2.80 K, output_rows_0=122, output_rows_1=122, output_rows_2=122, page_index_rows_pruned_matched_0=0, page_index_rows_pruned_matched_1=0, page_index_rows_pruned_matched_2=
0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, predicate_evaluation_errors_0=0, predicat
e_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, row_groups_pruned_b
loom_filter_matched_0=1, row_groups_pruned_bloom_filter_matched_1=1, row_groups_pruned_bloom_filter_matched_2=1, row_groups_pruned_statistics_matched_0=1, row_groups_pruned_statistics_matched_1=1, row_groups_pruned_statistics_matched_2=
1, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, scan_efficiency_ratio_0=2, scan_efficiency_ratio_1=2, scan_efficiency_ratio_2=2, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, sta
rt_timestamp_2=1769118.1 T, bloom_filter_eval_time_0=2ns, bloom_filter_eval_time_1=2ns, bloom_filter_eval_time_2=2ns, elapsed_compute_0=1ns, elapsed_compute_1=1ns, elapsed_compute_2=1ns, metadata_load_time_0=2.92ms, metadata_load_time_1
=2.77ms, metadata_load_time_2=3.29ms, page_index_eval_time_0=2ns, page_index_eval_time_1=2ns, page_index_eval_time_2=2ns, row_pushdown_eval_time_0=2ns, row_pushdown_eval_time_1=2ns, row_pushdown_eval_time_2=2ns, statistics_eval_time_0=2
ns, statistics_eval_time_1=2ns, statistics_eval_time_2=2ns, time_elapsed_opening_0=2.94ms, time_elapsed_opening_1=2.82ms, time_elapsed_opening_2=3.33ms, time_elapsed_processing_0=374.50µs, time_elapsed_processing_1=432.71µs, time_elapse
d_processing_2=485.12µs, time_elapsed_scanning_total_0=929.29µs, time_elapsed_scanning_total_1=444.71µs, time_elapsed_scanning_total_2=699.75µs, time_elapsed_scanning_until_data_0=866.62µs, time_elapsed_scanning_until_data_1=364.79µs, t
ime_elapsed_scanning_until_data_2=598.04µs]
    └──────────────────────────────────────────────────
```
Closes #304
ta=4.54ms, scan_efficiency_ratio=2.3% (3.12 K/137.4 K)]
    └──────────────────────────────────────────────────
```

2. `DistributedMetricsFormat::PerTask`
```
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_batches_0=1, output_bytes_0=48, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=1.58µs, expr_0_eva
l_time_0=167ns, expr_1_eval_time_0=83ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_batches_0=1, output_bytes_0=64, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=27.04µs]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_batches_0=0, output_batches_1=0, output_bytes_0=0, output_bytes_1=0, output_rows_0=1, output_rows_1=1, spill_count_0=0, spill_count_1=0, spill
ed_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=8.42µs, elapsed_compute_1=4.9
2µs]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1
, output_rows_1=1, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=1.83µs, elapsed_compute_1=1.96µs, expr_0_eval_time_0=168ns, expr_0_eval_time_1=
168ns, expr_1_eval_time_0=43ns, expr_1_eval_time_1=85ns, expr_2_eval_time_0=86ns, expr_2_eval_time_1=85ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1, output_rows_1=1, spi
ll_count_0=0, spill_count_1=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, peak_mem_used_0=50.47 K, peak_mem_used_1=50.47 K, start_timestamp_0=53073
54.2 T, start_timestamp_1=5307354.2 T, aggregate_arguments_time_0=3.17µs, aggregate_arguments_time_1=2.59µs, aggregation_time_0=3.13µs, aggregation_time_1=2.04µs, elapsed_compute_0=93.29µs, elapsed_compute_1=57.67µs, emitting_time_0=2.5
0µs, emitting_time_1=2.04µs, time_calculating_group_ids_0=7.75µs, time_calculating_group_ids_1=4.67µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_batches_0=2, output_batches_1=2, output_batches_2=2, output_bytes_0=393.2 K, output_bytes_1=393.2 K, output_bytes_2=393.2 K, output_rows_0=2
, output_rows_1=2, output_rows_2=2, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=10614708.4 T, end_time
stamp_1=10614708.4 T, end_timestamp_2=10614708.4 T, start_timestamp_0=10614708.4 T, start_timestamp_1=10614708.4 T, start_timestamp_2=10614708.4 T, elapsed_compute_0=10.42µs, elapsed_compute_1=11.29µs, elapsed_compute_2=19.63µs, fetch_t
ime_0=3.92ms, fetch_time_1=3.35ms, fetch_time_2=4.08ms, repartition_time_0=7.21µs, repartition_time_1=14.33µs, repartition_time_2=20.21µs, send_time_0=2.63µs, send_time_1=3.63µs, send_time_2=5.46µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_batches_2=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_bytes_2=16.42 K, ou
tput_rows_0=2, output_rows_1=2, output_rows_2=2, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_
bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, peak_mem_used_0=18.82 K, peak_mem_used_1=18.82 K, peak_mem_used_2=18.82 K, reduction
_factor_0=1, reduction_factor_1=1, reduction_factor_2=1, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, start_timestamp_2=1769118.1 T, aggregate_arguments_time_0=2.83µs, aggregate_arguments_time_1=5.17µs, aggregate_argume
nts_time_2=7.21µs, aggregation_time_0=2.00µs, aggregation_time_1=3.42µs, aggregation_time_2=5.12µs, elapsed_compute_0=54.71µs, elapsed_compute_1=71.75µs, elapsed_compute_2=85.38µs, emitting_time_0=3.38µs, emitting_time_1=6.83µs, emittin
g_time_2=8.33µs, time_calculating_group_ids_0=25.96µs, time_calculating_group_ids_1=30.79µs, time_calculating_group_ids_2=34.00µs]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=1.07 K,
bytes_scanned_1=1.01 K, bytes_scanned_2=1.03 K, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, files_ranges_pruned_statistics_matched_0=1, files_ranges
_pruned_statistics_matched_1=1, files_ranges_pruned_statistics_matched_2=1, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, output_batches_0=1, output_batches_1=1, output_batches_
2=1, output_bytes_0=2.81 K, output_bytes_1=2.79 K, output_bytes_2=2.80 K, output_rows_0=122, output_rows_1=122, output_rows_2=122, page_index_rows_pruned_matched_0=0, page_index_rows_pruned_matched_1=0, page_index_rows_pruned_matched_2=
0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, predicate_evaluation_errors_0=0, predicat
e_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, row_groups_pruned_b
loom_filter_matched_0=1, row_groups_pruned_bloom_filter_matched_1=1, row_groups_pruned_bloom_filter_matched_2=1, row_groups_pruned_statistics_matched_0=1, row_groups_pruned_statistics_matched_1=1, row_groups_pruned_statistics_matched_2=
1, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, scan_efficiency_ratio_0=2, scan_efficiency_ratio_1=2, scan_efficiency_ratio_2=2, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, sta
rt_timestamp_2=1769118.1 T, bloom_filter_eval_time_0=2ns, bloom_filter_eval_time_1=2ns, bloom_filter_eval_time_2=2ns, elapsed_compute_0=1ns, elapsed_compute_1=1ns, elapsed_compute_2=1ns, metadata_load_time_0=2.92ms, metadata_load_time_1
=2.77ms, metadata_load_time_2=3.29ms, page_index_eval_time_0=2ns, page_index_eval_time_1=2ns, page_index_eval_time_2=2ns, row_pushdown_eval_time_0=2ns, row_pushdown_eval_time_1=2ns, row_pushdown_eval_time_2=2ns, statistics_eval_time_0=2
ns, statistics_eval_time_1=2ns, statistics_eval_time_2=2ns, time_elapsed_opening_0=2.94ms, time_elapsed_opening_1=2.82ms, time_elapsed_opening_2=3.33ms, time_elapsed_processing_0=374.50µs, time_elapsed_processing_1=432.71µs, time_elapse
d_processing_2=485.12µs, time_elapsed_scanning_total_0=929.29µs, time_elapsed_scanning_total_1=444.71µs, time_elapsed_scanning_total_2=699.75µs, time_elapsed_scanning_until_data_0=866.62µs, time_elapsed_scanning_until_data_1=364.79µs, t
ime_elapsed_scanning_until_data_2=598.04µs]
    └──────────────────────────────────────────────────
```
Closes #304
jayshrivastava added a commit that referenced this pull request Jan 26, 2026
@jayshrivastava (Collaborator, Author)

Closing in favor of #309

jayshrivastava added a commit that referenced this pull request Jan 26, 2026
This change adds a new enum `DistributedMetricsFormat` which is either
`Aggregated` or `PerTask`.

`Aggregated` is the default. When we use
`DisplayableExecutionPlan::with_metrics`, metrics are aggregated by
name. So we leave metrics as is before displaying.

`PerTask` will configure the metrics re-writer to label metrics like
`output_rows` with `__distributed_datafusion__task_id=x` at re-write
time. At display time, we do 3 things to the `MetricSet` of a node:
1. Aggregate by name, task_id
2. Sort by name, task_id
3. Display metrics in the format `output_rows_x` where `x` is the task
id.

If you look at what `DisplayableExecutionPlan::with_metrics` does, it
does the exact same 3 steps above, except it only aggregates by name.
The alternative approach is to rename metrics like in
#307. I
felt that this approach was more datafusion-naitive.

I also considered labelling metrics at collection-time, but I figured
it's not worth the effort.

Sample output:

1. `DistributedMetricsFormat::Aggregated`
```
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_rows=2, elapsed_compute=1.92µs, output_bytes=48.0 B, output_batches=1, expr_0_eval_time=208ns, expr_1_eval_time=125ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_rows=2, elapsed_compute=10.83µs, output_bytes=64.0 B, output_batches=1]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=10.80µs, output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_rows=2, elapsed_compute=5.09µs, output_bytes=32.1 KB, output_batches=2, expr_0_eval_time=462ns,
expr_1_eval_time=254ns, expr_2_eval_time=170ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=165.50µs, output_bytes=32.1 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_
rows=0, peak_mem_used=100.9 K, aggregate_arguments_time=9.38µs, aggregation_time=9.21µs, emitting_time=5.34µs, time_calculating_group_ids=18.75µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_rows=6, elapsed_compute=52.26µs, output_bytes=1152.0 KB, output_batches=6, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=10
.72ms, repartition_time=39.67µs, send_time=13.72µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=6, elapsed_compute=192.54µs, output_bytes=48.1 KB, output_batches=3, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, s
kipped_aggregation_rows=0, peak_mem_used=56.47 K, aggregate_arguments_time=10.42µs, aggregation_time=7.58µs, emitting_time=17.00µs, time_calculating_group_ids=81.88µs, reduction_factor=1.6% (6/366)]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[output_rows=366, elapsed_compute=3ns, output_bytes=8.2 KB, output_batches=3, file
s_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=3 total → 3 matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=0 total → 0 matched, batches_split=0, bytes_scanned=3.12 K, fil
e_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=6ns
, metadata_load_time=5.62ms, page_index_eval_time=6ns, row_pushdown_eval_time=6ns, statistics_eval_time=6ns, time_elapsed_opening=5.79ms, time_elapsed_processing=1.51ms, time_elapsed_scanning_total=4.77ms, time_elapsed_scanning_until_da
ta=4.54ms, scan_efficiency_ratio=2.3% (3.12 K/137.4 K)]
    └──────────────────────────────────────────────────
```

2. `DistributedMetricsFormat::PerTask`
```
───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_batches_0=1, output_bytes_0=48, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=1.58µs, expr_0_eva
l_time_0=167ns, expr_1_eval_time_0=83ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_batches_0=1, output_bytes_0=64, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=27.04µs]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_batches_0=0, output_batches_1=0, output_bytes_0=0, output_bytes_1=0, output_rows_0=1, output_rows_1=1, spill_count_0=0, spill_count_1=0, spill
ed_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=8.42µs, elapsed_compute_1=4.9
2µs]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1
, output_rows_1=1, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=1.83µs, elapsed_compute_1=1.96µs, expr_0_eval_time_0=168ns, expr_0_eval_time_1=
168ns, expr_1_eval_time_0=43ns, expr_1_eval_time_1=85ns, expr_2_eval_time_0=86ns, expr_2_eval_time_1=85ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1, output_rows_1=1, spi
ll_count_0=0, spill_count_1=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, peak_mem_used_0=50.47 K, peak_mem_used_1=50.47 K, start_timestamp_0=53073
54.2 T, start_timestamp_1=5307354.2 T, aggregate_arguments_time_0=3.17µs, aggregate_arguments_time_1=2.59µs, aggregation_time_0=3.13µs, aggregation_time_1=2.04µs, elapsed_compute_0=93.29µs, elapsed_compute_1=57.67µs, emitting_time_0=2.5
0µs, emitting_time_1=2.04µs, time_calculating_group_ids_0=7.75µs, time_calculating_group_ids_1=4.67µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_batches_0=2, output_batches_1=2, output_batches_2=2, output_bytes_0=393.2 K, output_bytes_1=393.2 K, output_bytes_2=393.2 K, output_rows_0=2, output_rows_1=2, output_rows_2=2, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=10614708.4 T, end_timestamp_1=10614708.4 T, end_timestamp_2=10614708.4 T, start_timestamp_0=10614708.4 T, start_timestamp_1=10614708.4 T, start_timestamp_2=10614708.4 T, elapsed_compute_0=10.42µs, elapsed_compute_1=11.29µs, elapsed_compute_2=19.63µs, fetch_time_0=3.92ms, fetch_time_1=3.35ms, fetch_time_2=4.08ms, repartition_time_0=7.21µs, repartition_time_1=14.33µs, repartition_time_2=20.21µs, send_time_0=2.63µs, send_time_1=3.63µs, send_time_2=5.46µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_batches_2=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_bytes_2=16.42 K, output_rows_0=2, output_rows_1=2, output_rows_2=2, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, peak_mem_used_0=18.82 K, peak_mem_used_1=18.82 K, peak_mem_used_2=18.82 K, reduction_factor_0=1, reduction_factor_1=1, reduction_factor_2=1, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, start_timestamp_2=1769118.1 T, aggregate_arguments_time_0=2.83µs, aggregate_arguments_time_1=5.17µs, aggregate_arguments_time_2=7.21µs, aggregation_time_0=2.00µs, aggregation_time_1=3.42µs, aggregation_time_2=5.12µs, elapsed_compute_0=54.71µs, elapsed_compute_1=71.75µs, elapsed_compute_2=85.38µs, emitting_time_0=3.38µs, emitting_time_1=6.83µs, emitting_time_2=8.33µs, time_calculating_group_ids_0=25.96µs, time_calculating_group_ids_1=30.79µs, time_calculating_group_ids_2=34.00µs]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=1.07 K, bytes_scanned_1=1.01 K, bytes_scanned_2=1.03 K, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, files_ranges_pruned_statistics_matched_0=1, files_ranges_pruned_statistics_matched_1=1, files_ranges_pruned_statistics_matched_2=1, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, output_batches_0=1, output_batches_1=1, output_batches_2=1, output_bytes_0=2.81 K, output_bytes_1=2.79 K, output_bytes_2=2.80 K, output_rows_0=122, output_rows_1=122, output_rows_2=122, page_index_rows_pruned_matched_0=0, page_index_rows_pruned_matched_1=0, page_index_rows_pruned_matched_2=0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, predicate_evaluation_errors_0=0, predicate_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, row_groups_pruned_bloom_filter_matched_0=1, row_groups_pruned_bloom_filter_matched_1=1, row_groups_pruned_bloom_filter_matched_2=1, row_groups_pruned_statistics_matched_0=1, row_groups_pruned_statistics_matched_1=1, row_groups_pruned_statistics_matched_2=1, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, scan_efficiency_ratio_0=2, scan_efficiency_ratio_1=2, scan_efficiency_ratio_2=2, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, start_timestamp_2=1769118.1 T, bloom_filter_eval_time_0=2ns, bloom_filter_eval_time_1=2ns, bloom_filter_eval_time_2=2ns, elapsed_compute_0=1ns, elapsed_compute_1=1ns, elapsed_compute_2=1ns, metadata_load_time_0=2.92ms, metadata_load_time_1=2.77ms, metadata_load_time_2=3.29ms, page_index_eval_time_0=2ns, page_index_eval_time_1=2ns, page_index_eval_time_2=2ns, row_pushdown_eval_time_0=2ns, row_pushdown_eval_time_1=2ns, row_pushdown_eval_time_2=2ns, statistics_eval_time_0=2ns, statistics_eval_time_1=2ns, statistics_eval_time_2=2ns, time_elapsed_opening_0=2.94ms, time_elapsed_opening_1=2.82ms, time_elapsed_opening_2=3.33ms, time_elapsed_processing_0=374.50µs, time_elapsed_processing_1=432.71µs, time_elapsed_processing_2=485.12µs, time_elapsed_scanning_total_0=929.29µs, time_elapsed_scanning_total_1=444.71µs, time_elapsed_scanning_total_2=699.75µs, time_elapsed_scanning_until_data_0=866.62µs, time_elapsed_scanning_until_data_1=364.79µs, time_elapsed_scanning_until_data_2=598.04µs]
    └──────────────────────────────────────────────────
```
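The output above illustrates the renaming scheme: each metric name carries a `_<task_id>` suffix, so `DisplayableExecutionPlan::with_metrics` still aggregates by name but never merges values from different tasks. A minimal sketch of that suffixing rule (a hypothetical standalone helper, not the crate's actual `annotate_metrics_set_with_task_id` implementation):

```rust
// Hypothetical sketch: rewrite a metric name as "<name>_<task_id>" so that
// name-based aggregation keeps each task's values distinct. The real
// implementation annotates entire MetricsSet values, not bare strings.
fn annotate_metric_name(name: &str, task_id: usize) -> String {
    format!("{name}_{task_id}")
}

fn main() {
    // e.g. "output_rows" observed by tasks 0..3 stays separate per task.
    for task_id in 0..3 {
        println!("{}", annotate_metric_name("output_rows", task_id));
    }
}
```

With `Aggregated` (the default) this renaming step is skipped, so identically named metrics from all tasks fold into a single value.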
Closes #304 (Aggregate metrics per task instead of globally).