Skip to content

Optionally display metrics per task (version 2)#309

Merged
jayshrivastava merged 1 commit intomainfrom
js/per-task-metrics-2
Jan 26, 2026
Merged

Optionally display metrics per task (version 2)#309
jayshrivastava merged 1 commit intomainfrom
js/per-task-metrics-2

Conversation

@jayshrivastava
Copy link
Copy Markdown
Collaborator

This change adds a new enum DistributedMetricsFormat which is either Aggregated or PerTask.

Aggregated is the default. When we use DisplayableExecutionPlan::with_metrics, metrics are aggregated by name. So we leave metrics as is before displaying.

PerTask will configure the metrics re-writer to label metrics like output_rows with __distributed_datafusion__task_id=x at re-write time. At display time, we do 3 things to the MetricSet of a node:

  1. Aggregate by name, task_id
  2. Sort by name, task_id
  3. Display metrics in the format output_rows_x where x is the task id.

If you look at what DisplayableExecutionPlan::with_metrics does, it does the exact same 3 steps above, except it only aggregates by name. The alternative approach is to rename metrics like in #307. I felt that this approach was more datafusion-naitive.

I also considered labelling metrics at collection-time, but I figured it's not worth the effort.

Sample output:

  1. DistributedMetricsFormat::Aggregated
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_rows=2, elapsed_compute=1.92µs, output_bytes=48.0 B, output_batches=1, expr_0_eval_time=208ns, expr_1_eval_time=125ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_rows=2, elapsed_compute=10.83µs, output_bytes=64.0 B, output_batches=1]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=10.80µs, output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_rows=2, elapsed_compute=5.09µs, output_bytes=32.1 KB, output_batches=2, expr_0_eval_time=462ns,
expr_1_eval_time=254ns, expr_2_eval_time=170ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=165.50µs, output_bytes=32.1 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_
rows=0, peak_mem_used=100.9 K, aggregate_arguments_time=9.38µs, aggregation_time=9.21µs, emitting_time=5.34µs, time_calculating_group_ids=18.75µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_rows=6, elapsed_compute=52.26µs, output_bytes=1152.0 KB, output_batches=6, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=10
.72ms, repartition_time=39.67µs, send_time=13.72µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=6, elapsed_compute=192.54µs, output_bytes=48.1 KB, output_batches=3, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, s
kipped_aggregation_rows=0, peak_mem_used=56.47 K, aggregate_arguments_time=10.42µs, aggregation_time=7.58µs, emitting_time=17.00µs, time_calculating_group_ids=81.88µs, reduction_factor=1.6% (6/366)]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[output_rows=366, elapsed_compute=3ns, output_bytes=8.2 KB, output_batches=3, file
s_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=3 total → 3 matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=0 total → 0 matched, batches_split=0, bytes_scanned=3.12 K, fil
e_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=6ns
, metadata_load_time=5.62ms, page_index_eval_time=6ns, row_pushdown_eval_time=6ns, statistics_eval_time=6ns, time_elapsed_opening=5.79ms, time_elapsed_processing=1.51ms, time_elapsed_scanning_total=4.77ms, time_elapsed_scanning_until_da
ta=4.54ms, scan_efficiency_ratio=2.3% (3.12 K/137.4 K)]
    └──────────────────────────────────────────────────
  1. DistributedMetricsFormat::PerTask
───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_batches_0=1, output_bytes_0=48, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=1.58µs, expr_0_eva
l_time_0=167ns, expr_1_eval_time_0=83ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_batches_0=1, output_bytes_0=64, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=27.04µs]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_batches_0=0, output_batches_1=0, output_bytes_0=0, output_bytes_1=0, output_rows_0=1, output_rows_1=1, spill_count_0=0, spill_count_1=0, spill
ed_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=8.42µs, elapsed_compute_1=4.9
2µs]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1
, output_rows_1=1, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=1.83µs, elapsed_compute_1=1.96µs, expr_0_eval_time_0=168ns, expr_0_eval_time_1=
168ns, expr_1_eval_time_0=43ns, expr_1_eval_time_1=85ns, expr_2_eval_time_0=86ns, expr_2_eval_time_1=85ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1, output_rows_1=1, spi
ll_count_0=0, spill_count_1=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, peak_mem_used_0=50.47 K, peak_mem_used_1=50.47 K, start_timestamp_0=53073
54.2 T, start_timestamp_1=5307354.2 T, aggregate_arguments_time_0=3.17µs, aggregate_arguments_time_1=2.59µs, aggregation_time_0=3.13µs, aggregation_time_1=2.04µs, elapsed_compute_0=93.29µs, elapsed_compute_1=57.67µs, emitting_time_0=2.5
0µs, emitting_time_1=2.04µs, time_calculating_group_ids_0=7.75µs, time_calculating_group_ids_1=4.67µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_batches_0=2, output_batches_1=2, output_batches_2=2, output_bytes_0=393.2 K, output_bytes_1=393.2 K, output_bytes_2=393.2 K, output_rows_0=2
, output_rows_1=2, output_rows_2=2, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=10614708.4 T, end_time
stamp_1=10614708.4 T, end_timestamp_2=10614708.4 T, start_timestamp_0=10614708.4 T, start_timestamp_1=10614708.4 T, start_timestamp_2=10614708.4 T, elapsed_compute_0=10.42µs, elapsed_compute_1=11.29µs, elapsed_compute_2=19.63µs, fetch_t
ime_0=3.92ms, fetch_time_1=3.35ms, fetch_time_2=4.08ms, repartition_time_0=7.21µs, repartition_time_1=14.33µs, repartition_time_2=20.21µs, send_time_0=2.63µs, send_time_1=3.63µs, send_time_2=5.46µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_batches_2=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_bytes_2=16.42 K, ou
tput_rows_0=2, output_rows_1=2, output_rows_2=2, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_
bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, peak_mem_used_0=18.82 K, peak_mem_used_1=18.82 K, peak_mem_used_2=18.82 K, reduction
_factor_0=1, reduction_factor_1=1, reduction_factor_2=1, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, start_timestamp_2=1769118.1 T, aggregate_arguments_time_0=2.83µs, aggregate_arguments_time_1=5.17µs, aggregate_argume
nts_time_2=7.21µs, aggregation_time_0=2.00µs, aggregation_time_1=3.42µs, aggregation_time_2=5.12µs, elapsed_compute_0=54.71µs, elapsed_compute_1=71.75µs, elapsed_compute_2=85.38µs, emitting_time_0=3.38µs, emitting_time_1=6.83µs, emittin
g_time_2=8.33µs, time_calculating_group_ids_0=25.96µs, time_calculating_group_ids_1=30.79µs, time_calculating_group_ids_2=34.00µs]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=1.07 K,
bytes_scanned_1=1.01 K, bytes_scanned_2=1.03 K, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, files_ranges_pruned_statistics_matched_0=1, files_ranges
_pruned_statistics_matched_1=1, files_ranges_pruned_statistics_matched_2=1, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, output_batches_0=1, output_batches_1=1, output_batches_
2=1, output_bytes_0=2.81 K, output_bytes_1=2.79 K, output_bytes_2=2.80 K, output_rows_0=122, output_rows_1=122, output_rows_2=122, page_index_rows_pruned_matched_0=0, page_index_rows_pruned_matched_1=0, page_index_rows_pruned_matched_2=
0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, predicate_evaluation_errors_0=0, predicat
e_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, row_groups_pruned_b
loom_filter_matched_0=1, row_groups_pruned_bloom_filter_matched_1=1, row_groups_pruned_bloom_filter_matched_2=1, row_groups_pruned_statistics_matched_0=1, row_groups_pruned_statistics_matched_1=1, row_groups_pruned_statistics_matched_2=
1, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, scan_efficiency_ratio_0=2, scan_efficiency_ratio_1=2, scan_efficiency_ratio_2=2, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, sta
rt_timestamp_2=1769118.1 T, bloom_filter_eval_time_0=2ns, bloom_filter_eval_time_1=2ns, bloom_filter_eval_time_2=2ns, elapsed_compute_0=1ns, elapsed_compute_1=1ns, elapsed_compute_2=1ns, metadata_load_time_0=2.92ms, metadata_load_time_1
=2.77ms, metadata_load_time_2=3.29ms, page_index_eval_time_0=2ns, page_index_eval_time_1=2ns, page_index_eval_time_2=2ns, row_pushdown_eval_time_0=2ns, row_pushdown_eval_time_1=2ns, row_pushdown_eval_time_2=2ns, statistics_eval_time_0=2
ns, statistics_eval_time_1=2ns, statistics_eval_time_2=2ns, time_elapsed_opening_0=2.94ms, time_elapsed_opening_1=2.82ms, time_elapsed_opening_2=3.33ms, time_elapsed_processing_0=374.50µs, time_elapsed_processing_1=432.71µs, time_elapse
d_processing_2=485.12µs, time_elapsed_scanning_total_0=929.29µs, time_elapsed_scanning_total_1=444.71µs, time_elapsed_scanning_total_2=699.75µs, time_elapsed_scanning_until_data_0=866.62µs, time_elapsed_scanning_until_data_1=364.79µs, t
ime_elapsed_scanning_until_data_2=598.04µs]
    └──────────────────────────────────────────────────

Closes #304

metric.value().clone(),
metric.partition(),
labels,
)));
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much simpler, but still allocates. Will open something upstream.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's much better definitely! at least now we don't perform atomic operations, I think that's a major win.

metrics_set_proto_to_df(&expected_task_node_metrics).unwrap();
// Add task ids labels. We expect the actual metrics to be annotated by the
// rewriter
expected_metrics_set =
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some extra test coverage to ensure we added labels during the rewrite!

Copy link
Copy Markdown
Collaborator

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the differences I see between this PR and #307 is the way plans are displayed with metrics if visualizing with:

        println!("{}", DisplayableExecutionPlan::with_full_metrics(s_physical.as_ref()).indent(true).to_string());

