diff --git a/Cargo.toml b/Cargo.toml
index c52e442619..ba72bab789 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,9 +1,5 @@
 [workspace]
-members = [
-    "crates/*",
-    "delta-inspect",
-    "python",
-]
+members = ["crates/*", "delta-inspect", "python"]
 exclude = ["proofs"]
 resolver = "2"
 
@@ -31,28 +27,29 @@ debug = "line-tables-only"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "50" }
-arrow-arith = { version = "50" }
-arrow-array = { version = "50", features = ["chrono-tz"]}
-arrow-buffer = { version = "50" }
-arrow-cast = { version = "50" }
-arrow-ipc = { version = "50" }
-arrow-json = { version = "50" }
-arrow-ord = { version = "50" }
-arrow-row = { version = "50" }
-arrow-schema = { version = "50" }
-arrow-select = { version = "50" }
+arrow = { version = "51" }
+arrow-arith = { version = "51" }
+arrow-array = { version = "51", features = ["chrono-tz"] }
+arrow-buffer = { version = "51" }
+arrow-cast = { version = "51" }
+arrow-ipc = { version = "51" }
+arrow-json = { version = "51" }
+arrow-ord = { version = "51" }
+arrow-row = { version = "51" }
+arrow-schema = { version = "51" }
+arrow-select = { version = "51" }
 object_store = { version = "0.9" }
-parquet = { version = "50" }
+parquet = { version = "51" }
 
 # datafusion
-datafusion = { version = "36" }
-datafusion-expr = { version = "36" }
-datafusion-common = { version = "36" }
-datafusion-proto = { version = "36" }
-datafusion-sql = { version = "36" }
-datafusion-physical-expr = { version = "36" }
-datafusion-functions = { version = "36" }
+datafusion = { version = "37.1" }
+datafusion-expr = { version = "37.1" }
+datafusion-common = { version = "37.1" }
+datafusion-proto = { version = "37.1" }
+datafusion-sql = { version = "37.1" }
+datafusion-physical-expr = { version = "37.1" }
+datafusion-functions = { version = "37.1" }
+datafusion-functions-array = { version = "37.1" }
 
 # serde
 serde = { version = "1.0.194", features = ["derive"] }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 7b5cd2adbc..1f5c136b47 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -41,6 +41,7 @@ datafusion-proto = { workspace = true, optional = true }
 datafusion-sql = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
 datafusion-functions = { workspace = true, optional = true }
+datafusion-functions-array = { workspace = true, optional = true }
 
 # serde
 serde = { workspace = true, features = ["derive"] }
@@ -123,6 +124,7 @@ datafusion = [
   "datafusion-physical-expr",
   "datafusion-sql",
   "datafusion-functions",
+  "datafusion-functions-array",
   "sqlparser",
 ]
 datafusion-ext = ["datafusion"]
diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs
index 5a25054316..fc30f32144 100644
--- a/crates/core/src/data_catalog/storage/mod.rs
+++ b/crates/core/src/data_catalog/storage/mod.rs
@@ -110,12 +110,13 @@ impl SchemaProvider for ListingSchemaProvider {
         self.tables.iter().map(|t| t.key().clone()).collect()
     }
 
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        let location = self.tables.get(name).map(|t| t.clone())?;
-        let provider = open_table_with_storage_options(location, self.storage_options.0.clone())
-            .await
-            .ok()?;
-        Some(Arc::new(provider) as Arc<dyn TableProvider>)
+    async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
+        let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
+            return Ok(None);
+        };
+        let provider =
+            open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
+        Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
     }
 
     fn register_table(
diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs
index 0ed539e708..6b6a4b4a63 100644
--- a/crates/core/src/data_catalog/unity/datafusion.rs
+++ b/crates/core/src/data_catalog/unity/datafusion.rs
@@ -8,6 +8,7 @@ use dashmap::DashMap;
 use datafusion::catalog::schema::SchemaProvider;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList};
 use datafusion::datasource::TableProvider;
+use datafusion_common::DataFusionError;
 use tracing::error;
 
 use super::models::{GetTableResponse, ListCatalogsResponse, ListTableSummariesResponse};
@@ -180,25 +181,24 @@ impl SchemaProvider for UnitySchemaProvider {
         self.table_names.clone()
     }
 
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+    async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
         let maybe_table = self
             .client
             .get_table(&self.catalog_name, &self.schema_name, name)
             .await
-            .ok()?;
+            .map_err(|err| DataFusionError::External(Box::new(err)))?;
 
         match maybe_table {
             GetTableResponse::Success(table) => {
                 let table = DeltaTableBuilder::from_uri(table.storage_location)
                     .with_storage_options(self.storage_options.clone())
                     .load()
-                    .await
-                    .ok()?;
-                Some(Arc::new(table))
+                    .await?;
+                Ok(Some(Arc::new(table)))
             }
             GetTableResponse::Error(err) => {
                 error!("failed to fetch table from unity catalog: {}", err.message);
-                None
+                Err(DataFusionError::External(Box::new(err)))
             }
         }
     }
diff --git a/crates/core/src/data_catalog/unity/models.rs b/crates/core/src/data_catalog/unity/models.rs
index e1c8b7d1b7..265149b969 100644
--- a/crates/core/src/data_catalog/unity/models.rs
+++ b/crates/core/src/data_catalog/unity/models.rs
@@ -1,17 +1,24 @@
 //! Api models for databricks unity catalog APIs
+use core::fmt;
 use std::collections::HashMap;
 
 use serde::Deserialize;
 
 /// Error response from unity API
-#[derive(Deserialize)]
+#[derive(Debug, Deserialize)]
 pub struct ErrorResponse {
     /// The error code
     pub error_code: String,
     /// The error message
    pub message: String,
 }
+impl fmt::Display for ErrorResponse {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "[{}] {}", self.error_code, self.message)
+    }
+}
+impl std::error::Error for ErrorResponse {}
 
 /// List catalogs response
 #[derive(Deserialize)]
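DataFusion 37 changed `SchemaProvider::table` to return `datafusion_common::Result<Option<Arc<dyn TableProvider>>>` instead of `Option<Arc<dyn TableProvider>>`, so both catalog implementations above now distinguish a missing table (`Ok(None)`) from a failed lookup (`Err`). Implementing `Display` and `std::error::Error` for `ErrorResponse` is what lets it be boxed into `DataFusionError::External`. A minimal sketch of that pattern, using a hypothetical `CatalogError` in place of `ErrorResponse`:

```rust
use std::fmt;

use datafusion_common::DataFusionError;

// Hypothetical stand-in for a catalog client error such as `ErrorResponse`;
// any type implementing `std::error::Error + Send + Sync` can be boxed into
// `DataFusionError::External`.
#[derive(Debug)]
struct CatalogError {
    code: String,
    message: String,
}

impl fmt::Display for CatalogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[{}] {}", self.code, self.message)
    }
}

impl std::error::Error for CatalogError {}

fn lookup() -> Result<Option<String>, CatalogError> {
    Err(CatalogError {
        code: "TABLE_DOES_NOT_EXIST".into(),
        message: "no such table".into(),
    })
}

// Under the new contract, a missing table is `Ok(None)` and a lookup failure
// surfaces as a real error, instead of both collapsing to `None` as before.
fn table_lookup() -> datafusion_common::Result<Option<String>> {
    lookup().map_err(|err| DataFusionError::External(Box::new(err)))
}
```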
diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs
index 36aa4c4b7d..67ab33bb80 100644
--- a/crates/core/src/delta_datafusion/expr.rs
+++ b/crates/core/src/delta_datafusion/expr.rs
@@ -22,7 +22,7 @@
 //! Utility functions for Datafusion's Expressions
 
 use std::{
-    fmt::{self, format, Display, Error, Formatter, Write},
+    fmt::{self, Display, Error, Formatter, Write},
     sync::Arc,
 };
 
@@ -76,6 +76,18 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
     fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
         unimplemented!()
     }
+
+    fn udfs_names(&self) -> Vec<String> {
+        unimplemented!()
+    }
+
+    fn udafs_names(&self) -> Vec<String> {
+        unimplemented!()
+    }
+
+    fn udwfs_names(&self) -> Vec<String> {
+        unimplemented!()
+    }
 }
 
 /// Parse a string predicate into an `Expr`
@@ -416,8 +428,13 @@ mod test {
     use arrow_schema::DataType as ArrowDataType;
     use datafusion::prelude::SessionContext;
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
-    use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable};
+    use datafusion_expr::expr::ScalarFunction;
+    use datafusion_expr::{
+        col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
+    };
+    use datafusion_functions::core::arrow_cast;
     use datafusion_functions::encoding::expr_fn::decode;
+    use datafusion_functions_array::expr_fn::cardinality;
 
     use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
     use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
@@ -539,13 +556,24 @@
         // String expression that we output must be parsable for conflict resolution.
         let tests = vec![
-            simple!(
-                Expr::Cast(Cast {
+            ParseTest {
+                expr: Expr::Cast(Cast {
                     expr: Box::new(lit(1_i64)),
                     data_type: ArrowDataType::Int32
                 }),
-                "arrow_cast(1, 'Int32')".to_string()
-            ),
+                expected: "arrow_cast(1, 'Int32')".to_string(),
+                override_expected_expr: Some(
+                    datafusion_expr::Expr::ScalarFunction(
+                        ScalarFunction {
+                            func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
+                            args: vec![
+                                lit(ScalarValue::Int64(Some(1))),
+                                lit(ScalarValue::Utf8(Some("Int32".into())))
+                            ]
+                        }
+                    )
+                ),
+            },
             simple!(
                 Expr::Column(Column::from_qualified_name_ignore_case("Value3")).eq(lit(3_i64)),
                 "Value3 = 3".to_string()
@@ -624,9 +652,8 @@
             simple!(
                 substring(col("modified"), lit(0_i64), lit(4_i64)).eq(lit("2021")),
                 "substr(modified, 0, 4) = '2021'".to_string()
             ),
-            simple!(
-                col("value")
-                    .cast_to(
+            ParseTest {
+                expr: col("value").cast_to(
                     &arrow_schema::DataType::Utf8,
                     &table
                         .snapshot()
@@ -640,8 +667,23 @@
                 )
                 .unwrap()
                 .eq(lit("1")),
-                "arrow_cast(value, 'Utf8') = '1'".to_string()
-            ),
+                expected: "arrow_cast(value, 'Utf8') = '1'".to_string(),
+                override_expected_expr: Some(
+                    datafusion_expr::Expr::BinaryExpr(BinaryExpr {
+                        left: Box::new(datafusion_expr::Expr::ScalarFunction(
+                            ScalarFunction {
+                                func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
+                                args: vec![
+                                    col("value"),
+                                    lit(ScalarValue::Utf8(Some("Utf8".into())))
+                                ]
+                            }
+                        )),
+                        op: datafusion_expr::Operator::Eq,
+                        right: Box::new(lit(ScalarValue::Utf8(Some("1".into()))))
+                    })
+                ),
+            },
             simple!(
                 col("_struct").field("a").eq(lit(20_i64)),
                 "_struct['a'] = 20".to_string()
@@ -662,11 +704,16 @@
             ParseTest {
                 expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))),
                 expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(),
                 override_expected_expr: Some(col("_timestamp_ntz").gt(
-                    datafusion_expr::Expr::Cast( Cast {
-                        expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
-                        data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
-                    }
-                ))),
+                    datafusion_expr::Expr::ScalarFunction(
+                        ScalarFunction {
+                            func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
+                            args: vec![
+                                lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
+                                lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, None)".into())))
+                            ]
+                        }
+                    )
+                )),
             },
@@ -675,10 +722,16 @@
             ParseTest {
                 expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond(
                     Some(1262304000000000),
                     Some("UTC".into())
                 ))),
                 expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, Some(\"UTC\"))')".to_string(),
                 override_expected_expr: Some(col("_timestamp").gt(
-                    datafusion_expr::Expr::Cast( Cast {
-                        expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
-                        data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into()))
-                    }))),
+                    datafusion_expr::Expr::ScalarFunction(
+                        ScalarFunction {
+                            func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
+                            args: vec![
+                                lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
+                                lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, Some(\"UTC\"))".into())))
+                            ]
+                        }
+                    )
+                )),
             },
         ];
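In DataFusion 37, `arrow_cast` moved out of the built-in scalar-function enum into the `datafusion-functions` crate as a `ScalarUDF`, so a parsed `arrow_cast(...)` round-trips as `Expr::ScalarFunction` with `ScalarFunctionDefinition::UDF(arrow_cast())` rather than as `Expr::Cast`; that is what the new `override_expected_expr` values assert. A short sketch of constructing the same shape directly (the column name `value` is only an example):

```rust
use datafusion_common::ScalarValue;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{col, lit, Expr, ScalarFunctionDefinition};
use datafusion_functions::core::arrow_cast;

// Equivalent of the SQL fragment `arrow_cast(value, 'Utf8')`: the target type
// is passed as a string literal argument rather than stored in an `Expr::Cast`.
fn arrow_cast_to_utf8() -> Expr {
    Expr::ScalarFunction(ScalarFunction {
        func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
        args: vec![col("value"), lit(ScalarValue::Utf8(Some("Utf8".into())))],
    })
}
```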
diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 65d113ee5b..2e8d26dee3 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -17,7 +17,6 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, Phy
 use datafusion::prelude::SessionContext;
 use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
 use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
-use datafusion_physical_expr::create_physical_expr;
 use lazy_static::lazy_static;
 
 use crate::delta_datafusion::find_files::logical::FindFilesNode;
@@ -29,6 +28,8 @@ use crate::logstore::LogStoreRef;
 use crate::table::state::DeltaTableState;
 use crate::DeltaTableError;
 
+use super::create_physical_expr_fix;
+
 pub mod logical;
 pub mod physical;
 
@@ -160,8 +161,8 @@ async fn scan_table_by_files(
     let input_schema = scan.logical_schema.as_ref().to_owned();
     let input_dfschema = input_schema.clone().try_into()?;
 
-    let predicate_expr = create_physical_expr(
-        &Expr::IsTrue(Box::new(expression.clone())),
+    let predicate_expr = create_physical_expr_fix(
+        Expr::IsTrue(Box::new(expression.clone())),
         &input_dfschema,
         state.execution_props(),
     )?;
diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs
index 56c7ca9989..eb09d2d94b 100644
--- a/crates/core/src/delta_datafusion/find_files/physical.rs
+++ b/crates/core/src/delta_datafusion/find_files/physical.rs
@@ -9,11 +9,13 @@ use arrow_schema::SchemaRef;
 use datafusion::error::Result;
 use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::memory::MemoryStream;
-use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
+};
 use datafusion::prelude::SessionContext;
 use datafusion_common::tree_node::TreeNode;
 use datafusion_expr::Expr;
-use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
 use futures::stream::BoxStream;
 use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
 
@@ -28,6 +30,7 @@ pub struct FindFilesExec {
     predicate: Expr,
     state: DeltaTableState,
     log_store: LogStoreRef,
+    plan_properties: PlanProperties,
 }
 
 impl FindFilesExec {
@@ -36,6 +39,11 @@ impl FindFilesExec {
             predicate,
             log_store,
             state,
+            plan_properties: PlanProperties::new(
+                EquivalenceProperties::new(ONLY_FILES_SCHEMA.clone()),
+                Partitioning::RoundRobinBatch(num_cpus::get()),
+                ExecutionMode::Bounded,
+            ),
         })
     }
 }
 
@@ -85,12 +93,8 @@ impl ExecutionPlan for FindFilesExec {
         ONLY_FILES_SCHEMA.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::RoundRobinBatch(num_cpus::get())
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
-    }
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
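DataFusion 37 collapsed `output_partitioning`, `output_ordering`, and the bounded/unbounded flags into a single `properties(&self) -> &PlanProperties` accessor on `ExecutionPlan`. Because it returns a reference, the `PlanProperties` must be owned by the operator, which is why `FindFilesExec` now builds it once in its constructor. A minimal sketch of the pattern with a hypothetical single-partition operator:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::physical_plan::{ExecutionMode, PlanProperties};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

// Hypothetical operator; only the properties-caching pattern matters here.
struct MyExec {
    plan_properties: PlanProperties,
}

impl MyExec {
    fn new() -> Self {
        let schema = Arc::new(Schema::new(vec![Field::new("path", DataType::Utf8, false)]));
        Self {
            // Computed once at construction; `ExecutionPlan::properties` can
            // then hand out `&self.plan_properties` for the operator's lifetime.
            plan_properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::UnknownPartitioning(1),
                ExecutionMode::Bounded,
            ),
        }
    }
}
```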
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 85db6d6722..178133f54d 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -49,16 +49,15 @@ use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider,
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
-use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::LocalLimitExec;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
     Statistics,
 };
 use datafusion_common::scalar::ScalarValue;
-use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::{
     config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
     ToDFSchema,
@@ -66,9 +65,14 @@ use datafusion_common::{
 use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::logical_plan::CreateExternalTable;
 use datafusion_expr::utils::conjunction;
-use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
+use datafusion_expr::{
+    col, Expr, Extension, GetFieldAccess, GetIndexedField, LogicalPlan,
+    TableProviderFilterPushDown, Volatility,
+};
+use datafusion_functions::expr_fn::get_field;
+use datafusion_functions_array::extract::{array_element, array_slice};
 use datafusion_physical_expr::execution_props::ExecutionProps;
-use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+use datafusion_physical_expr::PhysicalExpr;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_sql::planner::ParserOptions;
@@ -248,7 +252,7 @@ pub(crate) fn files_matching_predicate<'a>(
     if let Some(Some(predicate)) =
         (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
     {
-        let expr = logical_expr_to_physical_expr(&predicate, snapshot.arrow_schema()?.as_ref());
+        let expr = logical_expr_to_physical_expr(predicate, snapshot.arrow_schema()?.as_ref());
         let pruning_predicate = PruningPredicate::try_new(expr, snapshot.arrow_schema()?)?;
         Ok(Either::Left(
             snapshot
@@ -528,7 +532,7 @@ impl<'a> DeltaScanBuilder<'a> {
         let logical_filter = self
             .filter
-            .map(|expr| logical_expr_to_physical_expr(&expr, &logical_schema));
+            .map(|expr| logical_expr_to_physical_expr(expr, &logical_schema));
 
         // Perform Pruning of files to scan
         let files = match self.files {
@@ -820,12 +824,8 @@ impl ExecutionPlan for DeltaScan {
         self.parquet_scan.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        self.parquet_scan.output_partitioning()
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.parquet_scan.output_ordering()
+    fn properties(&self) -> &PlanProperties {
+        self.parquet_scan.properties()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -934,6 +934,10 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
         Err(DeltaTableError::Generic(format!(
             "Unsupported data type for Delta Lake {}",
             t
@@ -1039,20 +1043,64 @@ pub(crate) fn to_correct_scalar_value(
 }
 
 pub(crate) fn logical_expr_to_physical_expr(
-    expr: &Expr,
+    expr: Expr,
     schema: &ArrowSchema,
 ) -> Arc<dyn PhysicalExpr> {
     let df_schema = schema.clone().to_dfschema().unwrap();
     let execution_props = ExecutionProps::new();
-    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
+    create_physical_expr_fix(expr, &df_schema, &execution_props).unwrap()
+}
+
+// TODO This should be removed after datafusion v38
+pub(crate) fn create_physical_expr_fix(
+    expr: Expr,
+    input_dfschema: &DFSchema,
+    execution_props: &ExecutionProps,
+) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+    // Support Expr::struct by rewriting expressions.
+    let expr = expr
+        .transform_up(&|expr| {
+            // see https://github.com/apache/datafusion/issues/10181
+            // This is part of the function rewriter code in DataFusion inlined here temporarily
+            Ok(match expr {
+                Expr::GetIndexedField(GetIndexedField {
+                    expr,
+                    field: GetFieldAccess::NamedStructField { name },
+                }) => {
+                    let name = Expr::Literal(name);
+                    Transformed::yes(get_field(*expr, name))
+                }
+                // expr[idx] ==> array_element(expr, idx)
+                Expr::GetIndexedField(GetIndexedField {
+                    expr,
+                    field: GetFieldAccess::ListIndex { key },
+                }) => Transformed::yes(array_element(*expr, *key)),
+
+                // expr[start, stop, stride] ==> array_slice(expr, start, stop, stride)
+                Expr::GetIndexedField(GetIndexedField {
+                    expr,
+                    field:
+                        GetFieldAccess::ListRange {
+                            start,
+                            stop,
+                            stride,
+                        },
+                }) => Transformed::yes(array_slice(*expr, *start, *stop, *stride)),
+
+                _ => Transformed::no(expr),
+            })
+        })?
+        .data;
+
+    datafusion_physical_expr::create_physical_expr(&expr, input_dfschema, execution_props)
 }
 
 pub(crate) async fn execute_plan_to_batch(
     state: &SessionState,
     plan: Arc<dyn ExecutionPlan>,
 ) -> DeltaResult<RecordBatch> {
-    let data =
-        futures::future::try_join_all((0..plan.output_partitioning().partition_count()).map(|p| {
+    let data = futures::future::try_join_all(
+        (0..plan.properties().output_partitioning().partition_count()).map(|p| {
             let plan_copy = plan.clone();
             let task_context = state.task_ctx().clone();
             async move {
@@ -1064,8 +1112,9 @@ pub(crate) async fn execute_plan_to_batch(
                 DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?)
             }
-        }))
-        .await?;
+        }),
+    )
+    .await?;
 
     let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?;
@@ -1315,9 +1364,9 @@ pub(crate) struct FindFilesExprProperties {
 /// non-deterministic functions, and determine if the expression only contains
 /// partition columns
 impl TreeNodeVisitor for FindFilesExprProperties {
-    type N = Expr;
+    type Node = Expr;
 
-    fn pre_visit(&mut self, expr: &Self::N) -> datafusion_common::Result<VisitRecursion> {
+    fn f_down(&mut self, expr: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
         // TODO: We can likely relax the volatility to STABLE. Would require further
         // research to confirm the same value is generated during the scan and
         // rewrite phases.
@@ -1358,7 +1407,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
                         self.result = Err(DeltaTableError::Generic(format!(
                             "Cannot determine volatility of find files predicate function {n}",
                         )));
-                        return Ok(VisitRecursion::Stop);
+                        return Ok(TreeNodeRecursion::Stop);
                     }
                 };
                 if v > Volatility::Immutable {
@@ -1366,7 +1415,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
                     self.result = Err(DeltaTableError::Generic(format!(
                         "Find files predicate contains nondeterministic function {}",
                         func_def.name()
                     )));
-                    return Ok(VisitRecursion::Stop);
+                    return Ok(TreeNodeRecursion::Stop);
                 }
             }
             _ => {
@@ -1374,11 +1423,11 @@ impl TreeNodeVisitor for FindFilesExprProperties {
                 self.result = Err(DeltaTableError::Generic(format!(
                     "Find files predicate contains unsupported expression {}",
                     expr
                 )));
-                return Ok(VisitRecursion::Stop);
+                return Ok(TreeNodeRecursion::Stop);
             }
         }
 
-        Ok(VisitRecursion::Continue)
+        Ok(TreeNodeRecursion::Continue)
     }
 }
 
@@ -1478,8 +1527,8 @@ pub(crate) async fn find_files_scan<'a>(
     let input_schema = scan.logical_schema.as_ref().to_owned();
     let input_dfschema = input_schema.clone().try_into()?;
 
-    let predicate_expr = create_physical_expr(
-        &Expr::IsTrue(Box::new(expression.clone())),
+    let predicate_expr = create_physical_expr_fix(
+        Expr::IsTrue(Box::new(expression.clone())),
         &input_dfschema,
         state.execution_props(),
     )?;
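The `create_physical_expr_fix` shim above exists because DataFusion 37 lowers `Expr::GetIndexedField` through its logical function rewriter (`get_field`, `array_element`, `array_slice`), a step that a bare `create_physical_expr` call does not perform; per the TODO, the rewrite is inlined here until it can be dropped after v38. As a concrete illustration, a struct access like the `_struct['a']` test case becomes an ordinary function call (a sketch; the names are taken from those tests):

```rust
use datafusion_expr::{col, lit, Expr};
use datafusion_functions::expr_fn::get_field;

// `_struct['a'] = 20` used to plan through `Expr::GetIndexedField`; after the
// rewrite it is a plain scalar-function call on the struct column.
fn struct_access_rewritten() -> Expr {
    get_field(col("_struct"), lit("a")).eq(lit(20_i64))
}
```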
diff --git a/crates/core/src/delta_datafusion/physical.rs b/crates/core/src/delta_datafusion/physical.rs
index 954df0b046..0251836fa8 100644
--- a/crates/core/src/delta_datafusion/physical.rs
+++ b/crates/core/src/delta_datafusion/physical.rs
@@ -82,12 +82,8 @@ impl ExecutionPlan for MetricObserverExec {
         self.parent.schema()
     }
 
-    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
-        self.parent.output_partitioning()
-    }
-
-    fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
-        self.parent.output_ordering()
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        self.parent.properties()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/crates/core/src/kernel/expressions/scalars.rs b/crates/core/src/kernel/expressions/scalars.rs
index f3753bcb60..571c2abf92 100644
--- a/crates/core/src/kernel/expressions/scalars.rs
+++ b/crates/core/src/kernel/expressions/scalars.rs
@@ -273,6 +273,10 @@ impl Scalar {
             | Dictionary(_, _)
             | RunEndEncoded(_, _)
             | Union(_, _)
+            | Utf8View
+            | BinaryView
+            | ListView(_)
+            | LargeListView(_)
             | Null => None,
         }
     }
diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs
index 1c5be2ccfc..91539ef1a6 100644
--- a/crates/core/src/operations/constraints.rs
+++ b/crates/core/src/operations/constraints.rs
@@ -128,7 +128,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
             let plan: Arc<dyn ExecutionPlan> = Arc::new(scan);
             let mut tasks = vec![];
-            for p in 0..plan.output_partitioning().partition_count() {
+            for p in 0..plan.properties().output_partitioning().partition_count() {
                 let inner_plan = plan.clone();
                 let inner_checker = checker.clone();
                 let task_ctx = Arc::new(TaskContext::from(&state));
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 6fb76891b1..8d13d51b4e 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -23,7 +23,6 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
 use crate::logstore::LogStoreRef;
 use datafusion::execution::context::{SessionContext, SessionState};
-use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
@@ -37,7 +36,8 @@ use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
-    find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
+    create_physical_expr_fix, find_files, register_store, DataFusionMixins, DeltaScanBuilder,
+    DeltaSessionContext,
 };
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, Remove};
@@ -148,11 +148,8 @@ async fn excute_non_empty_expr(
     // Apply the negation of the filter and rewrite files
     let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
 
-    let predicate_expr = create_physical_expr(
-        &negated_expression,
-        &input_dfschema,
-        state.execution_props(),
-    )?;
+    let predicate_expr =
+        create_physical_expr_fix(negated_expression, &input_dfschema, state.execution_props())?;
     let filter: Arc<dyn ExecutionPlan> =
         Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs
index 8cc0c2d804..2a247a717b 100644
--- a/crates/core/src/operations/merge/barrier.rs
+++ b/crates/core/src/operations/merge/barrier.rs
@@ -75,18 +75,14 @@ impl ExecutionPlan for MergeBarrierExec {
         self.input.schema()
     }
 
-    fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning {
-        self.input.output_partitioning()
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        self.input.properties()
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
         vec![Distribution::HashPartitioned(vec![self.expr.clone()]); 1]
     }
 
-    fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
-        None
-    }
-
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.input.clone()]
     }
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index e901d5e8ae..44c3b499c1 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -850,11 +850,12 @@ fn replace_placeholders(expr: Expr, placeholders: &HashMap<String, ScalarValue>) -> Expr {
             Expr::Placeholder(Placeholder { id, .. }) => {
                 let value = placeholders[&id].clone();
                 // Replace the placeholder with the value
-                Ok(Transformed::Yes(Expr::Literal(value)))
+                Ok(Transformed::yes(Expr::Literal(value)))
             }
-            _ => Ok(Transformed::No(expr)),
+            _ => Ok(Transformed::no(expr)),
         })
         .unwrap()
+        .data
 }
 
 async fn try_construct_early_filter(
@@ -1468,16 +1469,17 @@ async fn execute(
 fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
     expr.transform(&|expr| match expr {
         Expr::Column(c) => match c.relation {
-            Some(rel) if rel.table() == table_alias => Ok(Transformed::Yes(Expr::Column(
+            Some(rel) if rel.table() == table_alias => Ok(Transformed::yes(Expr::Column(
                 Column::new_unqualified(c.name),
             ))),
-            _ => Ok(Transformed::No(Expr::Column(Column::new(
+            _ => Ok(Transformed::no(Expr::Column(Column::new(
                 c.relation, c.name,
             )))),
         },
-        _ => Ok(Transformed::No(expr)),
+        _ => Ok(Transformed::no(expr)),
     })
     .unwrap()
+    .data
 }
 
 // TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
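The `Transformed::Yes`/`Transformed::No` variants became the `Transformed::yes`/`Transformed::no` constructors in DataFusion 37, and transforms now return a `Transformed<T>` wrapper whose rewritten value lives in `.data`, hence the added `.data` calls above. Visitors changed in the same release: `pre_visit` returning `VisitRecursion` became `f_down` returning `TreeNodeRecursion`, as seen earlier in `FindFilesExprProperties`. A small self-contained rewrite in the new style (the column names are only an example):

```rust
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::{col, Expr};

// Rewrite every column reference named "old" to "new"; everything else is
// passed through untouched with `Transformed::no`.
fn rename_column(expr: Expr) -> Expr {
    expr.transform(&|e| {
        Ok(match e {
            Expr::Column(c) if c.name == "old" => Transformed::yes(col("new")),
            other => Transformed::no(other),
        })
    })
    .unwrap()
    .data // the rewritten expression now sits in the `data` field
}
```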
diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs
index 8f21018364..cf8255d4bf 100644
--- a/crates/core/src/operations/transaction/state.rs
+++ b/crates/core/src/operations/transaction/state.rs
@@ -144,7 +144,7 @@ impl<'a> AddContainer<'a> {
     /// so evaluating expressions is inexact. However, excluded files are guaranteed (for a correct log)
     /// to not contain matches by the predicate expression.
     pub fn predicate_matches(&self, predicate: Expr) -> DeltaResult<impl Iterator<Item = &Add>> {
-        let expr = logical_expr_to_physical_expr(&predicate, &self.schema);
+        let expr = logical_expr_to_physical_expr(predicate, &self.schema);
         let pruning_predicate = PruningPredicate::try_new(expr, self.schema.clone())?;
         Ok(self
             .inner
@@ -214,6 +214,21 @@ impl<'a> PruningStatistics for AddContainer<'a> {
         ScalarValue::iter_to_array(values).ok()
     }
 
+    /// return the number of rows for the named column in each container
+    /// as an `Option<ArrayRef>`.
+    ///
+    /// Note: the returned array must contain `num_containers()` rows
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        let values = self.inner.iter().map(|add| {
+            if let Ok(Some(statistics)) = add.get_stats() {
+                ScalarValue::UInt64(Some(statistics.num_records as u64))
+            } else {
+                ScalarValue::UInt64(None)
+            }
+        });
+        ScalarValue::iter_to_array(values).ok()
+    }
+
     // This function is required since DataFusion 35.0, but is implemented as a no-op
     // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
     fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
@@ -257,6 +272,17 @@ impl PruningStatistics for EagerSnapshot {
         container.null_counts(column)
     }
 
+    /// return the number of rows for the named column in each container
+    /// as an `Option<ArrayRef>`.
+    ///
+    /// Note: the returned array must contain `num_containers()` rows
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let files = self.file_actions().ok()?.collect_vec();
+        let partition_columns = &self.metadata().partition_columns;
+        let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?);
+        container.row_counts(column)
+    }
+
     // This function is required since DataFusion 35.0, but is implemented as a no-op
     // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
     fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
@@ -281,6 +307,10 @@ impl PruningStatistics for DeltaTableState {
         self.snapshot.null_counts(column)
     }
 
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+        self.snapshot.row_counts(column)
+    }
+
     fn contained(&self, column: &Column, values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
         self.snapshot.contained(column, values)
     }
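`PruningStatistics` gained a `row_counts` method in DataFusion 37, and the implementations above answer it from the `numRecords` entry of each `Add` action's stats, emitting a null for files without statistics. The array is assembled with `ScalarValue::iter_to_array`, one element per container; a standalone sketch of that construction (the counts are made up for illustration):

```rust
use datafusion_common::ScalarValue;

// Build a UInt64 array of per-file row counts, with nulls where a file has no
// statistics, mirroring how `row_counts` is answered above.
fn example_row_counts() -> Option<arrow_array::ArrayRef> {
    let per_file_stats: Vec<Option<u64>> = vec![Some(1000), None, Some(250)];
    let values = per_file_stats
        .into_iter()
        .map(|num_records| ScalarValue::UInt64(num_records));
    ScalarValue::iter_to_array(values).ok()
}
```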
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index b0406fa9a5..9f4f6d51a3 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -34,7 +34,6 @@ use datafusion::{
 use datafusion_common::{Column, DFSchema, ScalarValue};
 use datafusion_expr::{case, col, lit, when, Expr};
 use datafusion_physical_expr::{
-    create_physical_expr,
     expressions::{self},
     PhysicalExpr,
 };
@@ -49,8 +48,8 @@ use super::{
     transaction::{CommitBuilder, CommitProperties},
 };
 use crate::delta_datafusion::{
-    expr::fmt_expr_to_sql, physical::MetricObserverExec, DataFusionMixins, DeltaColumn,
-    DeltaSessionContext,
+    create_physical_expr_fix, expr::fmt_expr_to_sql, physical::MetricObserverExec,
+    DataFusionMixins, DeltaColumn, DeltaSessionContext,
 };
 use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
 use crate::kernel::{Action, Remove};
@@ -265,7 +264,8 @@ async fn execute(
     let predicate_null =
         when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?;
-    let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?;
+    let predicate_expr =
+        create_physical_expr_fix(predicate_null, &input_dfschema, execution_props)?;
     expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string()));
 
     let projection_predicate: Arc<dyn ExecutionPlan> =
@@ -312,7 +312,7 @@ async fn execute(
         let expr = case(col("__delta_rs_update_predicate"))
             .when(lit(true), expr.to_owned())
             .otherwise(col(column.to_owned()))?;
-        let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?;
+        let predicate_expr = create_physical_expr_fix(expr, &input_dfschema, execution_props)?;
         map.insert(column.name.clone(), expressions.len());
         let c = "__delta_rs_".to_string() + &column.name;
         expressions.push((predicate_expr, c.clone()));
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index e87f3b6e50..475abefbe2 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -34,7 +34,6 @@ use arrow_array::RecordBatch;
 use arrow_cast::can_cast_types;
 use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef};
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
-use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
 use datafusion_common::DFSchema;
@@ -49,7 +48,9 @@ use super::writer::{DeltaWriter, WriterConfig};
 use super::CreateBuilder;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::expr::parse_predicate_expression;
-use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
+use crate::delta_datafusion::{
+    create_physical_expr_fix, find_files, register_store, DeltaScanBuilder,
+};
 use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType};
@@ -375,7 +376,7 @@ async fn write_execution_plan_with_predicate(
     // Write data to disk
     let mut tasks = vec![];
-    for i in 0..plan.output_partitioning().partition_count() {
+    for i in 0..plan.properties().output_partitioning().partition_count() {
         let inner_plan = plan.clone();
         let inner_schema = schema.clone();
         let task_ctx = Arc::new(TaskContext::from(&state));
@@ -478,11 +479,8 @@ async fn execute_non_empty_expr(
     // Apply the negation of the filter and rewrite files
     let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
 
-    let predicate_expr = create_physical_expr(
-        &negated_expression,
-        &input_dfschema,
-        state.execution_props(),
-    )?;
+    let predicate_expr =
+        create_physical_expr_fix(negated_expression, &input_dfschema, state.execution_props())?;
     let filter: Arc<dyn ExecutionPlan> =
         Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
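The repeated change from `plan.output_partitioning()` to `plan.properties().output_partitioning()` in the write paths follows from the same `PlanProperties` consolidation; partition metadata now hangs off `properties()`. The per-partition execution loop keeps the same shape, roughly as sketched below (a hypothetical helper, not code from this PR):

```rust
use std::sync::Arc;

use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{common::collect, ExecutionPlan};
use datafusion_common::Result;

// One stream per output partition, as in `write_execution_plan_with_predicate`
// above; the partition count now comes from `properties()`.
async fn drain_all_partitions(
    plan: Arc<dyn ExecutionPlan>,
    task_ctx: Arc<TaskContext>,
) -> Result<usize> {
    let mut batches = 0;
    for p in 0..plan.properties().output_partitioning().partition_count() {
        let stream = plan.execute(p, task_ctx.clone())?;
        batches += collect(stream).await?.len();
    }
    Ok(batches)
}
```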
diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs
index 4be66534fe..54a70faf53 100644
--- a/crates/core/tests/integration_datafusion.rs
+++ b/crates/core/tests/integration_datafusion.rs
@@ -440,7 +440,7 @@ mod local {
     ) -> Result<ExecutionMetricsCollector> {
         let mut metrics = ExecutionMetricsCollector::default();
         let scan = table.scan(state, None, e, None).await?;
-        if scan.output_partitioning().partition_count() > 0 {
+        if scan.properties().output_partitioning().partition_count() > 0 {
             let plan = CoalescePartitionsExec::new(scan);
             let task_ctx = Arc::new(TaskContext::from(state));
             let _result = collect(plan.execute(0, task_ctx)?).await?;
diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs
index 099f97087d..0be14d59b0 100644
--- a/crates/sql/src/planner.rs
+++ b/crates/sql/src/planner.rs
@@ -155,6 +155,18 @@ mod tests {
         fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
             None
         }
+
+        fn udfs_names(&self) -> Vec<String> {
+            Vec::new()
+        }
+
+        fn udafs_names(&self) -> Vec<String> {
+            Vec::new()
+        }
+
+        fn udwfs_names(&self) -> Vec<String> {
+            Vec::new()
+        }
     }
 
     fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {