Skip to content

Commit

Permalink
expose some fields on session state (apache#11716)
Browse files Browse the repository at this point in the history
* expose some fields on session state

Signed-off-by: Ruihang Xia <[email protected]>

* add example

Signed-off-by: Ruihang Xia <[email protected]>

* rename file

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Jul 31, 2024
1 parent 929568d commit 6508fa2
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 0 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ cargo run --example dataframe
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`planner_api.rs`](examples/planner_api.rs): APIs to manipulate logical and physical plans
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
Expand Down
127 changes: 127 additions & 0 deletions datafusion-examples/examples/planner_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;
use datafusion_expr::{LogicalPlan, PlanType};

/// Entry point of the planner API example: builds one logical plan and
/// turns it into a physical plan in two different ways.
///
/// Planning in DataFusion happens in several steps:
/// 1. Analyzing and optimizing the logical plan
/// 2. Converting the logical plan into a physical plan
///
/// The same logical plan is handed to both demos below:
/// - `to_physical_plan_in_one_api_demo` uses the combined
///   `create_physical_plan` API.
/// - `to_physical_plan_step_by_step_demo` drives the analyzer, optimizer,
///   and query planner APIs separately.
#[tokio::main]
async fn main() -> Result<()> {
    // Create the session and open the sample Parquet file as a DataFrame.
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    let parquet_path = format!("{testdata}/alltypes_plain.parquet");
    let source = ctx
        .read_parquet(&parquet_path, ParquetReadOptions::default())
        .await?;

    // Build the input logical plan one DataFrame operation at a time.
    // Every SQL fragment is parsed against the schema of `source`.
    let projection = vec![
        source.parse_sql_expr("int_col")?,
        source.parse_sql_expr("double_col")?,
    ];
    let projected = source.clone().select(projection)?;

    let predicate = source.parse_sql_expr("int_col < 5 OR double_col = 8.0")?;
    let filtered = projected.filter(predicate)?;

    let group_by = vec![source.parse_sql_expr("double_col")?];
    let aggregates = vec![source.parse_sql_expr("SUM(int_col) as sum_int_col")?];
    let aggregated = filtered.aggregate(group_by, aggregates)?;

    let limited = aggregated.limit(0, Some(1))?;
    let logical_plan = limited.logical_plan().clone();

    // The first demo only borrows the plan; the second one consumes it.
    to_physical_plan_in_one_api_demo(&logical_plan, &ctx).await?;

    to_physical_plan_step_by_step_demo(logical_plan, &ctx).await?;

    Ok(())
}

/// Produces a physical plan from `input` with a single call to the
/// session state's `create_physical_plan`, then prints it.
///
/// That combined API first optimizes the logical plan and then converts
/// it into a physical plan.
///
/// Returns an error if physical planning fails.
async fn to_physical_plan_in_one_api_demo(
    input: &LogicalPlan,
    ctx: &SessionContext,
) -> Result<()> {
    let state = ctx.state();
    let plan = state.create_physical_plan(input).await?;

    // Render the plan as text without verbose details.
    let rendered = displayable(plan.as_ref())
        .to_stringified(false, PlanType::InitialPhysicalPlan)
        .plan;
    println!(
        "Physical plan direct from logical plan:\n\n{}\n\n",
        rendered
    );

    Ok(())
}

/// Converts a logical plan into a physical plan by utilizing the analyzer,
/// optimizer, and query planner APIs separately. This flavor gives more
/// control over the planning process.
///
/// The stages run in order, and the intermediate result of each one is
/// printed:
/// 1. `Analyzer::execute_and_check` on the input plan
/// 2. `Optimizer::optimize` on the analyzed plan
/// 3. `QueryPlanner::create_physical_plan` on the optimized plan
///
/// Returns the first error raised by any of the stages.
async fn to_physical_plan_step_by_step_demo(
    input: LogicalPlan,
    ctx: &SessionContext,
) -> Result<()> {
    // Take the session state once and reuse it for every stage. Each
    // `ctx.state()` call hands back a separate owned `SessionState`, so
    // calling it per stage would plan against several different copies.
    let state = ctx.state();

    // First analyze the logical plan
    let analyzed_logical_plan = state.analyzer().execute_and_check(
        input,
        state.config_options(),
        // No-op observer: this demo does not track individual rules.
        |_, _| (),
    )?;
    println!("Analyzed logical plan:\n\n{:?}\n\n", analyzed_logical_plan);

    // Optimize the analyzed logical plan
    let optimized_logical_plan =
        state
            .optimizer()
            .optimize(analyzed_logical_plan, &state, |_, _| ())?;
    println!(
        "Optimized logical plan:\n\n{:?}\n\n",
        optimized_logical_plan
    );

    // Create the physical plan
    let physical_plan = state
        .query_planner()
        .create_physical_plan(&optimized_logical_plan, &state)
        .await?;
    println!(
        "Final physical plan:\n\n{}\n\n",
        displayable(physical_plan.as_ref())
            // Only `.plan` (the rendered text) is printed, so the plan type
            // is just a tag here; use the one that matches the label above.
            .to_stringified(false, PlanType::FinalPhysicalPlan)
            .plan
    );

    Ok(())
}
15 changes: 15 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,21 @@ impl SessionState {
query.sql_to_expr(sql_expr, df_schema, &mut PlannerContext::new())
}

/// Returns the [`Analyzer`] for this session
///
/// This is a plain borrow of the `analyzer` field; nothing is cloned.
/// The `planner_api.rs` example uses it to run
/// `execute_and_check` on a logical plan before optimization.
pub fn analyzer(&self) -> &Analyzer {
&self.analyzer
}

/// Returns the [`Optimizer`] for this session
///
/// This is a plain borrow of the `optimizer` field; nothing is cloned.
/// The `planner_api.rs` example uses it to call `optimize` on an
/// already analyzed logical plan.
pub fn optimizer(&self) -> &Optimizer {
&self.optimizer
}

/// Returns the [`QueryPlanner`] for this session
///
/// The planner is returned as a reference to the `Arc` held in the
/// `query_planner` field, so callers can either use it in place or clone
/// the `Arc` to keep their own handle. The `planner_api.rs` example uses
/// it to call `create_physical_plan` on an optimized logical plan.
pub fn query_planner(&self) -> &Arc<dyn QueryPlanner + Send + Sync> {
&self.query_planner
}

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
if let LogicalPlan::Explain(e) = plan {
Expand Down

0 comments on commit 6508fa2

Please sign in to comment.