In this PR:

  ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[expr_0_eval_time{partition=0, expr=count(*) AS count(*), __distributed_datafusion__task_id=0}=166ns, expr_1_eval_time{partition=0, expr=RainToday AS RainToday, __distributed_datafusion__task_id=0}=125ns, start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.764266 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.786380 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=1.79µs, output_rows{partition=0, __distributed_datafusion__task_id=0}=2, output_bytes{partition=0, __distributed_datafusion__task_id=0}=48.0 B, output_batches{partition=0, __distributed_datafusion__task_id=0}=1]
    SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.764240 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.786380 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=27.88µs, output_rows{partition=0, __distributed_datafusion__task_id=0}=2, output_bytes{partition=0, __distributed_datafusion__task_id=0}=64.0 B, output_batches{partition=0, __distributed_datafusion__task_id=0}=1]
      [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
        SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771675 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785360 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=4.54µs, output_rows{partition=0, __distributed_datafusion__task_id=0}=1, output_bytes{partition=0, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=0, __distributed_datafusion__task_id=0}=0, spill_count{partition=0, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771756 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785436 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=0}=1ns, output_rows{partition=1, __distributed_datafusion__task_id=0}=0, output_bytes{partition=1, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=0}=0, spill_count{partition=1, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=1, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=1, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771835 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785303 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=0}=1ns, output_rows{partition=2, __distributed_datafusion__task_id=0}=0, output_bytes{partition=2, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=2, __distributed_datafusion__task_id=0}=0, spill_count{partition=2, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=2, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=2, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770514 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784674 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=1}=1ns, output_rows{partition=0, __distributed_datafusion__task_id=1}=0, output_bytes{partition=0, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=0, __distributed_datafusion__task_id=1}=0, spill_count{partition=0, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=1}=0, start_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770781 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784711 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=1}=1ns, output_rows{partition=1, __distributed_datafusion__task_id=1}=0, output_bytes{partition=1, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=1}=0, spill_count{partition=1, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=1, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=1, __distributed_datafusion__task_id=1}=0, start_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770863 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784808 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=1}=9.38µs, output_rows{partition=2, __distributed_datafusion__task_id=1}=1, output_bytes{partition=2, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=2, __distributed_datafusion__task_id=1}=0, spill_count{partition=2, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=2, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=2, __distributed_datafusion__task_id=1}=0]
          ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[expr_0_eval_time{partition=0, expr=count(Int64(1)) AS count(*), __distributed_datafusion__task_id=0}=125ns, expr_1_eval_time{partition=0, expr=RainToday AS RainToday, __distributed_datafusion__task_id=0}=83ns, expr_2_eval_time{partition=0, expr=count(Int64(1)) AS count(Int64(1)), __distributed_datafusion__task_id=0}=84ns, start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771661 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785355 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=1.96µs, output_rows{partition=0, __distributed_datafusion__task_id=0}=1, output_bytes{partition=0, __distributed_datafusion__task_id=0}=16.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=0}=1, expr_0_eval_time{partition=1, expr=count(Int64(1)) AS count(*), __distributed_datafusion__task_id=0}=1ns, expr_1_eval_time{partition=1, expr=RainToday AS RainToday, __distributed_datafusion__task_id=0}=1ns, expr_2_eval_time{partition=1, expr=count(Int64(1)) AS count(Int64(1)), __distributed_datafusion__task_id=0}=1ns, start_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771744 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785436 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=0}=1ns, output_rows{partition=1, __distributed_datafusion__task_id=0}=0, output_bytes{partition=1, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=0}=0, expr_0_eval_time{partition=2, expr=count(Int64(1)) AS count(*), __distributed_datafusion__task_id=0}=1ns, expr_1_eval_time{partition=2, expr=RainToday AS RainToday, __distributed_datafusion__task_id=0}=1ns, expr_2_eval_time{partition=2, expr=count(Int64(1)) AS count(Int64(1)), __distributed_datafusion__task_id=0}=1ns, start_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771824 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785302 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=0}=1ns, output_rows{partition=2, __distributed_datafusion__task_id=0}=0, output_bytes{partition=2, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=2, __distributed_datafusion__task_id=0}=0, expr_0_eval_time{partition=0, expr=count(Int64(1)) AS count(*), __distributed_datafusion__task_id=1}=1ns, expr_1_eval_time{partition=0, expr=RainToday AS RainToday, __distributed_datafusion__task_id=1}=1ns, expr_2_eval_time{partition=0, expr=count(Int64(1)) AS count(Int64(1)), __distributed_datafusion__task_id=1}=1ns, start_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770499 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784673 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=1}=1ns, output_rows{partition=0, __distributed_datafusion__task_id=1}=0, output_bytes{partition=0, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=0, __distributed_datafusion__task_id=1}=0, expr_0_eval_time{partition=1, expr=count(Int64(1)) AS count(*), __distributed_datafusion__task_id=1}=1ns, expr_1_eval_time{partition=1, expr=RainToday AS RainToday, __distributed_datafusion__task_id=1}=1ns, expr_2_eval_time{partition=1, expr=count(Int64(1)) AS count(Int64(1)), __distributed_datafusion__task_id=1}=1ns, start_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770767 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784710 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=1}=1ns, output_rows{partition=1, __distributed_datafusion__task_id=1}=0, output_bytes{partition=1, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=1}=0, expr_0_eval_time{partition=2, expr=count(Int64(1)) AS count(*), __distributed_datafusion__task_id=1}=166ns, expr_1_eval_time{partition=2, expr=RainToday AS RainToday, __distributed_datafusion__task_id=1}=84ns, expr_2_eval_time{partition=2, expr=count(Int64(1)) AS count(Int64(1)), __distributed_datafusion__task_id=1}=83ns, start_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770851 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784798 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=1}=2.46µs, output_rows{partition=2, __distributed_datafusion__task_id=1}=1, output_bytes{partition=2, __distributed_datafusion__task_id=1}=16.0 KB, output_batches{partition=2, __distributed_datafusion__task_id=1}=1]
            AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771644 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785364 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=39.79µs, output_rows{partition=0, __distributed_datafusion__task_id=0}=1, output_bytes{partition=0, __distributed_datafusion__task_id=0}=16.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=0}=1, time_calculating_group_ids{partition=0, __distributed_datafusion__task_id=0}=7.29µs, aggregate_arguments_time{partition=0, __distributed_datafusion__task_id=0}=3.12µs, aggregation_time{partition=0, __distributed_datafusion__task_id=0}=2.67µs, emitting_time{partition=0, __distributed_datafusion__task_id=0}=2.08µs, spill_count{partition=0, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=0}=0, peak_mem_used{partition=0, __distributed_datafusion__task_id=0}=16.89 K, start_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771727 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785439 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=0}=13.83µs, output_rows{partition=1, __distributed_datafusion__task_id=0}=0, output_bytes{partition=1, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=0}=0, time_calculating_group_ids{partition=1, __distributed_datafusion__task_id=0}=1ns, aggregate_arguments_time{partition=1, __distributed_datafusion__task_id=0}=1ns, aggregation_time{partition=1, __distributed_datafusion__task_id=0}=1ns, emitting_time{partition=1, __distributed_datafusion__task_id=0}=1ns, spill_count{partition=1, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=1, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=1, __distributed_datafusion__task_id=0}=0, peak_mem_used{partition=1, __distributed_datafusion__task_id=0}=16.79 K, start_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.771809 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.785307 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=0}=11.08µs, output_rows{partition=2, __distributed_datafusion__task_id=0}=0, output_bytes{partition=2, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=2, __distributed_datafusion__task_id=0}=0, time_calculating_group_ids{partition=2, __distributed_datafusion__task_id=0}=1ns, aggregate_arguments_time{partition=2, __distributed_datafusion__task_id=0}=1ns, aggregation_time{partition=2, __distributed_datafusion__task_id=0}=1ns, emitting_time{partition=2, __distributed_datafusion__task_id=0}=1ns, spill_count{partition=2, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=2, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=2, __distributed_datafusion__task_id=0}=0, peak_mem_used{partition=2, __distributed_datafusion__task_id=0}=16.79 K, start_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770458 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784680 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=1}=33.92µs, output_rows{partition=0, __distributed_datafusion__task_id=1}=0, output_bytes{partition=0, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=0, __distributed_datafusion__task_id=1}=0, time_calculating_group_ids{partition=0, __distributed_datafusion__task_id=1}=1ns, aggregate_arguments_time{partition=0, __distributed_datafusion__task_id=1}=1ns, aggregation_time{partition=0, __distributed_datafusion__task_id=1}=1ns, emitting_time{partition=0, __distributed_datafusion__task_id=1}=1ns, spill_count{partition=0, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=1}=0, peak_mem_used{partition=0, __distributed_datafusion__task_id=1}=16.79 K, start_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770747 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784715 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=1}=15.08µs, output_rows{partition=1, __distributed_datafusion__task_id=1}=0, output_bytes{partition=1, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=1}=0, time_calculating_group_ids{partition=1, __distributed_datafusion__task_id=1}=1ns, aggregate_arguments_time{partition=1, __distributed_datafusion__task_id=1}=1ns, aggregation_time{partition=1, __distributed_datafusion__task_id=1}=1ns, emitting_time{partition=1, __distributed_datafusion__task_id=1}=1ns, spill_count{partition=1, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=1, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=1, __distributed_datafusion__task_id=1}=0, peak_mem_used{partition=1, __distributed_datafusion__task_id=1}=16.79 K, start_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.770834 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784813 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=1}=49.29µs, output_rows{partition=2, __distributed_datafusion__task_id=1}=1, output_bytes{partition=2, __distributed_datafusion__task_id=1}=16.0 KB, output_batches{partition=2, __distributed_datafusion__task_id=1}=1, time_calculating_group_ids{partition=2, __distributed_datafusion__task_id=1}=12.92µs, aggregate_arguments_time{partition=2, __distributed_datafusion__task_id=1}=4.25µs, aggregation_time{partition=2, __distributed_datafusion__task_id=1}=3.54µs, emitting_time{partition=2, __distributed_datafusion__task_id=1}=2.42µs, spill_count{partition=2, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=2, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=2, __distributed_datafusion__task_id=1}=0, peak_mem_used{partition=2, __distributed_datafusion__task_id=1}=16.89 K]
              [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
                RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[spill_count{partition=3, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=3, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=3, __distributed_datafusion__task_id=0}=0, fetch_time{partition=0, __distributed_datafusion__task_id=0}=1.52ms, repartition_time{partition=0, __distributed_datafusion__task_id=0}=23.08µs, send_time{partition=0, outputPartition=0, __distributed_datafusion__task_id=0}=5.00µs, send_time{partition=0, outputPartition=1, __distributed_datafusion__task_id=0}=1ns, send_time{partition=0, outputPartition=2, __distributed_datafusion__task_id=0}=1ns, send_time{partition=0, outputPartition=3, __distributed_datafusion__task_id=0}=1ns, send_time{partition=0, outputPartition=4, __distributed_datafusion__task_id=0}=1ns, send_time{partition=0, outputPartition=5, __distributed_datafusion__task_id=0}=1.58µs, spill_count{partition=4, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=4, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=4, __distributed_datafusion__task_id=0}=0, spill_count{partition=5, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=5, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=5, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=3, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.774666 UTC, end_timestamp{partition=3, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.776493 UTC, elapsed_compute{partition=3, __distributed_datafusion__task_id=0}=1.12µs, output_rows{partition=3, __distributed_datafusion__task_id=0}=0, output_bytes{partition=3, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=3, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=4, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.774686 UTC, end_timestamp{partition=4, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.776515 UTC, elapsed_compute{partition=4, __distributed_datafusion__task_id=0}=457ns, output_rows{partition=4, __distributed_datafusion__task_id=0}=0, output_bytes{partition=4, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=4, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=5, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.774697 UTC, end_timestamp{partition=5, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.776661 UTC, elapsed_compute{partition=5, __distributed_datafusion__task_id=0}=18.08µs, output_rows{partition=5, __distributed_datafusion__task_id=0}=1, output_bytes{partition=5, __distributed_datafusion__task_id=0}=192.0 KB, output_batches{partition=5, __distributed_datafusion__task_id=0}=1, spill_count{partition=0, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=0}=0, spill_count{partition=1, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=1, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=1, __distributed_datafusion__task_id=0}=0, spill_count{partition=2, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=2, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=2, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.781306 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.781380 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=11.50µs, output_rows{partition=0, __distributed_datafusion__task_id=0}=1, output_bytes{partition=0, __distributed_datafusion__task_id=0}=192.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=0}=1, start_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.781391 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.781409 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=0}=292ns, output_rows{partition=1, __distributed_datafusion__task_id=0}=0, output_bytes{partition=1, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.781417 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.781431 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=0}=250ns, output_rows{partition=2, __distributed_datafusion__task_id=0}=0, output_bytes{partition=2, __distributed_datafusion__task_id=0}=0.0 B, output_batches{partition=2, __distributed_datafusion__task_id=0}=0, spill_count{partition=3, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=3, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=3, __distributed_datafusion__task_id=1}=0, fetch_time{partition=0, __distributed_datafusion__task_id=1}=2.07ms, repartition_time{partition=0, __distributed_datafusion__task_id=1}=8.46µs, send_time{partition=0, outputPartition=0, __distributed_datafusion__task_id=1}=1.96µs, send_time{partition=0, outputPartition=1, __distributed_datafusion__task_id=1}=1ns, send_time{partition=0, outputPartition=2, __distributed_datafusion__task_id=1}=1ns, send_time{partition=0, outputPartition=3, __distributed_datafusion__task_id=1}=1ns, send_time{partition=0, outputPartition=4, __distributed_datafusion__task_id=1}=1ns, send_time{partition=0, outputPartition=5, __distributed_datafusion__task_id=1}=1.12µs, spill_count{partition=4, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=4, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=4, __distributed_datafusion__task_id=1}=0, spill_count{partition=5, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=5, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=5, __distributed_datafusion__task_id=1}=0, spill_count{partition=0, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=1}=0, spill_count{partition=1, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=1, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=1, __distributed_datafusion__task_id=1}=0, spill_count{partition=2, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=2, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=2, __distributed_datafusion__task_id=1}=0, start_timestamp{partition=3, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.781702 UTC, end_timestamp{partition=3, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784054 UTC, elapsed_compute{partition=3, __distributed_datafusion__task_id=1}=460ns, output_rows{partition=3, __distributed_datafusion__task_id=1}=0, output_bytes{partition=3, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=3, __distributed_datafusion__task_id=1}=0, start_timestamp{partition=4, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.781723 UTC, end_timestamp{partition=4, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784001 UTC, elapsed_compute{partition=4, __distributed_datafusion__task_id=1}=376ns, output_rows{partition=4, __distributed_datafusion__task_id=1}=0, output_bytes{partition=4, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=4, __distributed_datafusion__task_id=1}=0, start_timestamp{partition=5, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.781743 UTC, end_timestamp{partition=5, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784101 UTC, elapsed_compute{partition=5, __distributed_datafusion__task_id=1}=4.75µs, output_rows{partition=5, __distributed_datafusion__task_id=1}=1, output_bytes{partition=5, __distributed_datafusion__task_id=1}=192.0 KB, output_batches{partition=5, __distributed_datafusion__task_id=1}=1, start_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.781762 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784049 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=1}=9.83µs, output_rows{partition=0, __distributed_datafusion__task_id=1}=1, output_bytes{partition=0, __distributed_datafusion__task_id=1}=192.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=1}=1, start_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.781783 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784106 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=1}=292ns, output_rows{partition=1, __distributed_datafusion__task_id=1}=0, output_bytes{partition=1, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=1}=0, start_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.781799 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.784008 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=1}=293ns, output_rows{partition=2, __distributed_datafusion__task_id=1}=0, output_bytes{partition=2, __distributed_datafusion__task_id=1}=0.0 B, output_batches{partition=2, __distributed_datafusion__task_id=1}=0, spill_count{partition=0, __distributed_datafusion__task_id=2}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=2}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=2}=0, fetch_time{partition=0, __distributed_datafusion__task_id=2}=1.51ms, repartition_time{partition=0, __distributed_datafusion__task_id=2}=14.58µs, send_time{partition=0, outputPartition=0, __distributed_datafusion__task_id=2}=3.17µs, send_time{partition=0, outputPartition=1, __distributed_datafusion__task_id=2}=1ns, send_time{partition=0, outputPartition=2, __distributed_datafusion__task_id=2}=1ns, send_time{partition=0, outputPartition=3, __distributed_datafusion__task_id=2}=1ns, send_time{partition=0, outputPartition=4, __distributed_datafusion__task_id=2}=1ns, send_time{partition=0, outputPartition=5, __distributed_datafusion__task_id=2}=1.50µs, spill_count{partition=1, __distributed_datafusion__task_id=2}=0, spilled_bytes{partition=1, __distributed_datafusion__task_id=2}=0.0 B, spilled_rows{partition=1, __distributed_datafusion__task_id=2}=0, spill_count{partition=2, __distributed_datafusion__task_id=2}=0, spilled_bytes{partition=2, __distributed_datafusion__task_id=2}=0.0 B, spilled_rows{partition=2, __distributed_datafusion__task_id=2}=0, spill_count{partition=3, __distributed_datafusion__task_id=2}=0, spilled_bytes{partition=3, __distributed_datafusion__task_id=2}=0.0 B, spilled_rows{partition=3, __distributed_datafusion__task_id=2}=0, spill_count{partition=4, __distributed_datafusion__task_id=2}=0, spilled_bytes{partition=4, __distributed_datafusion__task_id=2}=0.0 B, spilled_rows{partition=4, __distributed_datafusion__task_id=2}=0, spill_count{partition=5, __distributed_datafusion__task_id=2}=0, spilled_bytes{partition=5, __distributed_datafusion__task_id=2}=0.0 B, spilled_rows{partition=5, __distributed_datafusion__task_id=2}=0, start_timestamp{partition=0, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.781222 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783414 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=2}=10.04µs, output_rows{partition=0, __distributed_datafusion__task_id=2}=1, output_bytes{partition=0, __distributed_datafusion__task_id=2}=192.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=2}=1, start_timestamp{partition=1, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.781240 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783352 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=2}=500ns, output_rows{partition=1, __distributed_datafusion__task_id=2}=0, output_bytes{partition=1, __distributed_datafusion__task_id=2}=0.0 B, output_batches{partition=1, __distributed_datafusion__task_id=2}=0, start_timestamp{partition=2, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.781253 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783425 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=2}=458ns, output_rows{partition=2, __distributed_datafusion__task_id=2}=0, output_bytes{partition=2, __distributed_datafusion__task_id=2}=0.0 B, output_batches{partition=2, __distributed_datafusion__task_id=2}=0, start_timestamp{partition=3, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.781268 UTC, end_timestamp{partition=3, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783360 UTC, elapsed_compute{partition=3, __distributed_datafusion__task_id=2}=292ns, output_rows{partition=3, __distributed_datafusion__task_id=2}=0, output_bytes{partition=3, __distributed_datafusion__task_id=2}=0.0 B, output_batches{partition=3, __distributed_datafusion__task_id=2}=0, start_timestamp{partition=4, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.781279 UTC, end_timestamp{partition=4, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783420 UTC, elapsed_compute{partition=4, __distributed_datafusion__task_id=2}=333ns, output_rows{partition=4, __distributed_datafusion__task_id=2}=0, output_bytes{partition=4, __distributed_datafusion__task_id=2}=0.0 B, output_batches{partition=4, __distributed_datafusion__task_id=2}=0, start_timestamp{partition=5, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.781293 UTC, end_timestamp{partition=5, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783468 UTC, elapsed_compute{partition=5, __distributed_datafusion__task_id=2}=4.75µs, output_rows{partition=5, __distributed_datafusion__task_id=2}=1, output_bytes{partition=5, __distributed_datafusion__task_id=2}=192.0 KB, output_batches{partition=5, __distributed_datafusion__task_id=2}=1]
                  AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.774418 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.776231 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=99.92µs, output_rows{partition=0, __distributed_datafusion__task_id=0}=2, output_bytes{partition=0, __distributed_datafusion__task_id=0}=16.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=0}=1, time_calculating_group_ids{partition=0, __distributed_datafusion__task_id=0}=37.38µs, aggregate_arguments_time{partition=0, __distributed_datafusion__task_id=0}=8.58µs, aggregation_time{partition=0, __distributed_datafusion__task_id=0}=5.04µs, emitting_time{partition=0, __distributed_datafusion__task_id=0}=7.25µs, spill_count{partition=0, __distributed_datafusion__task_id=0}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=0}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=0}=0, peak_mem_used{partition=0, __distributed_datafusion__task_id=0}=18.82 K, skipped_aggregation_rows{partition=0, __distributed_datafusion__task_id=0}=0, reduction_factor{partition=0, __distributed_datafusion__task_id=0}=1.6% (2/122), start_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.780632 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.783978 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=1}=55.29µs, output_rows{partition=0, __distributed_datafusion__task_id=1}=2, output_bytes{partition=0, __distributed_datafusion__task_id=1}=16.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=1}=1, time_calculating_group_ids{partition=0, __distributed_datafusion__task_id=1}=26.12µs, aggregate_arguments_time{partition=0, __distributed_datafusion__task_id=1}=2.67µs, aggregation_time{partition=0, __distributed_datafusion__task_id=1}=2.00µs, emitting_time{partition=0, __distributed_datafusion__task_id=1}=3.33µs, spill_count{partition=0, __distributed_datafusion__task_id=1}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=1}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=1}=0, peak_mem_used{partition=0, __distributed_datafusion__task_id=1}=18.82 K, skipped_aggregation_rows{partition=0, __distributed_datafusion__task_id=1}=0, reduction_factor{partition=0, __distributed_datafusion__task_id=1}=1.6% (2/122), start_timestamp{partition=0, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.779136 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783327 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=2}=63.29µs, output_rows{partition=0, __distributed_datafusion__task_id=2}=2, output_bytes{partition=0, __distributed_datafusion__task_id=2}=16.0 KB, output_batches{partition=0, __distributed_datafusion__task_id=2}=1, time_calculating_group_ids{partition=0, __distributed_datafusion__task_id=2}=26.33µs, aggregate_arguments_time{partition=0, __distributed_datafusion__task_id=2}=3.25µs, aggregation_time{partition=0, __distributed_datafusion__task_id=2}=2.08µs, emitting_time{partition=0, __distributed_datafusion__task_id=2}=5.12µs, spill_count{partition=0, __distributed_datafusion__task_id=2}=0, spilled_bytes{partition=0, __distributed_datafusion__task_id=2}=0.0 B, spilled_rows{partition=0, __distributed_datafusion__task_id=2}=0, peak_mem_used{partition=0, __distributed_datafusion__task_id=2}=18.82 K, skipped_aggregation_rows{partition=0, __distributed_datafusion__task_id=2}=0, reduction_factor{partition=0, __distributed_datafusion__task_id=2}=1.6% (2/122)]
                    PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
                      DataSourceExec: file_groups={3 groups: [[Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet], [Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet], [Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[time_elapsed_opening{partition=0, __distributed_datafusion__task_id=0}=784.62µs, time_elapsed_scanning_until_data{partition=0, __distributed_datafusion__task_id=0}=554.04µs, time_elapsed_scanning_total{partition=0, __distributed_datafusion__task_id=0}=660.88µs, time_elapsed_processing{partition=0, __distributed_datafusion__task_id=0}=616.37µs, file_open_errors{partition=0, __distributed_datafusion__task_id=0}=0, file_scan_errors{partition=0, __distributed_datafusion__task_id=0}=0, start_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.774414 UTC, end_timestamp{partition=0, __distributed_datafusion__task_id=0}=2026-01-26 08:59:13.776161 UTC, elapsed_compute{partition=0, __distributed_datafusion__task_id=0}=1ns, output_rows{partition=0, __distributed_datafusion__task_id=0}=122, output_bytes{partition=0, __distributed_datafusion__task_id=0}=2.7 KB, output_batches{partition=0, __distributed_datafusion__task_id=0}=1, batches_split{partition=0, __distributed_datafusion__task_id=0}=0, row_groups_pruned_bloom_filter{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1 total → 1 matched, row_groups_pruned_statistics{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1 total → 1 matched, page_index_rows_pruned{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0 total → 0 matched, bytes_scanned{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, metadata_load_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=713.42µs, files_ranges_pruned_statistics{partition=0, __distributed_datafusion__task_id=0}=1 total → 1 matched, scan_efficiency_ratio{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=N/A (0/0), predicate_evaluation_errors{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, pushdown_rows_pruned{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, pushdown_rows_matched{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, row_pushdown_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, statistics_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, bloom_filter_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, page_index_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, predicate_cache_inner_records{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, predicate_cache_records{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, row_groups_pruned_bloom_filter{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0 total → 0 matched, row_groups_pruned_statistics{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0 total → 0 matched, page_index_rows_pruned{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0 total → 0 matched, bytes_scanned{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1.07 K, metadata_load_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, files_ranges_pruned_statistics{partition=0, __distributed_datafusion__task_id=0}=0 total → 0 matched, scan_efficiency_ratio{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=2.3% (1.07 K/46.08 K), predicate_evaluation_errors{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, pushdown_rows_pruned{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, pushdown_rows_matched{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, row_pushdown_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, statistics_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, bloom_filter_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, page_index_eval_time{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=1ns, predicate_cache_inner_records{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, predicate_cache_records{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet, __distributed_datafusion__task_id=0}=0, num_predicate_creation_errors{__distributed_datafusion__task_id=0}=0, time_elapsed_opening{partition=1, __distributed_datafusion__task_id=1}=1.10ms, time_elapsed_scanning_until_data{partition=1, __distributed_datafusion__task_id=1}=862.08µs, time_elapsed_scanning_total{partition=1, __distributed_datafusion__task_id=1}=928.21µs, time_elapsed_processing{partition=1, __distributed_datafusion__task_id=1}=400.67µs, file_open_errors{partition=1, __distributed_datafusion__task_id=1}=0, file_scan_errors{partition=1, __distributed_datafusion__task_id=1}=0, start_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.780629 UTC, end_timestamp{partition=1, __distributed_datafusion__task_id=1}=2026-01-26 08:59:13.783941 UTC, elapsed_compute{partition=1, __distributed_datafusion__task_id=1}=1ns, output_rows{partition=1, __distributed_datafusion__task_id=1}=122, output_bytes{partition=1, __distributed_datafusion__task_id=1}=2.7 KB, output_batches{partition=1, __distributed_datafusion__task_id=1}=1, batches_split{partition=1, __distributed_datafusion__task_id=1}=0, row_groups_pruned_bloom_filter{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1 total → 1 matched, row_groups_pruned_statistics{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1 total → 1 matched, page_index_rows_pruned{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0 total → 0 matched, bytes_scanned{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, metadata_load_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1.06ms, files_ranges_pruned_statistics{partition=1, __distributed_datafusion__task_id=1}=1 total → 1 matched, scan_efficiency_ratio{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=N/A (0/0), predicate_evaluation_errors{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, pushdown_rows_pruned{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, pushdown_rows_matched{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, row_pushdown_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, statistics_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, bloom_filter_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, page_index_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, predicate_cache_inner_records{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, predicate_cache_records{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, row_groups_pruned_bloom_filter{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0 total → 0 matched, row_groups_pruned_statistics{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0 total → 0 matched, page_index_rows_pruned{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0 total → 0 matched, bytes_scanned{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1.01 K, metadata_load_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, files_ranges_pruned_statistics{partition=1, __distributed_datafusion__task_id=1}=0 total → 0 matched, scan_efficiency_ratio{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=2.2% (1.01 K/45.46 K), predicate_evaluation_errors{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, pushdown_rows_pruned{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, pushdown_rows_matched{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, row_pushdown_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, statistics_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, bloom_filter_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, page_index_eval_time{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=1ns, predicate_cache_inner_records{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, predicate_cache_records{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet, __distributed_datafusion__task_id=1}=0, num_predicate_creation_errors{__distributed_datafusion__task_id=1}=0, time_elapsed_opening{partition=2, __distributed_datafusion__task_id=2}=994.67µs, time_elapsed_scanning_until_data{partition=2, __distributed_datafusion__task_id=2}=390.46µs, time_elapsed_scanning_total{partition=2, __distributed_datafusion__task_id=2}=463.08µs, time_elapsed_processing{partition=2, __distributed_datafusion__task_id=2}=505.33µs, file_open_errors{partition=2, __distributed_datafusion__task_id=2}=0, file_scan_errors{partition=2, __distributed_datafusion__task_id=2}=0, start_timestamp{partition=2, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.779133 UTC, end_timestamp{partition=2, __distributed_datafusion__task_id=2}=2026-01-26 08:59:13.783277 UTC, elapsed_compute{partition=2, __distributed_datafusion__task_id=2}=1ns, output_rows{partition=2, __distributed_datafusion__task_id=2}=122, output_bytes{partition=2, __distributed_datafusion__task_id=2}=2.7 KB, output_batches{partition=2, __distributed_datafusion__task_id=2}=1, batches_split{partition=2, __distributed_datafusion__task_id=2}=0, row_groups_pruned_bloom_filter{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1 total → 1 matched, row_groups_pruned_statistics{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1 total → 1 matched, page_index_rows_pruned{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0 total → 0 matched, bytes_scanned{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, metadata_load_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=933.50µs, files_ranges_pruned_statistics{partition=2, __distributed_datafusion__task_id=2}=1 total → 1 matched, scan_efficiency_ratio{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=N/A (0/0), predicate_evaluation_errors{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, pushdown_rows_pruned{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, pushdown_rows_matched{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, row_pushdown_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, statistics_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, bloom_filter_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, page_index_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, predicate_cache_inner_records{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, predicate_cache_records{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, row_groups_pruned_bloom_filter{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0 total → 0 matched, row_groups_pruned_statistics{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0 total → 0 matched, page_index_rows_pruned{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0 total → 0 matched, bytes_scanned{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1.03 K, metadata_load_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, files_ranges_pruned_statistics{partition=2, __distributed_datafusion__task_id=2}=0 total → 0 matched, scan_efficiency_ratio{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=2.3% (1.03 K/45.82 K), predicate_evaluation_errors{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, pushdown_rows_pruned{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, pushdown_rows_matched{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, row_pushdown_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, statistics_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, bloom_filter_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, page_index_eval_time{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=1ns, predicate_cache_inner_records{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, predicate_cache_records{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet, __distributed_datafusion__task_id=2}=0, num_predicate_creation_errors{__distributed_datafusion__task_id=2}=0]

In #307:

DistributedExec, metrics=[]
  ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[expr_0_eval_time_0{partition=0, expr=count(*) AS count(*)}=208ns, expr_1_eval_time_0{partition=0, expr=RainToday AS RainToday}=41ns, start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=1.71µs, output_rows_0{partition=0}=2, output_bytes_0{partition=0}=48, output_batches_0{partition=0}=1]
    SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=24.54µs, output_rows_0{partition=0}=2, output_bytes_0{partition=0}=64, output_batches_0{partition=0}=1]
      [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
        SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=8.71µs, output_rows_0{partition=0}=1, output_bytes_0{partition=0}=0, output_batches_0{partition=0}=0, spill_count_0{partition=0}=0, spilled_bytes_0{partition=0}=0, spilled_rows_0{partition=0}=0, start_timestamp_0{partition=1}=1769417.9 T, end_timestamp_0{partition=1}=1769417.9 T, elapsed_compute_0{partition=1}=1ns, output_rows_0{partition=1}=0, output_bytes_0{partition=1}=0, output_batches_0{partition=1}=0, spill_count_0{partition=1}=0, spilled_bytes_0{partition=1}=0, spilled_rows_0{partition=1}=0, start_timestamp_0{partition=2}=1769417.9 T, end_timestamp_0{partition=2}=1769417.9 T, elapsed_compute_0{partition=2}=1ns, output_rows_0{partition=2}=0, output_bytes_0{partition=2}=0, output_batches_0{partition=2}=0, spill_count_0{partition=2}=0, spilled_bytes_0{partition=2}=0, spilled_rows_0{partition=2}=0, start_timestamp_1{partition=0}=1769417.9 T, end_timestamp_1{partition=0}=1769417.9 T, elapsed_compute_1{partition=0}=1ns, output_rows_1{partition=0}=0, output_bytes_1{partition=0}=0, output_batches_1{partition=0}=0, spill_count_1{partition=0}=0, spilled_bytes_1{partition=0}=0, spilled_rows_1{partition=0}=0, start_timestamp_1{partition=1}=1769417.9 T, end_timestamp_1{partition=1}=1769417.9 T, elapsed_compute_1{partition=1}=1ns, output_rows_1{partition=1}=0, output_bytes_1{partition=1}=0, output_batches_1{partition=1}=0, spill_count_1{partition=1}=0, spilled_bytes_1{partition=1}=0, spilled_rows_1{partition=1}=0, start_timestamp_1{partition=2}=1769417.9 T, end_timestamp_1{partition=2}=1769417.9 T, elapsed_compute_1{partition=2}=4.83µs, output_rows_1{partition=2}=1, output_bytes_1{partition=2}=0, output_batches_1{partition=2}=0, spill_count_1{partition=2}=0, spilled_bytes_1{partition=2}=0, spilled_rows_1{partition=2}=0]
          ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[expr_0_eval_time_0{partition=0, expr=count(Int64(1)) AS count(*)}=125ns, expr_1_eval_time_0{partition=0, expr=RainToday AS RainToday}=83ns, expr_2_eval_time_0{partition=0, expr=count(Int64(1)) AS count(Int64(1))}=84ns, start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=2.08µs, output_rows_0{partition=0}=1, output_bytes_0{partition=0}=16.42 K, output_batches_0{partition=0}=1, expr_0_eval_time_0{partition=1, expr=count(Int64(1)) AS count(*)}=1ns, expr_1_eval_time_0{partition=1, expr=RainToday AS RainToday}=1ns, expr_2_eval_time_0{partition=1, expr=count(Int64(1)) AS count(Int64(1))}=1ns, start_timestamp_0{partition=1}=1769417.9 T, end_timestamp_0{partition=1}=1769417.9 T, elapsed_compute_0{partition=1}=1ns, output_rows_0{partition=1}=0, output_bytes_0{partition=1}=0, output_batches_0{partition=1}=0, expr_0_eval_time_0{partition=2, expr=count(Int64(1)) AS count(*)}=1ns, expr_1_eval_time_0{partition=2, expr=RainToday AS RainToday}=1ns, expr_2_eval_time_0{partition=2, expr=count(Int64(1)) AS count(Int64(1))}=1ns, start_timestamp_0{partition=2}=1769417.9 T, end_timestamp_0{partition=2}=1769417.9 T, elapsed_compute_0{partition=2}=1ns, output_rows_0{partition=2}=0, output_bytes_0{partition=2}=0, output_batches_0{partition=2}=0, expr_0_eval_time_1{partition=0, expr=count(Int64(1)) AS count(*)}=1ns, expr_1_eval_time_1{partition=0, expr=RainToday AS RainToday}=1ns, expr_2_eval_time_1{partition=0, expr=count(Int64(1)) AS count(Int64(1))}=1ns, start_timestamp_1{partition=0}=1769417.9 T, end_timestamp_1{partition=0}=1769417.9 T, elapsed_compute_1{partition=0}=1ns, output_rows_1{partition=0}=0, output_bytes_1{partition=0}=0, output_batches_1{partition=0}=0, expr_0_eval_time_1{partition=1, expr=count(Int64(1)) AS count(*)}=1ns, expr_1_eval_time_1{partition=1, expr=RainToday AS RainToday}=1ns, expr_2_eval_time_1{partition=1, expr=count(Int64(1)) AS count(Int64(1))}=1ns, start_timestamp_1{partition=1}=1769417.9 T, end_timestamp_1{partition=1}=1769417.9 T, elapsed_compute_1{partition=1}=1ns, output_rows_1{partition=1}=0, output_bytes_1{partition=1}=0, output_batches_1{partition=1}=0, expr_0_eval_time_1{partition=2, expr=count(Int64(1)) AS count(*)}=208ns, expr_1_eval_time_1{partition=2, expr=RainToday AS RainToday}=84ns, expr_2_eval_time_1{partition=2, expr=count(Int64(1)) AS count(Int64(1))}=41ns, start_timestamp_1{partition=2}=1769417.9 T, end_timestamp_1{partition=2}=1769417.9 T, elapsed_compute_1{partition=2}=1.62µs, output_rows_1{partition=2}=1, output_bytes_1{partition=2}=16.42 K, output_batches_1{partition=2}=1]
            AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=53.83µs, output_rows_0{partition=0}=1, output_bytes_0{partition=0}=16.42 K, output_batches_0{partition=0}=1, time_calculating_group_ids_0{partition=0}=4.96µs, aggregate_arguments_time_0{partition=0}=2.21µs, aggregation_time_0{partition=0}=1.96µs, emitting_time_0{partition=0}=2.79µs, spill_count_0{partition=0}=0, spilled_bytes_0{partition=0}=0, spilled_rows_0{partition=0}=0, peak_mem_used_0{partition=0}=16.89 K, start_timestamp_0{partition=1}=1769417.9 T, end_timestamp_0{partition=1}=1769417.9 T, elapsed_compute_0{partition=1}=20.83µs, output_rows_0{partition=1}=0, output_bytes_0{partition=1}=0, output_batches_0{partition=1}=0, time_calculating_group_ids_0{partition=1}=1ns, aggregate_arguments_time_0{partition=1}=1ns, aggregation_time_0{partition=1}=1ns, emitting_time_0{partition=1}=1ns, spill_count_0{partition=1}=0, spilled_bytes_0{partition=1}=0, spilled_rows_0{partition=1}=0, peak_mem_used_0{partition=1}=16.79 K, start_timestamp_0{partition=2}=1769417.9 T, end_timestamp_0{partition=2}=1769417.9 T, elapsed_compute_0{partition=2}=20.17µs, output_rows_0{partition=2}=0, output_bytes_0{partition=2}=0, output_batches_0{partition=2}=0, time_calculating_group_ids_0{partition=2}=1ns, aggregate_arguments_time_0{partition=2}=1ns, aggregation_time_0{partition=2}=1ns, emitting_time_0{partition=2}=1ns, spill_count_0{partition=2}=0, spilled_bytes_0{partition=2}=0, spilled_rows_0{partition=2}=0, peak_mem_used_0{partition=2}=16.79 K, start_timestamp_1{partition=0}=1769417.9 T, end_timestamp_1{partition=0}=1769417.9 T, elapsed_compute_1{partition=0}=17.92µs, output_rows_1{partition=0}=0, output_bytes_1{partition=0}=0, output_batches_1{partition=0}=0, time_calculating_group_ids_1{partition=0}=1ns, aggregate_arguments_time_1{partition=0}=1ns, aggregation_time_1{partition=0}=1ns, emitting_time_1{partition=0}=1ns, spill_count_1{partition=0}=0, spilled_bytes_1{partition=0}=0, spilled_rows_1{partition=0}=0, peak_mem_used_1{partition=0}=16.79 K, start_timestamp_1{partition=1}=1769417.9 T, end_timestamp_1{partition=1}=1769417.9 T, elapsed_compute_1{partition=1}=16.54µs, output_rows_1{partition=1}=0, output_bytes_1{partition=1}=0, output_batches_1{partition=1}=0, time_calculating_group_ids_1{partition=1}=1ns, aggregate_arguments_time_1{partition=1}=1ns, aggregation_time_1{partition=1}=1ns, emitting_time_1{partition=1}=1ns, spill_count_1{partition=1}=0, spilled_bytes_1{partition=1}=0, spilled_rows_1{partition=1}=0, peak_mem_used_1{partition=1}=16.79 K, start_timestamp_1{partition=2}=1769417.9 T, end_timestamp_1{partition=2}=1769417.9 T, elapsed_compute_1{partition=2}=40.21µs, output_rows_1{partition=2}=1, output_bytes_1{partition=2}=16.42 K, output_batches_1{partition=2}=1, time_calculating_group_ids_1{partition=2}=5.71µs, aggregate_arguments_time_1{partition=2}=2.96µs, aggregation_time_1{partition=2}=2.83µs, emitting_time_1{partition=2}=2.29µs, spill_count_1{partition=2}=0, spilled_bytes_1{partition=2}=0, spilled_rows_1{partition=2}=0, peak_mem_used_1{partition=2}=16.89 K]
              [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
                RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[spill_count_0{partition=0}=0, spilled_bytes_0{partition=0}=0, spilled_rows_0{partition=0}=0, fetch_time_0{partition=0}=2.61ms, repartition_time_0{partition=0}=7.42µs, send_time_0{partition=0, outputPartition=0}=1.62µs, send_time_0{partition=0, outputPartition=1}=1ns, send_time_0{partition=0, outputPartition=2}=1ns, send_time_0{partition=0, outputPartition=3}=1ns, send_time_0{partition=0, outputPartition=4}=1ns, send_time_0{partition=0, outputPartition=5}=1.29µs, spill_count_0{partition=1}=0, spilled_bytes_0{partition=1}=0, spilled_rows_0{partition=1}=0, spill_count_0{partition=2}=0, spilled_bytes_0{partition=2}=0, spilled_rows_0{partition=2}=0, spill_count_0{partition=3}=0, spilled_bytes_0{partition=3}=0, spilled_rows_0{partition=3}=0, spill_count_0{partition=4}=0, spilled_bytes_0{partition=4}=0, spilled_rows_0{partition=4}=0, spill_count_0{partition=5}=0, spilled_bytes_0{partition=5}=0, spilled_rows_0{partition=5}=0, start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=6.00µs, output_rows_0{partition=0}=1, output_bytes_0{partition=0}=196.6 K, output_batches_0{partition=0}=1, start_timestamp_0{partition=1}=1769417.9 T, end_timestamp_0{partition=1}=1769417.9 T, elapsed_compute_0{partition=1}=418ns, output_rows_0{partition=1}=0, output_bytes_0{partition=1}=0, output_batches_0{partition=1}=0, start_timestamp_0{partition=2}=1769417.9 T, end_timestamp_0{partition=2}=1769417.9 T, elapsed_compute_0{partition=2}=250ns, output_rows_0{partition=2}=0, output_bytes_0{partition=2}=0, output_batches_0{partition=2}=0, start_timestamp_0{partition=3}=1769417.9 T, end_timestamp_0{partition=3}=1769417.9 T, elapsed_compute_0{partition=3}=290ns, output_rows_0{partition=3}=0, output_bytes_0{partition=3}=0, output_batches_0{partition=3}=0, start_timestamp_0{partition=4}=1769417.9 T, end_timestamp_0{partition=4}=1769417.9 T, elapsed_compute_0{partition=4}=417ns, output_rows_0{partition=4}=0, output_bytes_0{partition=4}=0, output_batches_0{partition=4}=0, start_timestamp_0{partition=5}=1769417.9 T, end_timestamp_0{partition=5}=1769417.9 T, elapsed_compute_0{partition=5}=4.54µs, output_rows_0{partition=5}=1, output_bytes_0{partition=5}=196.6 K, output_batches_0{partition=5}=1, spill_count_1{partition=0}=0, spilled_bytes_1{partition=0}=0, spilled_rows_1{partition=0}=0, fetch_time_1{partition=0}=2.05ms, repartition_time_1{partition=0}=10.00µs, send_time_1{partition=0, outputPartition=0}=2.17µs, send_time_1{partition=0, outputPartition=1}=1ns, send_time_1{partition=0, outputPartition=2}=1ns, send_time_1{partition=0, outputPartition=3}=1ns, send_time_1{partition=0, outputPartition=4}=1ns, send_time_1{partition=0, outputPartition=5}=1.50µs, spill_count_1{partition=1}=0, spilled_bytes_1{partition=1}=0, spilled_rows_1{partition=1}=0, spill_count_1{partition=2}=0, spilled_bytes_1{partition=2}=0, spilled_rows_1{partition=2}=0, spill_count_1{partition=3}=0, spilled_bytes_1{partition=3}=0, spilled_rows_1{partition=3}=0, spill_count_1{partition=4}=0, spilled_bytes_1{partition=4}=0, spilled_rows_1{partition=4}=0, spill_count_1{partition=5}=0, spilled_bytes_1{partition=5}=0, spilled_rows_1{partition=5}=0, start_timestamp_1{partition=0}=1769417.9 T, end_timestamp_1{partition=0}=1769417.9 T, elapsed_compute_1{partition=0}=5.67µs, output_rows_1{partition=0}=1, output_bytes_1{partition=0}=196.6 K, output_batches_1{partition=0}=1, start_timestamp_1{partition=1}=1769417.9 T, end_timestamp_1{partition=1}=1769417.9 T, elapsed_compute_1{partition=1}=416ns, output_rows_1{partition=1}=0, output_bytes_1{partition=1}=0, output_batches_1{partition=1}=0, start_timestamp_1{partition=2}=1769417.9 T, end_timestamp_1{partition=2}=1769417.9 T, elapsed_compute_1{partition=2}=335ns, output_rows_1{partition=2}=0, output_bytes_1{partition=2}=0, output_batches_1{partition=2}=0, start_timestamp_1{partition=3}=1769417.9 T, end_timestamp_1{partition=3}=1769417.9 T, elapsed_compute_1{partition=3}=375ns, output_rows_1{partition=3}=0, output_bytes_1{partition=3}=0, output_batches_1{partition=3}=0, start_timestamp_1{partition=4}=1769417.9 T, end_timestamp_1{partition=4}=1769417.9 T, elapsed_compute_1{partition=4}=376ns, output_rows_1{partition=4}=0, output_bytes_1{partition=4}=0, output_batches_1{partition=4}=0, start_timestamp_1{partition=5}=1769417.9 T, end_timestamp_1{partition=5}=1769417.9 T, elapsed_compute_1{partition=5}=4.75µs, output_rows_1{partition=5}=1, output_bytes_1{partition=5}=196.6 K, output_batches_1{partition=5}=1, spill_count_2{partition=0}=0, spilled_bytes_2{partition=0}=0, spilled_rows_2{partition=0}=0, fetch_time_2{partition=0}=5.50ms, repartition_time_2{partition=0}=16.79µs, send_time_2{partition=0, outputPartition=0}=2.29µs, send_time_2{partition=0, outputPartition=1}=1ns, send_time_2{partition=0, outputPartition=2}=1ns, send_time_2{partition=0, outputPartition=3}=1ns, send_time_2{partition=0, outputPartition=4}=1ns, send_time_2{partition=0, outputPartition=5}=1.25µs, spill_count_2{partition=1}=0, spilled_bytes_2{partition=1}=0, spilled_rows_2{partition=1}=0, spill_count_2{partition=2}=0, spilled_bytes_2{partition=2}=0, spilled_rows_2{partition=2}=0, start_timestamp_2{partition=0}=1769417.9 T, end_timestamp_2{partition=0}=1769417.9 T, elapsed_compute_2{partition=0}=9.04µs, output_rows_2{partition=0}=1, output_bytes_2{partition=0}=196.6 K, output_batches_2{partition=0}=1, start_timestamp_2{partition=1}=1769417.9 T, end_timestamp_2{partition=1}=1769417.9 T, elapsed_compute_2{partition=1}=460ns, output_rows_2{partition=1}=0, output_bytes_2{partition=1}=0, output_batches_2{partition=1}=0, start_timestamp_2{partition=2}=1769417.9 T, end_timestamp_2{partition=2}=1769417.9 T, elapsed_compute_2{partition=2}=417ns, output_rows_2{partition=2}=0, output_bytes_2{partition=2}=0, output_batches_2{partition=2}=0, spill_count_2{partition=3}=0, spilled_bytes_2{partition=3}=0, spilled_rows_2{partition=3}=0, spill_count_2{partition=4}=0, spilled_bytes_2{partition=4}=0, spilled_rows_2{partition=4}=0, spill_count_2{partition=5}=0, spilled_bytes_2{partition=5}=0, spilled_rows_2{partition=5}=0, start_timestamp_2{partition=3}=1769417.9 T, end_timestamp_2{partition=3}=1769417.9 T, elapsed_compute_2{partition=3}=543ns, output_rows_2{partition=3}=0, output_bytes_2{partition=3}=0, output_batches_2{partition=3}=0, start_timestamp_2{partition=4}=1769417.9 T, end_timestamp_2{partition=4}=1769417.9 T, elapsed_compute_2{partition=4}=375ns, output_rows_2{partition=4}=0, output_bytes_2{partition=4}=0, output_batches_2{partition=4}=0, start_timestamp_2{partition=5}=1769417.9 T, end_timestamp_2{partition=5}=1769417.9 T, elapsed_compute_2{partition=5}=6.83µs, output_rows_2{partition=5}=1, output_bytes_2{partition=5}=196.6 K, output_batches_2{partition=5}=1]
                  AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=55.67µs, output_rows_0{partition=0}=2, output_bytes_0{partition=0}=16.42 K, output_batches_0{partition=0}=1, time_calculating_group_ids_0{partition=0}=27.88µs, aggregate_arguments_time_0{partition=0}=2.46µs, aggregation_time_0{partition=0}=1.88µs, emitting_time_0{partition=0}=3.83µs, spill_count_0{partition=0}=0, spilled_bytes_0{partition=0}=0, spilled_rows_0{partition=0}=0, peak_mem_used_0{partition=0}=18.82 K, skipped_aggregation_rows_0{partition=0}=0, reduction_factor_0{partition=0}=1, start_timestamp_1{partition=0}=1769417.9 T, end_timestamp_1{partition=0}=1769417.9 T, elapsed_compute_1{partition=0}=58.25µs, output_rows_1{partition=0}=2, output_bytes_1{partition=0}=16.42 K, output_batches_1{partition=0}=1, time_calculating_group_ids_1{partition=0}=26.00µs, aggregate_arguments_time_1{partition=0}=2.88µs, aggregation_time_1{partition=0}=1.96µs, emitting_time_1{partition=0}=4.04µs, spill_count_1{partition=0}=0, spilled_bytes_1{partition=0}=0, spilled_rows_1{partition=0}=0, peak_mem_used_1{partition=0}=18.82 K, skipped_aggregation_rows_1{partition=0}=0, reduction_factor_1{partition=0}=1, start_timestamp_2{partition=0}=1769417.9 T, end_timestamp_2{partition=0}=1769417.9 T, elapsed_compute_2{partition=0}=70.29µs, output_rows_2{partition=0}=2, output_bytes_2{partition=0}=16.42 K, output_batches_2{partition=0}=1, time_calculating_group_ids_2{partition=0}=29.42µs, aggregate_arguments_time_2{partition=0}=4.46µs, aggregation_time_2{partition=0}=3.00µs, emitting_time_2{partition=0}=5.58µs, spill_count_2{partition=0}=0, spilled_bytes_2{partition=0}=0, spilled_rows_2{partition=0}=0, peak_mem_used_2{partition=0}=18.82 K, skipped_aggregation_rows_2{partition=0}=0, reduction_factor_2{partition=0}=1]
                    PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
                      DataSourceExec: file_groups={3 groups: [[Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet], [Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet], [Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[time_elapsed_opening_0{partition=0}=1.72ms, time_elapsed_scanning_until_data_0{partition=0}=776.62µs, time_elapsed_scanning_total_0{partition=0}=844.75µs, time_elapsed_processing_0{partition=0}=392.79µs, file_open_errors_0{partition=0}=0, file_scan_errors_0{partition=0}=0, start_timestamp_0{partition=0}=1769417.9 T, end_timestamp_0{partition=0}=1769417.9 T, elapsed_compute_0{partition=0}=1ns, output_rows_0{partition=0}=122, output_bytes_0{partition=0}=2.81 K, output_batches_0{partition=0}=1, batches_split_0{partition=0}=0, row_groups_pruned_bloom_filter_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1, row_groups_pruned_statistics_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1, page_index_rows_pruned_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, bytes_scanned_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, metadata_load_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1.69ms, files_ranges_pruned_statistics_matched_0{partition=0}=1, scan_efficiency_ratio_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, predicate_evaluation_errors_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, pushdown_rows_pruned_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, pushdown_rows_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, row_pushdown_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, statistics_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, bloom_filter_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, page_index_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, predicate_cache_inner_records_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, predicate_cache_records_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, row_groups_pruned_bloom_filter_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, row_groups_pruned_statistics_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, page_index_rows_pruned_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, bytes_scanned_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1.07 K, metadata_load_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, files_ranges_pruned_statistics_matched_0{partition=0}=0, scan_efficiency_ratio_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=2, predicate_evaluation_errors_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, pushdown_rows_pruned_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, pushdown_rows_matched_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, row_pushdown_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, statistics_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, bloom_filter_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, page_index_eval_time_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=1ns, predicate_cache_inner_records_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, predicate_cache_records_0{partition=0, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000000.parquet}=0, num_predicate_creation_errors_0=0, time_elapsed_opening_1{partition=1}=1.46ms, time_elapsed_scanning_until_data_1{partition=1}=476.08µs, time_elapsed_scanning_total_1{partition=1}=541.21µs, time_elapsed_processing_1{partition=1}=418.92µs, file_open_errors_1{partition=1}=0, file_scan_errors_1{partition=1}=0, start_timestamp_1{partition=1}=1769417.9 T, end_timestamp_1{partition=1}=1769417.9 T, elapsed_compute_1{partition=1}=1ns, output_rows_1{partition=1}=122, output_bytes_1{partition=1}=2.79 K, output_batches_1{partition=1}=1, batches_split_1{partition=1}=0, row_groups_pruned_bloom_filter_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1, row_groups_pruned_statistics_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1, page_index_rows_pruned_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, bytes_scanned_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, metadata_load_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1.40ms, files_ranges_pruned_statistics_matched_1{partition=1}=1, scan_efficiency_ratio_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, predicate_evaluation_errors_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, pushdown_rows_pruned_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, pushdown_rows_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, row_pushdown_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, statistics_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, bloom_filter_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, page_index_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, predicate_cache_inner_records_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, predicate_cache_records_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, row_groups_pruned_bloom_filter_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, row_groups_pruned_statistics_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, page_index_rows_pruned_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, bytes_scanned_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1.01 K, metadata_load_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, files_ranges_pruned_statistics_matched_1{partition=1}=0, scan_efficiency_ratio_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=2, predicate_evaluation_errors_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, pushdown_rows_pruned_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, pushdown_rows_matched_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, row_pushdown_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, statistics_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, bloom_filter_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, page_index_eval_time_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=1ns, predicate_cache_inner_records_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, predicate_cache_records_1{partition=1, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000001.parquet}=0, num_predicate_creation_errors_1=0, time_elapsed_opening_2{partition=2}=4.84ms, time_elapsed_scanning_until_data_2{partition=2}=533.58µs, time_elapsed_scanning_total_2{partition=2}=614.04µs, time_elapsed_processing_2{partition=2}=469.79µs, file_open_errors_2{partition=2}=0, file_scan_errors_2{partition=2}=0, start_timestamp_2{partition=2}=1769417.9 T, end_timestamp_2{partition=2}=1769417.9 T, elapsed_compute_2{partition=2}=1ns, output_rows_2{partition=2}=122, output_bytes_2{partition=2}=2.80 K, output_batches_2{partition=2}=1, batches_split_2{partition=2}=0, row_groups_pruned_bloom_filter_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1, row_groups_pruned_statistics_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1, page_index_rows_pruned_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, bytes_scanned_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, metadata_load_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=4.78ms, files_ranges_pruned_statistics_matched_2{partition=2}=1, scan_efficiency_ratio_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, predicate_evaluation_errors_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, pushdown_rows_pruned_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, pushdown_rows_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, row_pushdown_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, statistics_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, bloom_filter_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, page_index_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, predicate_cache_inner_records_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, predicate_cache_records_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, row_groups_pruned_bloom_filter_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, row_groups_pruned_statistics_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, page_index_rows_pruned_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, bytes_scanned_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1.03 K, metadata_load_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, files_ranges_pruned_statistics_matched_2{partition=2}=0, scan_efficiency_ratio_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=2, predicate_evaluation_errors_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, pushdown_rows_pruned_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, pushdown_rows_matched_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, row_pushdown_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, statistics_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, bloom_filter_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, page_index_eval_time_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=1ns, predicate_cache_inner_records_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, predicate_cache_records_2{partition=2, filename=Users/gabriel.musatmestre/github/datafusion-distributed-2/testdata/weather/result-000002.parquet}=0, num_predicate_creation_errors_2=0]

Even if it's a bit more verbose, I think what this PR displays in that case is slightly better.

metric.value().clone(),
metric.partition(),
labels,
)));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's much better definitely! at least now we don't perform atomic operations, I think that's a major win.

Copy link
Copy Markdown
Collaborator

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 left just 1 non-blocking comment. I think this PR is slightly better than #307, so for me +1 to this one.

pub use task_metrics_rewriter::rewrite_distributed_plan_with_metrics;
pub use task_metrics_rewriter::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics};

pub const DISTRIBUTED_DATAFUSION_TASK_ID_LABEL: &str = "__distributed_datafusion__task_id";
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tag might be displayed in a more DataFusion-native way of rendering plans. See #309 (review).

I think it might be worth it to assume that it's actually going to be displayed to humans, so I would call it something like just "task".

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure that sounds reasonable.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit odd to display the distributed plan using this btw.

println!("{}", DisplayableExecutionPlan::with_full_metrics(s_physical.as_ref()).indent(true).to_string());

You have to call rewrite_distributed_plan_with_metrics to hydrate the plan nodes with metrics (except for the root node) and to add the task id label.

This change adds a new enum `DistributedMetricsFormat` which is either `Aggregated` or `PerTask`.

`Aggregated` is the default. When we use `DisplayableExecutionPlan::with_metrics`, metrics are aggregated by name. So we leave metrics as is before displaying.

`PerTask` will configure the metrics re-writer to label metrics like `output_rows` with `__distributed_datafusion__task_id=x` at re-write time. At display time, we do 3 things to the `MetricSet` of a node:
1. Aggregate by name, task_id
2. Sort by name, task_id
3. Display metrics in the format `output_rows_x` where `x` is the task id.

If you look at what `DisplayableExecutionPlan::with_metrics` does, it does the exact same 3 steps above, except it only aggregates by name. The alternative approach is to rename metrics like in #307. I felt that this approach was more datafusion-naitive.

I also considered labelling metrics at collection-time, but I figured it's not worth the effort.

Sample output:

1. `DistributedMetricsFormat::Aggregated`
```
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_rows=2, elapsed_compute=1.92µs, output_bytes=48.0 B, output_batches=1, expr_0_eval_time=208ns, expr_1_eval_time=125ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_rows=2, elapsed_compute=10.83µs, output_bytes=64.0 B, output_batches=1]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=10.80µs, output_bytes=0.0 B, output_batches=0, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_rows=2, elapsed_compute=5.09µs, output_bytes=32.1 KB, output_batches=2, expr_0_eval_time=462ns,
expr_1_eval_time=254ns, expr_2_eval_time=170ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=165.50µs, output_bytes=32.1 KB, output_batches=2, spill_count=0, spilled_bytes=0.0 B, spilled_
rows=0, peak_mem_used=100.9 K, aggregate_arguments_time=9.38µs, aggregation_time=9.21µs, emitting_time=5.34µs, time_calculating_group_ids=18.75µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_rows=6, elapsed_compute=52.26µs, output_bytes=1152.0 KB, output_batches=6, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=10
.72ms, repartition_time=39.67µs, send_time=13.72µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_rows=6, elapsed_compute=192.54µs, output_bytes=48.1 KB, output_batches=3, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, s
kipped_aggregation_rows=0, peak_mem_used=56.47 K, aggregate_arguments_time=10.42µs, aggregation_time=7.58µs, emitting_time=17.00µs, time_calculating_group_ids=81.88µs, reduction_factor=1.6% (6/366)]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[output_rows=366, elapsed_compute=3ns, output_bytes=8.2 KB, output_batches=3, file
s_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=3 total → 3 matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_rows_pruned=0 total → 0 matched, batches_split=0, bytes_scanned=3.12 K, fil
e_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=6ns
, metadata_load_time=5.62ms, page_index_eval_time=6ns, row_pushdown_eval_time=6ns, statistics_eval_time=6ns, time_elapsed_opening=5.79ms, time_elapsed_processing=1.51ms, time_elapsed_scanning_total=4.77ms, time_elapsed_scanning_until_da
ta=4.54ms, scan_efficiency_ratio=2.3% (3.12 K/137.4 K)]
    └──────────────────────────────────────────────────
```

2. `DistributedMetricsFormat::PerTask`
```
───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday], metrics=[output_batches_0=1, output_bytes_0=48, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=1.58µs, expr_0_eva
l_time_0=167ns, expr_1_eval_time_0=83ns]
│   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST], metrics=[output_batches_0=1, output_bytes_0=64, output_rows_0=2, end_timestamp_0=1769118.1 T, start_timestamp_0=1769118.1 T, elapsed_compute_0=27.04µs]
│     [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p0..p2]
  │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_batches_0=0, output_batches_1=0, output_bytes_0=0, output_bytes_1=0, output_rows_0=1, output_rows_1=1, spill_count_0=0, spill_count_1=0, spill
ed_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=8.42µs, elapsed_compute_1=4.9
2µs]
  │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1
, output_rows_1=1, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, start_timestamp_0=5307354.2 T, start_timestamp_1=5307354.2 T, elapsed_compute_0=1.83µs, elapsed_compute_1=1.96µs, expr_0_eval_time_0=168ns, expr_0_eval_time_1=
168ns, expr_1_eval_time_0=43ns, expr_1_eval_time_1=85ns, expr_2_eval_time_0=86ns, expr_2_eval_time_1=85ns]
  │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_rows_0=1, output_rows_1=1, spi
ll_count_0=0, spill_count_1=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_rows_0=0, spilled_rows_1=0, end_timestamp_0=5307354.2 T, end_timestamp_1=5307354.2 T, peak_mem_used_0=50.47 K, peak_mem_used_1=50.47 K, start_timestamp_0=53073
54.2 T, start_timestamp_1=5307354.2 T, aggregate_arguments_time_0=3.17µs, aggregate_arguments_time_1=2.59µs, aggregation_time_0=3.13µs, aggregation_time_1=2.04µs, elapsed_compute_0=93.29µs, elapsed_compute_1=57.67µs, emitting_time_0=2.5
0µs, emitting_time_1=2.04µs, time_calculating_group_ids_0=7.75µs, time_calculating_group_ids_1=4.67µs]
  │       [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5]
    │ RepartitionExec: partitioning=Hash([RainToday@0], 6), input_partitions=1, metrics=[output_batches_0=2, output_batches_1=2, output_batches_2=2, output_bytes_0=393.2 K, output_bytes_1=393.2 K, output_bytes_2=393.2 K, output_rows_0=2
, output_rows_1=2, output_rows_2=2, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=10614708.4 T, end_time
stamp_1=10614708.4 T, end_timestamp_2=10614708.4 T, start_timestamp_0=10614708.4 T, start_timestamp_1=10614708.4 T, start_timestamp_2=10614708.4 T, elapsed_compute_0=10.42µs, elapsed_compute_1=11.29µs, elapsed_compute_2=19.63µs, fetch_t
ime_0=3.92ms, fetch_time_1=3.35ms, fetch_time_2=4.08ms, repartition_time_0=7.21µs, repartition_time_1=14.33µs, repartition_time_2=20.21µs, send_time_0=2.63µs, send_time_1=3.63µs, send_time_2=5.46µs]
    │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))], metrics=[output_batches_0=1, output_batches_1=1, output_batches_2=1, output_bytes_0=16.42 K, output_bytes_1=16.42 K, output_bytes_2=16.42 K, ou
tput_rows_0=2, output_rows_1=2, output_rows_2=2, skipped_aggregation_rows_0=0, skipped_aggregation_rows_1=0, skipped_aggregation_rows_2=0, spill_count_0=0, spill_count_1=0, spill_count_2=0, spilled_bytes_0=0, spilled_bytes_1=0, spilled_
bytes_2=0, spilled_rows_0=0, spilled_rows_1=0, spilled_rows_2=0, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, peak_mem_used_0=18.82 K, peak_mem_used_1=18.82 K, peak_mem_used_2=18.82 K, reduction
_factor_0=1, reduction_factor_1=1, reduction_factor_2=1, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, start_timestamp_2=1769118.1 T, aggregate_arguments_time_0=2.83µs, aggregate_arguments_time_1=5.17µs, aggregate_argume
nts_time_2=7.21µs, aggregation_time_0=2.00µs, aggregation_time_1=3.42µs, aggregation_time_2=5.12µs, elapsed_compute_0=54.71µs, elapsed_compute_1=71.75µs, elapsed_compute_2=85.38µs, emitting_time_0=3.38µs, emitting_time_1=6.83µs, emittin
g_time_2=8.33µs, time_calculating_group_ids_0=25.96µs, time_calculating_group_ids_1=30.79µs, time_calculating_group_ids_2=34.00µs]
    │     PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0] , metrics=[]
    │       DataSourceExec: file_groups={3 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000000.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000001.parq
uet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet, metrics=[batches_split_0=0, batches_split_1=0, batches_split_2=0, bytes_scanned_0=1.07 K,
bytes_scanned_1=1.01 K, bytes_scanned_2=1.03 K, file_open_errors_0=0, file_open_errors_1=0, file_open_errors_2=0, file_scan_errors_0=0, file_scan_errors_1=0, file_scan_errors_2=0, files_ranges_pruned_statistics_matched_0=1, files_ranges
_pruned_statistics_matched_1=1, files_ranges_pruned_statistics_matched_2=1, num_predicate_creation_errors_0=0, num_predicate_creation_errors_1=0, num_predicate_creation_errors_2=0, output_batches_0=1, output_batches_1=1, output_batches_
2=1, output_bytes_0=2.81 K, output_bytes_1=2.79 K, output_bytes_2=2.80 K, output_rows_0=122, output_rows_1=122, output_rows_2=122, page_index_rows_pruned_matched_0=0, page_index_rows_pruned_matched_1=0, page_index_rows_pruned_matched_2=
0, predicate_cache_inner_records_0=0, predicate_cache_inner_records_1=0, predicate_cache_inner_records_2=0, predicate_cache_records_0=0, predicate_cache_records_1=0, predicate_cache_records_2=0, predicate_evaluation_errors_0=0, predicat
e_evaluation_errors_1=0, predicate_evaluation_errors_2=0, pushdown_rows_matched_0=0, pushdown_rows_matched_1=0, pushdown_rows_matched_2=0, pushdown_rows_pruned_0=0, pushdown_rows_pruned_1=0, pushdown_rows_pruned_2=0, row_groups_pruned_b
loom_filter_matched_0=1, row_groups_pruned_bloom_filter_matched_1=1, row_groups_pruned_bloom_filter_matched_2=1, row_groups_pruned_statistics_matched_0=1, row_groups_pruned_statistics_matched_1=1, row_groups_pruned_statistics_matched_2=
1, end_timestamp_0=1769118.1 T, end_timestamp_1=1769118.1 T, end_timestamp_2=1769118.1 T, scan_efficiency_ratio_0=2, scan_efficiency_ratio_1=2, scan_efficiency_ratio_2=2, start_timestamp_0=1769118.1 T, start_timestamp_1=1769118.1 T, sta
rt_timestamp_2=1769118.1 T, bloom_filter_eval_time_0=2ns, bloom_filter_eval_time_1=2ns, bloom_filter_eval_time_2=2ns, elapsed_compute_0=1ns, elapsed_compute_1=1ns, elapsed_compute_2=1ns, metadata_load_time_0=2.92ms, metadata_load_time_1
=2.77ms, metadata_load_time_2=3.29ms, page_index_eval_time_0=2ns, page_index_eval_time_1=2ns, page_index_eval_time_2=2ns, row_pushdown_eval_time_0=2ns, row_pushdown_eval_time_1=2ns, row_pushdown_eval_time_2=2ns, statistics_eval_time_0=2
ns, statistics_eval_time_1=2ns, statistics_eval_time_2=2ns, time_elapsed_opening_0=2.94ms, time_elapsed_opening_1=2.82ms, time_elapsed_opening_2=3.33ms, time_elapsed_processing_0=374.50µs, time_elapsed_processing_1=432.71µs, time_elapse
d_processing_2=485.12µs, time_elapsed_scanning_total_0=929.29µs, time_elapsed_scanning_total_1=444.71µs, time_elapsed_scanning_total_2=699.75µs, time_elapsed_scanning_until_data_0=866.62µs, time_elapsed_scanning_until_data_1=364.79µs, t
ime_elapsed_scanning_until_data_2=598.04µs]
    └──────────────────────────────────────────────────
```
Closes #304
@jayshrivastava jayshrivastava merged commit 394a521 into main Jan 26, 2026
7 checks passed
@jayshrivastava jayshrivastava deleted the js/per-task-metrics-2 branch January 26, 2026 17:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Aggregate metrics per task instead of globally

2 participants