Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3376,6 +3376,16 @@ mod tests {
"query not supported".to_string(),
))
}

/// Stub with the same signature as `PhysicalPlanner::create_physical_expr`.
///
/// All arguments are ignored; any call panics via `unimplemented!()`.
// NOTE(review): presumably never invoked by the tests in this module — confirm.
fn create_physical_expr(
&self,
_expr: &Expr,
_input_dfschema: &crate::logical_plan::DFSchema,
_input_schema: &Schema,
_ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn crate::physical_plan::PhysicalExpr>> {
unimplemented!()
}
}

struct MyQueryPlanner {}
Expand Down
14 changes: 2 additions & 12 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::LogicalPlan;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::{
error::{DataFusionError, Result},
Expand Down Expand Up @@ -122,16 +120,8 @@ impl SQLMetric {
}
}

/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;
}
/// Physical planner interface
pub use self::planner::PhysicalPlanner;

/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
Expand Down
85 changes: 75 additions & 10 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{hash_utils, Partitioning};
use crate::physical_plan::{
AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner, WindowExpr,
};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
Expand Down Expand Up @@ -172,16 +170,51 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
}
}

/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution, and logical `Expr`s to
/// `PhysicalExpr`s suitable for evaluation.
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
///
/// `logical_plan`: the logical plan to convert
///
/// `ctx_state`: the [`ExecutionContextState`] supplied to the planner
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Create a physical expression from a logical expression
/// suitable for evaluation
///
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `expr`
///
/// `input_schema`: the physical schema for evaluating `expr`
///
/// `ctx_state`: the [`ExecutionContextState`] supplied to the planner
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>>;
}

/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
pub trait ExtensionPlanner {
/// Create a physical plan for a [`UserDefinedLogicalNode`].
/// This errors when the planner knows how to plan the concrete implementation of `node`
/// but errors while doing so, and `None` when the planner does not know how to plan the `node`
/// and wants to delegate the planning to another [`ExtensionPlanner`].
///
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The challenge prior to this PR was that if the extension node has any Exprs, converting them into PhysicalExprs requires access to the LogicalPlan.

Furthermore, there was no way to call back into create_physical_expr from the ExtensionPlanner, so I added that to the trait as well.

/// `input_dfschema`: the logical plan schema for the inputs to this node
///
/// Returns an error when the planner knows how to plan the concrete
/// implementation of `node` but errors while doing so.
///
/// Returns `None` when the planner does not know how to plan the
/// `node` and wants to delegate the planning to another
/// [`ExtensionPlanner`].
fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
inputs: &[Arc<dyn ExecutionPlan>],
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
Expand Down Expand Up @@ -210,6 +243,30 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
self.optimize_plan(plan, ctx_state)
}

/// Create a physical expression from a logical expression
/// suitable for evaluation
///
/// `expr`: the expression to convert
///
/// `input_dfschema`: the logical plan schema for evaluating `expr`
///
/// `input_schema`: the physical schema for evaluating `expr`
///
/// `ctx_state`: the [`ExecutionContextState`] forwarded to the underlying call
fn create_physical_expr(
&self,
expr: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>> {
// Forward every argument unchanged through the type-qualified path.
// NOTE(review): presumably this resolves to an inherent
// `DefaultPhysicalPlanner::create_physical_expr` (not visible in this hunk)
// rather than recursing into this trait method — confirm.
DefaultPhysicalPlanner::create_physical_expr(
self,
expr,
input_dfschema,
input_schema,
ctx_state,
)
}
}

impl DefaultPhysicalPlanner {
Expand Down Expand Up @@ -721,7 +778,7 @@ impl DefaultPhysicalPlanner {
)))
}
LogicalPlan::Extension { node } => {
let inputs = node
let physical_inputs = node
.inputs()
.into_iter()
.map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
Expand All @@ -733,7 +790,13 @@ impl DefaultPhysicalPlanner {
if let Some(plan) = maybe_plan {
Ok(Some(plan))
} else {
planner.plan_extension(node.as_ref(), &inputs, ctx_state)
planner.plan_extension(
self,
node.as_ref(),
&node.inputs(),
&physical_inputs,
ctx_state,
)
}
},
)?;
Expand Down Expand Up @@ -1644,8 +1707,10 @@ mod tests {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
_node: &dyn UserDefinedLogicalNode,
_inputs: &[Arc<dyn ExecutionPlan>],
_logical_inputs: &[&LogicalPlan],
_physical_inputs: &[Arc<dyn ExecutionPlan>],
_ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(Some(Arc::new(NoOpExecutionPlan {
Expand Down
9 changes: 6 additions & 3 deletions datafusion/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,16 +321,19 @@ impl ExtensionPlanner for TopKPlanner {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
inputs: &[Arc<dyn ExecutionPlan>],
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(
if let Some(topk_node) = node.as_any().downcast_ref::<TopKPlanNode>() {
assert_eq!(inputs.len(), 1, "Inconsistent number of inputs");
assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs");
assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs");
// figure out input name
Some(Arc::new(TopKExec {
input: inputs[0].clone(),
input: physical_inputs[0].clone(),
k: topk_node.k,
}))
} else {
Expand Down