From 20aa89cad836450878d11aaeee37fbe248466548 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 20 Nov 2024 17:57:34 -0700 Subject: [PATCH 01/10] Add custom metric for native shuffle read time --- .../src/execution/datafusion/shuffle_writer.rs | 18 ++++++++++++++++++ native/core/src/execution/operators/scan.rs | 5 +++-- .../shuffle/CometShuffleExchangeExec.scala | 8 +++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index 7587ff06dc..a080bf22b8 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -65,6 +65,7 @@ use crate::{ errors::{CometError, CometResult}, }; use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes; +use crate::execution::operators::ScanExec; /// The status of appending rows to a partition buffer. enum AppendRowStatus { @@ -139,6 +140,13 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result { let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); + let read_time = MetricBuilder::new(&self.metrics).subset_time("read_time", 0); + + let scan_time = if let Some(scan) = self.input.as_any().downcast_ref::() { + Some(scan.baseline_metrics.elapsed_compute().clone()) + } else { + None + }; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -151,6 +159,8 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, + read_time, + scan_time ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -1091,6 +1101,8 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, + read_time: Time, + scan_time: Option