Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding node_id to ExecutionPlanProperties #12186

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion datafusion-examples/examples/planner_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::physical_planner::DefaultPhysicalPlanner;
use datafusion::prelude::*;
use datafusion::{error::Result, physical_plan::ExecutionPlan};
use datafusion_expr::{LogicalPlan, PlanType};

/// This example demonstrates the process of converting logical plan
Expand Down Expand Up @@ -82,9 +82,35 @@ async fn to_physical_plan_in_one_api_demo(
.plan
);

let traversal = extract_node_ids_from_execution_plan_tree(physical_plan.as_ref());
let expected_traversal = vec![
Some(0),
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
Some(6),
Some(7),
Some(8),
Some(9),
];
assert_eq!(expected_traversal, traversal);
Ok(())
}

/// Collects the `node_id` of every node in the plan tree in post-order
/// (children first, then the node itself), matching the annotation order.
fn extract_node_ids_from_execution_plan_tree(
    physical_plan: &dyn ExecutionPlan,
) -> Vec<Option<usize>> {
    // Recurse into each child and flatten their traversals into one list.
    let mut node_ids: Vec<Option<usize>> = physical_plan
        .children()
        .into_iter()
        .flat_map(|child| extract_node_ids_from_execution_plan_tree(child.as_ref()))
        .collect();
    // Visit the current node last (post-order).
    node_ids.push(physical_plan.properties().node_id());
    node_ids
}

/// Converts a logical plan into a physical plan by utilizing the analyzer,
/// optimizer, and query planner APIs separately. This flavor gives more
/// control over the planning process.
Expand Down
16 changes: 16 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,22 @@ impl ExecutionPlan for ArrowExec {
cache: self.cache.clone(),
}))
}

/// Returns a copy of this `ArrowExec` whose cached `PlanProperties` carry
/// the given `node_id`; all other fields are cloned unchanged.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let new_cache = self.cache.clone().with_node_id(node_id);

    Ok(Some(Arc::new(Self {
        base_config: self.base_config.clone(),
        projected_statistics: self.projected_statistics.clone(),
        projected_schema: self.projected_schema.clone(),
        projected_output_ordering: self.projected_output_ordering.clone(),
        metrics: self.metrics.clone(),
        cache: new_cache,
    })))
}
}

pub struct ArrowOpener {
Expand Down
16 changes: 16 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,22 @@ impl ExecutionPlan for AvroExec {
cache: self.cache.clone(),
}))
}

/// Returns a copy of this `AvroExec` whose cached `PlanProperties` carry
/// the given `node_id`; all other fields are cloned unchanged.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let new_cache = self.cache.clone().with_node_id(node_id);

    Ok(Some(Arc::new(Self {
        base_config: self.base_config.clone(),
        projected_statistics: self.projected_statistics.clone(),
        projected_schema: self.projected_schema.clone(),
        projected_output_ordering: self.projected_output_ordering.clone(),
        metrics: self.metrics.clone(),
        cache: new_cache,
    })))
}
}

#[cfg(feature = "avro")]
Expand Down
21 changes: 21 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,27 @@ impl ExecutionPlan for CsvExec {
cache: self.cache.clone(),
}))
}

/// Returns a copy of this `CsvExec` whose cached `PlanProperties` carry
/// the given `node_id`; all other fields are cloned/copied unchanged.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let new_cache = self.cache.clone().with_node_id(node_id);

    Ok(Some(Arc::new(Self {
        base_config: self.base_config.clone(),
        projected_statistics: self.projected_statistics.clone(),
        has_header: self.has_header,
        delimiter: self.delimiter,
        quote: self.quote,
        escape: self.escape,
        comment: self.comment,
        newlines_in_values: self.newlines_in_values,
        metrics: self.metrics.clone(),
        file_compression_type: self.file_compression_type,
        cache: new_cache,
    })))
}
}

/// A Config for [`CsvOpener`]
Expand Down
15 changes: 15 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@ impl ExecutionPlan for NdJsonExec {
cache: self.cache.clone(),
}))
}

/// Returns a copy of this `NdJsonExec` whose cached `PlanProperties` carry
/// the given `node_id`; all other fields are cloned/copied unchanged.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let new_cache = self.cache.clone().with_node_id(node_id);

    Ok(Some(Arc::new(Self {
        base_config: self.base_config.clone(),
        projected_statistics: self.projected_statistics.clone(),
        metrics: self.metrics.clone(),
        file_compression_type: self.file_compression_type,
        cache: new_cache,
    })))
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
Expand Down
23 changes: 23 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,29 @@ impl ExecutionPlan for ParquetExec {
schema_adapter_factory: self.schema_adapter_factory.clone(),
}))
}

