diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 39d730eaafb49..9c536efce58bb 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -22,7 +22,7 @@ use arrow_ipc::CompressionType; #[cfg(feature = "parquet_encryption")] use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; -use crate::format::ExplainFormat; +use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; @@ -893,6 +893,11 @@ config_namespace! { /// (format=tree only) Maximum total width of the rendered tree. /// When set to 0, the tree will have no width limit. pub tree_maximum_render_width: usize, default = 240 + + /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev" + /// "summary" shows common metrics for high-level insights. + /// "dev" provides deep operator-level introspection for developers. + pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev } } diff --git a/datafusion/common/src/format.rs b/datafusion/common/src/format.rs index 06ec519ef356c..764190e1189bf 100644 --- a/datafusion/common/src/format.rs +++ b/datafusion/common/src/format.rs @@ -205,3 +205,48 @@ impl ConfigField for ExplainFormat { Ok(()) } } + +/// Verbosity levels controlling how `EXPLAIN ANALYZE` renders metrics +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ExplainAnalyzeLevel { + /// Show a compact view containing high-level metrics + Summary, + /// Show a developer-focused view with per-operator details + Dev, + // When adding a new variant, update the error message in `from_str()` accordingly. 
+} + +impl FromStr for ExplainAnalyzeLevel { + type Err = DataFusionError; + + fn from_str(level: &str) -> Result { + match level.to_lowercase().as_str() { + "summary" => Ok(ExplainAnalyzeLevel::Summary), + "dev" => Ok(ExplainAnalyzeLevel::Dev), + other => Err(DataFusionError::Configuration(format!( + "Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'" + ))), + } + } +} + +impl Display for ExplainAnalyzeLevel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + ExplainAnalyzeLevel::Summary => "summary", + ExplainAnalyzeLevel::Dev => "dev", + }; + write!(f, "{s}") + } +} + +impl ConfigField for ExplainAnalyzeLevel { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = ExplainAnalyzeLevel::from_str(value)?; + Ok(()) + } +} diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index d0774e57174ee..10a475c1cc9a6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -64,7 +64,9 @@ mod tests { use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::analyze::AnalyzeExec; use datafusion_physical_plan::collect; - use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; + use datafusion_physical_plan::metrics::{ + ExecutionPlanMetricsSet, MetricType, MetricsSet, + }; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use chrono::{TimeZone, Utc}; @@ -238,6 +240,7 @@ mod tests { let analyze_exec = Arc::new(AnalyzeExec::new( false, false, + vec![MetricType::SUMMARY, MetricType::DEV], // use a new ParquetSource to avoid sharing execution metrics self.build_parquet_exec( Arc::clone(table_schema), diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs index c28e56790e660..0fa17deea1295 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -62,6 +62,7 @@ use arrow::compute::SortOptions; use arrow::datatypes::Schema; use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::format::ExplainAnalyzeLevel; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::TableReference; use datafusion_common::{ @@ -90,6 +91,7 @@ use datafusion_physical_expr::{ use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::execution_plan::InvariantLevel; +use datafusion_physical_plan::metrics::MetricType; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::recursive_query::RecursiveQueryExec; use datafusion_physical_plan::unnest::ListUnnest; @@ -2073,9 +2075,15 @@ impl DefaultPhysicalPlanner { let input = self.create_physical_plan(&a.input, session_state).await?; let schema = Arc::clone(a.schema.inner()); let show_statistics = session_state.config_options().explain.show_statistics; + let analyze_level = session_state.config_options().explain.analyze_level; + let metric_types = match analyze_level { + ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY], + ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV], + }; Ok(Arc::new(AnalyzeExec::new( a.verbose, show_statistics, + metric_types, input, schema, ))) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index e082cabaadaff..54a57ed901162 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -22,6 +22,7 @@ use rstest::rstest; use datafusion::config::ConfigOptions; use datafusion::physical_plan::display::DisplayableExecutionPlan; use 
datafusion::physical_plan::metrics::Timestamp; +use datafusion_common::format::ExplainAnalyzeLevel; use object_store::path::Path; #[tokio::test] @@ -158,6 +159,40 @@ async fn explain_analyze_baseline_metrics() { fn nanos_from_timestamp(ts: &Timestamp) -> i64 { ts.value().unwrap().timestamp_nanos_opt().unwrap() } + +// Test the different detail levels for config `datafusion.explain.analyze_level` +#[tokio::test] +async fn explain_analyze_level() { + async fn collect_plan(level: ExplainAnalyzeLevel) -> String { + let mut config = SessionConfig::new(); + config.options_mut().explain.analyze_level = level; + let ctx = SessionContext::new_with_config(config); + let sql = "EXPLAIN ANALYZE \ + SELECT * \ + FROM generate_series(10) as t1(v1) \ + ORDER BY v1 DESC"; + let dataframe = ctx.sql(sql).await.unwrap(); + let batches = dataframe.collect().await.unwrap(); + arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string() + } + + for (level, needle, should_contain) in [ + (ExplainAnalyzeLevel::Summary, "spill_count", false), + (ExplainAnalyzeLevel::Summary, "output_rows", true), + (ExplainAnalyzeLevel::Dev, "spill_count", true), + (ExplainAnalyzeLevel::Dev, "output_rows", true), + ] { + let plan = collect_plan(level).await; + assert_eq!( + plan.contains(needle), + should_contain, + "plan for level {level:?} unexpected content: {plan}" + ); + } +} + #[tokio::test] async fn csv_explain_plans() { // This test verify the look of each plan in its full cycle plan creation diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index c095afe5e716e..c696cf5aa5e60 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -26,6 +26,7 @@ use super::{ SendableRecordBatchStream, }; use crate::display::DisplayableExecutionPlan; +use crate::metrics::MetricType; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::{array::StringBuilder, datatypes::SchemaRef, 
record_batch::RecordBatch}; @@ -44,6 +45,8 @@ pub struct AnalyzeExec { verbose: bool, /// If statistics should be displayed show_statistics: bool, + /// Which metric categories should be displayed + metric_types: Vec, /// The input plan (the plan being analyzed) pub(crate) input: Arc, /// The output schema for RecordBatches of this exec node @@ -56,6 +59,7 @@ impl AnalyzeExec { pub fn new( verbose: bool, show_statistics: bool, + metric_types: Vec, input: Arc, schema: SchemaRef, ) -> Self { @@ -63,6 +67,7 @@ impl AnalyzeExec { AnalyzeExec { verbose, show_statistics, + metric_types, input, schema, cache, @@ -145,6 +150,7 @@ impl ExecutionPlan for AnalyzeExec { Ok(Arc::new(Self::new( self.verbose, self.show_statistics, + self.metric_types.clone(), children.pop().unwrap(), Arc::clone(&self.schema), ))) @@ -182,6 +188,7 @@ impl ExecutionPlan for AnalyzeExec { let captured_schema = Arc::clone(&self.schema); let verbose = self.verbose; let show_statistics = self.show_statistics; + let metric_types = self.metric_types.clone(); // future that gathers the results from all the tasks in the // JoinSet that computes the overall row count and final @@ -201,6 +208,7 @@ impl ExecutionPlan for AnalyzeExec { duration, captured_input, captured_schema, + &metric_types, ) }; @@ -219,6 +227,7 @@ fn create_output_batch( duration: std::time::Duration, input: Arc, schema: SchemaRef, + metric_types: &[MetricType], ) -> Result { let mut type_builder = StringBuilder::with_capacity(1, 1024); let mut plan_builder = StringBuilder::with_capacity(1, 1024); @@ -227,6 +236,7 @@ fn create_output_batch( type_builder.append_value("Plan with Metrics"); let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref()) + .set_metric_types(metric_types.to_vec()) .set_show_statistics(show_statistics) .indent(verbose) .to_string(); @@ -238,6 +248,7 @@ fn create_output_batch( type_builder.append_value("Plan with Full Metrics"); let annotated_plan = 
DisplayableExecutionPlan::with_full_metrics(input.as_ref()) + .set_metric_types(metric_types.to_vec()) .set_show_statistics(show_statistics) .indent(verbose) .to_string(); @@ -282,7 +293,13 @@ mod tests { let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); - let analyze_exec = Arc::new(AnalyzeExec::new(true, false, blocking_exec, schema)); + let analyze_exec = Arc::new(AnalyzeExec::new( + true, + false, + vec![MetricType::SUMMARY, MetricType::DEV], + blocking_exec, + schema, + )); let fut = collect(analyze_exec, task_ctx); let mut fut = fut.boxed(); diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 2420edfc743da..35ca0b65ae294 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -28,6 +28,7 @@ use datafusion_common::display::{GraphvizBuilder, PlanType, StringifiedPlan}; use datafusion_expr::display_schema; use datafusion_physical_expr::LexOrdering; +use crate::metrics::MetricType; use crate::render_tree::RenderTree; use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; @@ -120,11 +121,17 @@ pub struct DisplayableExecutionPlan<'a> { show_statistics: bool, /// If schema should be displayed. 
See [`Self::set_show_schema`] show_schema: bool, + /// Which metric categories should be included when rendering + metric_types: Vec, // (TreeRender) Maximum total width of the rendered tree tree_maximum_render_width: usize, } impl<'a> DisplayableExecutionPlan<'a> { + fn default_metric_types() -> Vec { + vec![MetricType::SUMMARY, MetricType::DEV] + } + /// Create a wrapper around an [`ExecutionPlan`] which can be /// pretty printed in a variety of ways pub fn new(inner: &'a dyn ExecutionPlan) -> Self { @@ -133,6 +140,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: ShowMetrics::None, show_statistics: false, show_schema: false, + metric_types: Self::default_metric_types(), tree_maximum_render_width: 240, } } @@ -146,6 +154,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: ShowMetrics::Aggregated, show_statistics: false, show_schema: false, + metric_types: Self::default_metric_types(), tree_maximum_render_width: 240, } } @@ -159,6 +168,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: ShowMetrics::Full, show_statistics: false, show_schema: false, + metric_types: Self::default_metric_types(), tree_maximum_render_width: 240, } } @@ -178,6 +188,12 @@ impl<'a> DisplayableExecutionPlan<'a> { self } + /// Specify which metric types should be rendered alongside the plan + pub fn set_metric_types(mut self, metric_types: Vec) -> Self { + self.metric_types = metric_types; + self + } + /// Set the maximum render width for the tree format pub fn set_tree_maximum_render_width(mut self, width: usize) -> Self { self.tree_maximum_render_width = width; @@ -206,6 +222,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: ShowMetrics, show_statistics: bool, show_schema: bool, + metric_types: Vec, } impl fmt::Display for Wrapper<'_> { fn fmt(&self, f: &mut Formatter) -> fmt::Result { @@ -216,6 +233,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: self.show_metrics, show_statistics: self.show_statistics, show_schema: self.show_schema, + 
metric_types: &self.metric_types, }; accept(self.plan, &mut visitor) } @@ -226,6 +244,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: self.show_metrics, show_statistics: self.show_statistics, show_schema: self.show_schema, + metric_types: self.metric_types.clone(), } } @@ -245,6 +264,7 @@ impl<'a> DisplayableExecutionPlan<'a> { plan: &'a dyn ExecutionPlan, show_metrics: ShowMetrics, show_statistics: bool, + metric_types: Vec, } impl fmt::Display for Wrapper<'_> { fn fmt(&self, f: &mut Formatter) -> fmt::Result { @@ -255,6 +275,7 @@ impl<'a> DisplayableExecutionPlan<'a> { t, show_metrics: self.show_metrics, show_statistics: self.show_statistics, + metric_types: &self.metric_types, graphviz_builder: GraphvizBuilder::default(), parents: Vec::new(), }; @@ -272,6 +293,7 @@ impl<'a> DisplayableExecutionPlan<'a> { plan: self.inner, show_metrics: self.show_metrics, show_statistics: self.show_statistics, + metric_types: self.metric_types.clone(), } } @@ -306,6 +328,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: ShowMetrics, show_statistics: bool, show_schema: bool, + metric_types: Vec, } impl fmt::Display for Wrapper<'_> { @@ -317,6 +340,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: self.show_metrics, show_statistics: self.show_statistics, show_schema: self.show_schema, + metric_types: &self.metric_types, }; visitor.pre_visit(self.plan)?; Ok(()) @@ -328,6 +352,7 @@ impl<'a> DisplayableExecutionPlan<'a> { show_metrics: self.show_metrics, show_statistics: self.show_statistics, show_schema: self.show_schema, + metric_types: self.metric_types.clone(), } } @@ -382,6 +407,8 @@ struct IndentVisitor<'a, 'b> { show_statistics: bool, /// If schema should be displayed show_schema: bool, + /// Which metric types should be rendered + metric_types: &'a [MetricType], } impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { @@ -394,6 +421,7 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { ShowMetrics::Aggregated => { if let Some(metrics) = 
plan.metrics() { let metrics = metrics + .filter_by_metric_types(self.metric_types) .aggregate_by_name() .sorted_for_display() .timestamps_removed(); @@ -405,6 +433,7 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { } ShowMetrics::Full => { if let Some(metrics) = plan.metrics() { + let metrics = metrics.filter_by_metric_types(self.metric_types); write!(self.f, ", metrics=[{metrics}]")?; } else { write!(self.f, ", metrics=[]")?; @@ -441,6 +470,8 @@ struct GraphvizVisitor<'a, 'b> { show_metrics: ShowMetrics, /// If statistics should be displayed show_statistics: bool, + /// Which metric types should be rendered + metric_types: &'a [MetricType], graphviz_builder: GraphvizBuilder, /// Used to record parent node ids when visiting a plan. @@ -478,6 +509,7 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { ShowMetrics::Aggregated => { if let Some(metrics) = plan.metrics() { let metrics = metrics + .filter_by_metric_types(self.metric_types) .aggregate_by_name() .sorted_for_display() .timestamps_removed(); @@ -489,6 +521,7 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { } ShowMetrics::Full => { if let Some(metrics) = plan.metrics() { + let metrics = metrics.filter_by_metric_types(self.metric_types); format!("metrics=[{metrics}]") } else { "metrics=[]".to_string() diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index 15efb8f90aa20..45cef58b5dd8c 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -62,9 +62,15 @@ impl BaselineMetrics { start_time.record(); Self { - end_time: MetricBuilder::new(metrics).end_timestamp(partition), - elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition), - output_rows: MetricBuilder::new(metrics).output_rows(partition), + end_time: MetricBuilder::new(metrics) + .with_type(super::MetricType::SUMMARY) + .end_timestamp(partition), + elapsed_compute: 
MetricBuilder::new(metrics) + .with_type(super::MetricType::SUMMARY) + .elapsed_compute(partition), + output_rows: MetricBuilder::new(metrics) + .with_type(super::MetricType::SUMMARY) + .output_rows(partition), } } diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs index dbda0a310ce52..74ba5a2a18343 100644 --- a/datafusion/physical-plan/src/metrics/builder.rs +++ b/datafusion/physical-plan/src/metrics/builder.rs @@ -19,6 +19,8 @@ use std::{borrow::Cow, sync::Arc}; +use crate::metrics::MetricType; + use super::{ Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp, }; @@ -52,15 +54,23 @@ pub struct MetricBuilder<'a> { /// arbitrary name=value pairs identifying this metric labels: Vec