diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index daeb21db9d05..735a381586ad 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -678,6 +678,12 @@ doc_comment::doctest!( library_user_guide_sql_api ); +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/library-user-guide/building-logical-plans.md", + library_user_guide_logical_plans +); + #[cfg(doctest)] doc_comment::doctest!( "../../../docs/source/library-user-guide/using-the-dataframe-api.md", diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index b58208591920..5b5a842fa4cf 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -26,7 +26,7 @@ pub mod tree_node; pub use builder::{ build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary, - LogicalPlanBuilder, UNNAMED_TABLE, + LogicalPlanBuilder, LogicalTableSource, UNNAMED_TABLE, }; pub use ddl::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, diff --git a/docs/source/library-user-guide/building-logical-plans.md b/docs/source/library-user-guide/building-logical-plans.md index fe922d8eaeb1..556deb02e980 100644 --- a/docs/source/library-user-guide/building-logical-plans.md +++ b/docs/source/library-user-guide/building-logical-plans.md @@ -31,44 +31,52 @@ explained in more detail in the [Query Planning and Execution Overview] section DataFusion's [LogicalPlan] is an enum containing variants representing all the supported operators, and also contains an `Extension` variant that allows projects building on DataFusion to add custom logical operators. -It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as follows, but is is +It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as shown, but it is much easier to use the [LogicalPlanBuilder], which is described in the next section. Here is an example of building a logical plan directly: - - ```rust -// create a logical table source -let schema = Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("name", DataType::Utf8, true), -]); -let table_source = LogicalTableSource::new(SchemaRef::new(schema)); - -// create a TableScan plan -let projection = None; // optional projection -let filters = vec![]; // optional filters to push down -let fetch = None; // optional LIMIT -let table_scan = LogicalPlan::TableScan(TableScan::try_new( - "person", - Arc::new(table_source), - projection, - filters, - fetch, -)?); - -// create a Filter plan that evaluates `id > 500` that wraps the TableScan -let filter_expr = col("id").gt(lit(500)); -let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan))?); - -// print the plan -println!("{}", plan.display_indent_schema()); +use datafusion::common::DataFusionError; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::logical_expr::{Filter, LogicalPlan, TableScan, LogicalTableSource}; +use datafusion::prelude::*; +use std::sync::Arc; + +fn main() -> Result<(), DataFusionError> { + // create a logical table source + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ]); + let table_source = LogicalTableSource::new(SchemaRef::new(schema)); + + // create a TableScan plan + let projection = None; // optional projection + let filters = vec![]; // optional filters to push down + let fetch = None; // optional LIMIT + let table_scan = LogicalPlan::TableScan(TableScan::try_new( + "person", + Arc::new(table_source), + projection, + filters, + fetch, + )? + ); + + // create a Filter plan that evaluates `id > 500` that wraps the TableScan + let filter_expr = col("id").gt(lit(500)); + let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan)) ? ); + + // print the plan + println!("{}", plan.display_indent_schema()); + Ok(()) +} ``` This example produces the following plan: -``` +```text Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] TableScan: person [id:Int32;N, name:Utf8;N] ``` @@ -78,7 +86,7 @@ Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] DataFusion logical plans can be created using the [LogicalPlanBuilder] struct. There is also a [DataFrame] API which is a higher-level API that delegates to [LogicalPlanBuilder]. -The following associated functions can be used to create a new builder: +There are several functions that can can be used to create a new builder, such as - `empty` - create an empty plan with no fields - `values` - create a plan from a set of literal values @@ -102,41 +110,107 @@ The following example demonstrates building the same simple query plan as the pr ```rust -// create a logical table source -let schema = Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("name", DataType::Utf8, true), -]); -let table_source = LogicalTableSource::new(SchemaRef::new(schema)); - -// optional projection -let projection = None; - -// create a LogicalPlanBuilder for a table scan -let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?; - -// perform a filter operation and build the plan -let plan = builder - .filter(col("id").gt(lit(500)))? // WHERE id > 500 - .build()?; - -// print the plan -println!("{}", plan.display_indent_schema()); +use datafusion::common::DataFusionError; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource}; +use datafusion::prelude::*; +use std::sync::Arc; + +fn main() -> Result<(), DataFusionError> { + // create a logical table source + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ]); + let table_source = LogicalTableSource::new(SchemaRef::new(schema)); + + // optional projection + let projection = None; + + // create a LogicalPlanBuilder for a table scan + let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?; + + // perform a filter operation and build the plan + let plan = builder + .filter(col("id").gt(lit(500)))? // WHERE id > 500 + .build()?; + + // print the plan + println!("{}", plan.display_indent_schema()); + Ok(()) +} ``` This example produces the following plan: -``` +```text Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] TableScan: person [id:Int32;N, name:Utf8;N] ``` +## Translating Logical Plan to Physical Plan + +Logical plans can not be directly executed. They must be "compiled" into an +[`ExecutionPlan`], which is often referred to as a "physical plan". + +Compared to `LogicalPlan`s `ExecutionPlans` have many more details such as +specific algorithms and detailed optimizations compared to. Given a +`LogicalPlan` the easiest way to create an `ExecutionPlan` is using +[`SessionState::create_physical_plan`] as shown below + +```rust +use datafusion::datasource::{provider_as_source, MemTable}; +use datafusion::common::DataFusionError; +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource}; +use datafusion::prelude::*; +use std::sync::Arc; + +// Creating physical plans may access remote catalogs and data sources +// thus it must be run with an async runtime. +#[tokio::main] +async fn main() -> Result<(), DataFusionError> { + + // create a default table source + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ]); + // To create an ExecutionPlan we must provide an actual + // TableProvider. For this example, we don't provide any data + // but in production code, this would have `RecordBatch`es with + // in memory data + let table_provider = Arc::new(MemTable::try_new(Arc::new(schema), vec![])?); + // Use the provider_as_source function to convert the TableProvider to a table source + let table_source = provider_as_source(table_provider); + + // create a LogicalPlanBuilder for a table scan without projection or filters + let logical_plan = LogicalPlanBuilder::scan("person", table_source, None)?.build()?; + + // Now create the physical plan by calling `create_physical_plan` + let ctx = SessionContext::new(); + let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?; + + // print the plan + println!("{}", DisplayableExecutionPlan::new(physical_plan.as_ref()).indent(true)); + Ok(()) +} +``` + +This example produces the following physical plan: + +```text +MemoryExec: partitions=0, partition_sizes=[] +``` + ## Table Sources -The previous example used a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also -suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner. However, if you -want to use a [TableSource] that can be executed in DataFusion then you will need to use [DefaultTableSource], which is a -wrapper for a [TableProvider]. +The previous examples use a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also +suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner. + +However, it is more common to use a [TableProvider]. To get a [TableSource] from a +[TableProvider], use [provider_as_source] or [DefaultTableSource]. [query planning and execution overview]: https://docs.rs/datafusion/latest/datafusion/index.html#query-planning-and-execution-overview [architecture guide]: https://docs.rs/datafusion/latest/datafusion/index.html#architecture @@ -145,5 +219,8 @@ wrapper for a [TableProvider]. [dataframe]: using-the-dataframe-api.md [logicaltablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalTableSource.html [defaulttablesource]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html +[provider_as_source]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/fn.provider_as_source.html [tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html [tablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.TableSource.html +[`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html +[`sessionstate::create_physical_plan`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.create_physical_plan