7 changes: 6 additions & 1 deletion datafusion/common/src/config.rs
@@ -22,7 +22,7 @@ use arrow_ipc::CompressionType;
#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::ExplainFormat;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
@@ -893,6 +893,11 @@ config_namespace! {
/// (format=tree only) Maximum total width of the rendered tree.
/// When set to 0, the tree will have no width limit.
pub tree_maximum_render_width: usize, default = 240

/// Verbosity level for "EXPLAIN ANALYZE". Defaults to "dev".
/// "summary" shows common metrics for high-level insights.
/// "dev" provides deep operator-level introspection for developers.
pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
}
}

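Not part of the diff: a minimal sketch of selecting the new option from user code, assuming it is exposed under the `explain` namespace as defined above (the query is a placeholder):

use datafusion::prelude::*;
use datafusion_common::format::ExplainAnalyzeLevel;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Select summary-level metrics before building the context.
    let mut config = SessionConfig::new();
    config.options_mut().explain.analyze_level = ExplainAnalyzeLevel::Summary;
    let ctx = SessionContext::new_with_config(config);

    // EXPLAIN ANALYZE output now renders only summary metrics.
    let df = ctx.sql("EXPLAIN ANALYZE SELECT 1").await?;
    df.show().await?;
    Ok(())
}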
45 changes: 45 additions & 0 deletions datafusion/common/src/format.rs
@@ -205,3 +205,48 @@ impl ConfigField for ExplainFormat {
Ok(())
}
}

/// Verbosity levels controlling how `EXPLAIN ANALYZE` renders metrics
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExplainAnalyzeLevel {
/// Show a compact view containing high-level metrics
Summary,
/// Show a developer-focused view with per-operator details
Dev,
// When adding a new variant, update the error message in `from_str()` accordingly.
}

impl FromStr for ExplainAnalyzeLevel {
type Err = DataFusionError;

fn from_str(level: &str) -> Result<Self, Self::Err> {
match level.to_lowercase().as_str() {
"summary" => Ok(ExplainAnalyzeLevel::Summary),
"dev" => Ok(ExplainAnalyzeLevel::Dev),
other => Err(DataFusionError::Configuration(format!(
"Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'"
))),
}
}
}

impl Display for ExplainAnalyzeLevel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
ExplainAnalyzeLevel::Summary => "summary",
ExplainAnalyzeLevel::Dev => "dev",
};
write!(f, "{s}")
}
}

impl ConfigField for ExplainAnalyzeLevel {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}

fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = ExplainAnalyzeLevel::from_str(value)?;
Ok(())
}
}
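Not part of the diff: a small usage sketch exercising only the `FromStr` and `Display` impls added above:

use std::str::FromStr;

use datafusion_common::format::ExplainAnalyzeLevel;

fn main() {
    // Parsing is case-insensitive, per `from_str` above.
    assert_eq!(
        ExplainAnalyzeLevel::from_str("SUMMARY").unwrap(),
        ExplainAnalyzeLevel::Summary
    );

    // `FromStr` also enables `str::parse`.
    let level: ExplainAnalyzeLevel = "dev".parse().unwrap();
    assert_eq!(level, ExplainAnalyzeLevel::Dev);

    // `Display` renders the lowercase names, so values round-trip.
    assert_eq!(level.to_string(), "dev");

    // Anything else is rejected with a configuration error.
    assert!(ExplainAnalyzeLevel::from_str("verbose").is_err());
}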
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -64,7 +64,9 @@ mod tests {
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::analyze::AnalyzeExec;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricType, MetricsSet,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use chrono::{TimeZone, Utc};
@@ -238,6 +240,7 @@ mod tests {
let analyze_exec = Arc::new(AnalyzeExec::new(
false,
false,
vec![MetricType::SUMMARY, MetricType::DEV],
// use a new ParquetSource to avoid sharing execution metrics
self.build_parquet_exec(
Arc::clone(table_schema),
8 changes: 8 additions & 0 deletions datafusion/core/src/physical_planner.rs
@@ -62,6 +62,7 @@ use arrow::compute::SortOptions;
use arrow::datatypes::Schema;
use datafusion_catalog::ScanArgs;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::format::ExplainAnalyzeLevel;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::TableReference;
use datafusion_common::{
@@ -90,6 +91,7 @@ use datafusion_physical_expr::{
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::metrics::MetricType;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
use datafusion_physical_plan::unnest::ListUnnest;
@@ -2073,9 +2075,15 @@ impl DefaultPhysicalPlanner {
let input = self.create_physical_plan(&a.input, session_state).await?;
let schema = Arc::clone(a.schema.inner());
let show_statistics = session_state.config_options().explain.show_statistics;
let analyze_level = session_state.config_options().explain.analyze_level;
let metric_types = match analyze_level {
ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY],
ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV],
};
Ok(Arc::new(AnalyzeExec::new(
a.verbose,
show_statistics,
metric_types,
input,
schema,
)))
35 changes: 35 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -22,6 +22,7 @@ use rstest::rstest;
use datafusion::config::ConfigOptions;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::Timestamp;
use datafusion_common::format::ExplainAnalyzeLevel;
use object_store::path::Path;

#[tokio::test]
@@ -158,6 +159,40 @@ async fn explain_analyze_baseline_metrics() {
fn nanos_from_timestamp(ts: &Timestamp) -> i64 {
ts.value().unwrap().timestamp_nanos_opt().unwrap()
}

// Test the different detail levels of the `datafusion.explain.analyze_level` config
#[tokio::test]
async fn explain_analyze_level() {
async fn collect_plan(level: ExplainAnalyzeLevel) -> String {
let mut config = SessionConfig::new();
config.options_mut().explain.analyze_level = level;
let ctx = SessionContext::new_with_config(config);
let sql = "EXPLAIN ANALYZE \
SELECT * \
FROM generate_series(10) as t1(v1) \
ORDER BY v1 DESC";
let dataframe = ctx.sql(sql).await.unwrap();
let batches = dataframe.collect().await.unwrap();
arrow::util::pretty::pretty_format_batches(&batches)
.unwrap()
.to_string()
}

for (level, needle, should_contain) in [
(ExplainAnalyzeLevel::Summary, "spill_count", false),
(ExplainAnalyzeLevel::Summary, "output_rows", true),
(ExplainAnalyzeLevel::Dev, "spill_count", true),
(ExplainAnalyzeLevel::Dev, "output_rows", true),
] {
let plan = collect_plan(level).await;
assert_eq!(
plan.contains(needle),
should_contain,
"plan for level {level:?} unexpected content: {plan}"
);
}
}

#[tokio::test]
async fn csv_explain_plans() {
// This test verifies the look of each plan in its full cycle of plan creation
19 changes: 18 additions & 1 deletion datafusion/physical-plan/src/analyze.rs
@@ -26,6 +26,7 @@ use super::{
SendableRecordBatchStream,
};
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricType;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
@@ -44,6 +45,8 @@ pub struct AnalyzeExec {
verbose: bool,
/// If statistics should be displayed
show_statistics: bool,
/// Which metric categories should be displayed
metric_types: Vec<MetricType>,
/// The input plan (the plan being analyzed)
pub(crate) input: Arc<dyn ExecutionPlan>,
/// The output schema for RecordBatches of this exec node
@@ -56,13 +59,15 @@ impl AnalyzeExec {
pub fn new(
verbose: bool,
show_statistics: bool,
metric_types: Vec<MetricType>,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> Self {
let cache = Self::compute_properties(&input, Arc::clone(&schema));
AnalyzeExec {
verbose,
show_statistics,
metric_types,
input,
schema,
cache,
@@ -145,6 +150,7 @@ impl ExecutionPlan for AnalyzeExec {
Ok(Arc::new(Self::new(
self.verbose,
self.show_statistics,
self.metric_types.clone(),
children.pop().unwrap(),
Arc::clone(&self.schema),
)))
@@ -182,6 +188,7 @@ impl ExecutionPlan for AnalyzeExec {
let captured_schema = Arc::clone(&self.schema);
let verbose = self.verbose;
let show_statistics = self.show_statistics;
let metric_types = self.metric_types.clone();

// future that gathers the results from all the tasks in the
// JoinSet that computes the overall row count and final
@@ -201,6 +208,7 @@
duration,
captured_input,
captured_schema,
&metric_types,
)
};

@@ -219,6 +227,7 @@ fn create_output_batch(
duration: std::time::Duration,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
metric_types: &[MetricType],
) -> Result<RecordBatch> {
let mut type_builder = StringBuilder::with_capacity(1, 1024);
let mut plan_builder = StringBuilder::with_capacity(1, 1024);
@@ -227,6 +236,7 @@
type_builder.append_value("Plan with Metrics");

let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
.set_metric_types(metric_types.to_vec())
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
@@ -238,6 +248,7 @@
type_builder.append_value("Plan with Full Metrics");

let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
.set_metric_types(metric_types.to_vec())
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
@@ -282,7 +293,13 @@ mod tests {

let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
let refs = blocking_exec.refs();
let analyze_exec = Arc::new(AnalyzeExec::new(true, false, blocking_exec, schema));
let analyze_exec = Arc::new(AnalyzeExec::new(
true,
false,
vec![MetricType::SUMMARY, MetricType::DEV],
blocking_exec,
schema,
));

let fut = collect(analyze_exec, task_ctx);
let mut fut = fut.boxed();
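Not part of the diff: for callers that construct `AnalyzeExec` directly (as the tests above do), a sketch of a summary-only instance; `input` and `schema` are placeholders supplied by the caller:

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_physical_plan::analyze::AnalyzeExec;
use datafusion_physical_plan::metrics::MetricType;
use datafusion_physical_plan::ExecutionPlan;

fn summary_only_analyze(
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
) -> Arc<AnalyzeExec> {
    // Only SUMMARY-tagged metrics are rendered, matching the
    // `ExplainAnalyzeLevel::Summary` branch in the physical planner.
    Arc::new(AnalyzeExec::new(
        false, // verbose
        false, // show_statistics
        vec![MetricType::SUMMARY],
        input,
        schema,
    ))
}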