/// Returns a copy of this `ParquetExec` whose cached `PlanProperties` carry
/// the given `node_id`; all other fields are cloned/copied unchanged.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let new_cache = self.cache.clone().with_node_id(node_id);

    let new_plan = Self {
        base_config: self.base_config.clone(),
        projected_statistics: self.projected_statistics.clone(),
        metrics: self.metrics.clone(),
        predicate: self.predicate.clone(),
        pruning_predicate: self.pruning_predicate.clone(),
        page_pruning_predicate: self.page_pruning_predicate.clone(),
        metadata_size_hint: self.metadata_size_hint,
        parquet_file_reader_factory: self.parquet_file_reader_factory.clone(),
        cache: new_cache,
        table_parquet_options: self.table_parquet_options.clone(),
        schema_adapter_factory: self.schema_adapter_factory.clone(),
    };

    Ok(Some(Arc::new(new_plan)))
}
}

fn should_enable_page_index(
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ use datafusion_optimizer::{
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::node_id::{
annotate_node_id_for_execution_plan, NodeIdAnnotator,
};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_sql::parser::{DFParser, Statement};
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};
Expand Down Expand Up @@ -732,9 +735,12 @@ impl SessionState {
logical_plan: &LogicalPlan,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let logical_plan = self.optimize(logical_plan)?;
self.query_planner
let physical_plan = self
.query_planner
.create_physical_plan(&logical_plan, self)
.await
.await?;
let mut id_annotator = NodeIdAnnotator::new();
annotate_node_id_for_execution_plan(&physical_plan, &mut id_annotator)
}

/// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
Expand Down
18 changes: 18 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,24 @@ impl ExecutionPlan for AggregateExec {
}
}
}

/// Returns a copy of this `AggregateExec` (rebuilt via
/// `try_new_with_schema`) whose cached `PlanProperties` carry the given
/// `node_id`.
///
/// # Errors
/// Propagates any error from `AggregateExec::try_new_with_schema`.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut new_plan = AggregateExec::try_new_with_schema(
        self.mode,
        self.group_by.clone(),
        self.aggr_expr.clone(),
        self.filter_expr.clone(),
        self.input().clone(),
        Arc::clone(&self.input_schema),
        Arc::clone(&self.schema),
    )?;
    // Redundant `PlanProperties` annotation removed; the type is inferred.
    new_plan.cache = new_plan.cache.clone().with_node_id(node_id);
    Ok(Some(Arc::new(new_plan)))
}
}

fn create_schema(
Expand Down
15 changes: 15 additions & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,21 @@ impl ExecutionPlan for AnalyzeExec {
futures::stream::once(output),
)))
}

/// Returns a copy of this `AnalyzeExec` whose cached `PlanProperties`
/// carry the given `node_id`.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut new_plan = AnalyzeExec::new(
        self.verbose,
        self.show_statistics,
        self.input.clone(),
        self.schema.clone(),
    );
    new_plan.cache = new_plan.cache.clone().with_node_id(node_id);
    Ok(Some(Arc::new(new_plan)))
}
}

/// Creates the output of AnalyzeExec as a RecordBatch
Expand Down
11 changes: 11 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn fetch(&self) -> Option<usize> {
self.fetch
}

/// Returns a copy of this `CoalesceBatchesExec` whose cached
/// `PlanProperties` carry the given `node_id`.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut new_plan =
        CoalesceBatchesExec::new(self.input.clone(), self.target_batch_size);
    new_plan.cache = new_plan.cache.clone().with_node_id(node_id);
    Ok(Some(Arc::new(new_plan)))
}
}

/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
Expand Down
10 changes: 10 additions & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
fn supports_limit_pushdown(&self) -> bool {
true
}

/// Returns a copy of this `CoalescePartitionsExec` whose cached
/// `PlanProperties` carry the given `node_id`.
fn with_node_id(
    self: Arc<Self>,
    // Renamed from `_node_id`: the leading underscore signals "unused" in
    // Rust, but the parameter is used below.
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut new_plan = CoalescePartitionsExec::new(self.input.clone());
    new_plan.cache = new_plan.cache.clone().with_node_id(node_id);
    Ok(Some(Arc::new(new_plan)))
}
}

#[cfg(test)]
Expand Down
15 changes: 14 additions & 1 deletion datafusion/physical-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
write!(self.f, "{:indent$}", "", indent = self.indent * 2)?;
plan.fmt_as(self.t, self.f)?;
let node_id = plan
.properties()
.node_id()
.map_or("None".to_string(), |id| format!(", node_id={}", id));
write!(self.f, "{node_id}")?;
match self.show_metrics {
ShowMetrics::None => {}
ShowMetrics::Aggregated => {
Expand Down Expand Up @@ -392,11 +397,19 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
""
};

let node_id = plan
.properties()
.node_id()
.map_or("node_id=None".to_string(), |id| format!("node_id={}", id));

self.graphviz_builder.add_node(
self.f,
id,
&label,
Some(&format!("{}{}{}", metrics, delimiter, statistics)),
Some(&format!(
"{}{}{}{}",
metrics, delimiter, statistics, node_id
)),
)?;

if let Some(parent_node_id) = self.parents.last() {
Expand Down
Loading
Loading