From a3760c53762d8fa434805865d7aa7aa2aa9e2c8a Mon Sep 17 00:00:00 2001 From: Luis Alberto Santos Date: Sun, 7 Apr 2024 11:39:37 +0200 Subject: [PATCH 1/5] bump arrow and datafusion --- Cargo.toml | 39 ++++++++++--------- crates/core/Cargo.toml | 2 + crates/core/src/data_catalog/storage/mod.rs | 13 ++++--- .../core/src/data_catalog/unity/datafusion.rs | 12 +++--- crates/core/src/data_catalog/unity/models.rs | 9 ++++- crates/core/src/delta_datafusion/expr.rs | 19 +++++++-- .../delta_datafusion/find_files/physical.rs | 20 ++++++---- crates/core/src/delta_datafusion/mod.rs | 38 +++++++++--------- crates/core/src/delta_datafusion/physical.rs | 8 +--- crates/core/src/kernel/expressions/scalars.rs | 4 ++ crates/core/src/operations/constraints.rs | 2 +- crates/core/src/operations/merge/barrier.rs | 8 +--- crates/core/src/operations/merge/mod.rs | 12 +++--- .../core/src/operations/transaction/state.rs | 23 +++++++++++ crates/core/src/operations/write.rs | 2 +- crates/core/tests/integration_datafusion.rs | 2 +- crates/sql/src/planner.rs | 12 ++++++ 17 files changed, 143 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c52e442619..932685c2cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,28 +31,29 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "50" } -arrow-arith = { version = "50" } -arrow-array = { version = "50", features = ["chrono-tz"]} -arrow-buffer = { version = "50" } -arrow-cast = { version = "50" } -arrow-ipc = { version = "50" } -arrow-json = { version = "50" } -arrow-ord = { version = "50" } -arrow-row = { version = "50" } -arrow-schema = { version = "50" } -arrow-select = { version = "50" } +arrow = { version = "51" } +arrow-arith = { version = "51" } +arrow-array = { version = "51", features = ["chrono-tz"] } +arrow-buffer = { version = "51" } +arrow-cast = { version = "51" } +arrow-ipc = { version = "51" } +arrow-json = { version = "51" } +arrow-ord = { version = "51" } +arrow-row = { version = "51" } +arrow-schema = { version = "51" } +arrow-select = { version = "51" } object_store = { version = "0.9" } -parquet = { version = "50" } +parquet = { version = "51" } # datafusion -datafusion = { version = "36" } -datafusion-expr = { version = "36" } -datafusion-common = { version = "36" } -datafusion-proto = { version = "36" } -datafusion-sql = { version = "36" } -datafusion-physical-expr = { version = "36" } -datafusion-functions = { version = "36" } +datafusion = { version = "37" } +datafusion-expr = { version = "37" } +datafusion-common = { version = "37" } +datafusion-proto = { version = "37" } +datafusion-sql = { version = "37" } +datafusion-physical-expr = { version = "37" } +datafusion-functions = { version = "37" } +datafusion-functions-array = { version = "37" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 7b5cd2adbc..1f5c136b47 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -41,6 +41,7 @@ datafusion-proto = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } datafusion-functions = { workspace = true, optional = true } +datafusion-functions-array = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } @@ -123,6 +124,7 @@ datafusion = [ "datafusion-physical-expr", "datafusion-sql", "datafusion-functions", + "datafusion-functions-array", "sqlparser", ] datafusion-ext = ["datafusion"] diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index 5a25054316..fc30f32144 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -110,12 +110,13 @@ impl SchemaProvider for ListingSchemaProvider { self.tables.iter().map(|t| t.key().clone()).collect() } - async fn table(&self, name: &str) -> Option> { - let location = self.tables.get(name).map(|t| t.clone())?; - let provider = open_table_with_storage_options(location, self.storage_options.0.clone()) - .await - .ok()?; - Some(Arc::new(provider) as Arc) + async fn table(&self, name: &str) -> datafusion_common::Result>> { + let Some(location) = self.tables.get(name).map(|t| t.clone()) else { + return Ok(None); + }; + let provider = + open_table_with_storage_options(location, self.storage_options.0.clone()).await?; + Ok(Some(Arc::new(provider) as Arc)) } fn register_table( diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 0ed539e708..6b6a4b4a63 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -8,6 +8,7 @@ use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; +use datafusion_common::DataFusionError; use tracing::error; use super::models::{GetTableResponse, ListCatalogsResponse, ListTableSummariesResponse}; @@ -180,25 +181,24 @@ impl SchemaProvider for UnitySchemaProvider { self.table_names.clone() } - async fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> datafusion_common::Result>> { let maybe_table = self .client .get_table(&self.catalog_name, &self.schema_name, name) .await - .ok()?; + .map_err(|err| DataFusionError::External(Box::new(err)))?; match maybe_table { GetTableResponse::Success(table) => { let table = DeltaTableBuilder::from_uri(table.storage_location) .with_storage_options(self.storage_options.clone()) .load() - .await - .ok()?; - Some(Arc::new(table)) + .await?; + Ok(Some(Arc::new(table))) } GetTableResponse::Error(err) => { error!("failed to fetch table from unity catalog: {}", err.message); - None + Err(DataFusionError::External(Box::new(err))) } } } diff --git a/crates/core/src/data_catalog/unity/models.rs b/crates/core/src/data_catalog/unity/models.rs index e1c8b7d1b7..265149b969 100644 --- a/crates/core/src/data_catalog/unity/models.rs +++ b/crates/core/src/data_catalog/unity/models.rs @@ -1,17 +1,24 @@ //! Api models for databricks unity catalog APIs +use core::fmt; use std::collections::HashMap; use serde::Deserialize; /// Error response from unity API -#[derive(Deserialize)] +#[derive(Debug, Deserialize)] pub struct ErrorResponse { /// The error code pub error_code: String, /// The error message pub message: String, } +impl fmt::Display for ErrorResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "[{}] {}", self.error_code, self.message) + } +} +impl std::error::Error for ErrorResponse {} /// List catalogs response #[derive(Deserialize)] diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 4e6dc02ac4..8db4567ec0 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -22,12 +22,12 @@ //! Utility functions for Datafusion's Expressions use std::{ - fmt::{self, format, Display, Error, Formatter, Write}, + fmt::{self, Display, Error, Formatter, Write}, sync::Arc, }; use arrow_schema::DataType; -use chrono::{Date, NaiveDate, NaiveDateTime, TimeZone}; +use chrono::{NaiveDate, NaiveDateTime}; use datafusion::execution::context::SessionState; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; @@ -76,6 +76,18 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> { fn get_table_source(&self, _name: TableReference) -> DFResult> { unimplemented!() } + + fn udfs_names(&self) -> Vec { + unimplemented!() + } + + fn udafs_names(&self) -> Vec { + unimplemented!() + } + + fn udwfs_names(&self) -> Vec { + unimplemented!() + } } /// Parse a string predicate into an `Expr` @@ -417,8 +429,9 @@ mod test { use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; use datafusion_common::{Column, ScalarValue, ToDFSchema}; - use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_expr::{col, lit, substring, Cast, Expr, ExprSchemable}; use datafusion_functions::encoding::expr_fn::decode; + use datafusion_functions_array::expr_fn::cardinality; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType}; diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs index 56c7ca9989..eb09d2d94b 100644 --- a/crates/core/src/delta_datafusion/find_files/physical.rs +++ b/crates/core/src/delta_datafusion/find_files/physical.rs @@ -9,11 +9,13 @@ use arrow_schema::SchemaRef; use datafusion::error::Result; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::memory::MemoryStream; -use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, +}; use datafusion::prelude::SessionContext; use datafusion_common::tree_node::TreeNode; use datafusion_expr::Expr; -use datafusion_physical_expr::{Partitioning, PhysicalSortExpr}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use futures::stream::BoxStream; use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; @@ -28,6 +30,7 @@ pub struct FindFilesExec { predicate: Expr, state: DeltaTableState, log_store: LogStoreRef, + plan_properties: PlanProperties, } impl FindFilesExec { @@ -36,6 +39,11 @@ impl FindFilesExec { predicate, log_store, state, + plan_properties: PlanProperties::new( + EquivalenceProperties::new(ONLY_FILES_SCHEMA.clone()), + Partitioning::RoundRobinBatch(num_cpus::get()), + ExecutionMode::Bounded, + ), }) } } @@ -85,12 +93,8 @@ impl ExecutionPlan for FindFilesExec { ONLY_FILES_SCHEMA.clone() } - fn output_partitioning(&self) -> Partitioning { - Partitioning::RoundRobinBatch(num_cpus::get()) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None + fn properties(&self) -> &PlanProperties { + &self.plan_properties } fn children(&self) -> Vec> { diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 4d6f2e827f..d6a9042e60 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -49,16 +49,15 @@ use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; -use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::LocalLimitExec; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics, }; use datafusion_common::scalar::ScalarValue; -use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ Column, DFSchema, DataFusionError, Result as DataFusionResult, ToDFSchema, }; @@ -819,12 +818,8 @@ impl ExecutionPlan for DeltaScan { self.parquet_scan.schema() } - fn output_partitioning(&self) -> Partitioning { - self.parquet_scan.output_partitioning() - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.parquet_scan.output_ordering() + fn properties(&self) -> &PlanProperties { + self.parquet_scan.properties() } fn children(&self) -> Vec> { @@ -916,6 +911,10 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult Err(DeltaTableError::Generic(format!( "Unsupported data type for Delta Lake {}", t @@ -1033,8 +1032,8 @@ pub(crate) async fn execute_plan_to_batch( state: &SessionState, plan: Arc, ) -> DeltaResult { - let data = - futures::future::try_join_all((0..plan.output_partitioning().partition_count()).map(|p| { + let data = futures::future::try_join_all( + (0..plan.properties().output_partitioning().partition_count()).map(|p| { let plan_copy = plan.clone(); let task_context = state.task_ctx().clone(); async move { @@ -1046,8 +1045,9 @@ pub(crate) async fn execute_plan_to_batch( DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?) } - })) - .await?; + }), + ) + .await?; let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?; @@ -1297,9 +1297,9 @@ pub(crate) struct FindFilesExprProperties { /// non-deterministic functions, and determine if the expression only contains /// partition columns impl TreeNodeVisitor for FindFilesExprProperties { - type N = Expr; + type Node = Expr; - fn pre_visit(&mut self, expr: &Self::N) -> datafusion_common::Result { + fn f_down(&mut self, expr: &Self::Node) -> datafusion_common::Result { // TODO: We can likely relax the volatility to STABLE. Would require further // research to confirm the same value is generated during the scan and // rewrite phases. @@ -1340,7 +1340,7 @@ impl TreeNodeVisitor for FindFilesExprProperties { self.result = Err(DeltaTableError::Generic(format!( "Cannot determine volatility of find files predicate function {n}", ))); - return Ok(VisitRecursion::Stop); + return Ok(TreeNodeRecursion::Stop); } }; if v > Volatility::Immutable { @@ -1348,7 +1348,7 @@ impl TreeNodeVisitor for FindFilesExprProperties { "Find files predicate contains nondeterministic function {}", func_def.name() ))); - return Ok(VisitRecursion::Stop); + return Ok(TreeNodeRecursion::Stop); } } _ => { @@ -1356,11 +1356,11 @@ impl TreeNodeVisitor for FindFilesExprProperties { "Find files predicate contains unsupported expression {}", expr ))); - return Ok(VisitRecursion::Stop); + return Ok(TreeNodeRecursion::Stop); } } - Ok(VisitRecursion::Continue) + Ok(TreeNodeRecursion::Continue) } } diff --git a/crates/core/src/delta_datafusion/physical.rs b/crates/core/src/delta_datafusion/physical.rs index 954df0b046..0251836fa8 100644 --- a/crates/core/src/delta_datafusion/physical.rs +++ b/crates/core/src/delta_datafusion/physical.rs @@ -82,12 +82,8 @@ impl ExecutionPlan for MetricObserverExec { self.parent.schema() } - fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { - self.parent.output_partitioning() - } - - fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { - self.parent.output_ordering() + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + self.parent.properties() } fn children(&self) -> Vec> { diff --git a/crates/core/src/kernel/expressions/scalars.rs b/crates/core/src/kernel/expressions/scalars.rs index f3753bcb60..571c2abf92 100644 --- a/crates/core/src/kernel/expressions/scalars.rs +++ b/crates/core/src/kernel/expressions/scalars.rs @@ -273,6 +273,10 @@ impl Scalar { | Dictionary(_, _) | RunEndEncoded(_, _) | Union(_, _) + | Utf8View + | BinaryView + | ListView(_) + | LargeListView(_) | Null => None, } } diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index ee5ed16da9..20490bd400 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -126,7 +126,7 @@ impl std::future::IntoFuture for ConstraintBuilder { let plan: Arc = Arc::new(scan); let mut tasks = vec![]; - for p in 0..plan.output_partitioning().partition_count() { + for p in 0..plan.properties().output_partitioning().partition_count() { let inner_plan = plan.clone(); let inner_checker = checker.clone(); let task_ctx = Arc::new(TaskContext::from(&state)); diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index 8cc0c2d804..2a247a717b 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -75,18 +75,14 @@ impl ExecutionPlan for MergeBarrierExec { self.input.schema() } - fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning { - self.input.output_partitioning() + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + self.input.properties() } fn required_input_distribution(&self) -> Vec { vec![Distribution::HashPartitioned(vec![self.expr.clone()]); 1] } - fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { - None - } - fn children(&self) -> Vec> { vec![self.input.clone()] } diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 206ee0e899..86d09701d0 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -848,11 +848,12 @@ fn replace_placeholders(expr: Expr, placeholders: &HashMap) Expr::Placeholder(Placeholder { id, .. }) => { let value = placeholders[&id].clone(); // Replace the placeholder with the value - Ok(Transformed::Yes(Expr::Literal(value))) + Ok(Transformed::yes(Expr::Literal(value))) } - _ => Ok(Transformed::No(expr)), + _ => Ok(Transformed::no(expr)), }) .unwrap() + .data } async fn try_construct_early_filter( @@ -1468,16 +1469,17 @@ async fn execute( fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr { expr.transform(&|expr| match expr { Expr::Column(c) => match c.relation { - Some(rel) if rel.table() == table_alias => Ok(Transformed::Yes(Expr::Column( + Some(rel) if rel.table() == table_alias => Ok(Transformed::yes(Expr::Column( Column::new_unqualified(c.name), ))), - _ => Ok(Transformed::No(Expr::Column(Column::new( + _ => Ok(Transformed::no(Expr::Column(Column::new( c.relation, c.name, )))), }, - _ => Ok(Transformed::No(expr)), + _ => Ok(Transformed::no(expr)), }) .unwrap() + .data } // TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future. diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index 8f21018364..e6f48f4ec5 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -214,6 +214,14 @@ impl<'a> PruningStatistics for AddContainer<'a> { ScalarValue::iter_to_array(values).ok() } + /// return the number of rows for the named column in each container + /// as an `Option`. + /// + /// Note: the returned array must contain `num_containers()` rows + fn row_counts(&self, column: &Column) -> Option { + todo!() + } + // This function is required since DataFusion 35.0, but is implemented as a no-op // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 fn contained(&self, _column: &Column, _value: &HashSet) -> Option { @@ -257,6 +265,17 @@ impl PruningStatistics for EagerSnapshot { container.null_counts(column) } + /// return the number of rows for the named column in each container + /// as an `Option`. + /// + /// Note: the returned array must contain `num_containers()` rows + fn row_counts(&self, column: &Column) -> Option { + let files = self.file_actions().ok()?.collect_vec(); + let partition_columns = &self.metadata().partition_columns; + let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?); + container.row_counts(column) + } + // This function is required since DataFusion 35.0, but is implemented as a no-op // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550 fn contained(&self, _column: &Column, _value: &HashSet) -> Option { @@ -281,6 +300,10 @@ impl PruningStatistics for DeltaTableState { self.snapshot.null_counts(column) } + fn row_counts(&self, column: &Column) -> Option { + self.snapshot.row_counts(column) + } + fn contained(&self, column: &Column, values: &HashSet) -> Option { self.snapshot.contained(column, values) } diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 10b48a768c..d7438fc1fa 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -373,7 +373,7 @@ async fn write_execution_plan_with_predicate( // Write data to disk let mut tasks = vec![]; - for i in 0..plan.output_partitioning().partition_count() { + for i in 0..plan.properties().output_partitioning().partition_count() { let inner_plan = plan.clone(); let inner_schema = schema.clone(); let task_ctx = Arc::new(TaskContext::from(&state)); diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index 4be66534fe..54a70faf53 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -440,7 +440,7 @@ mod local { ) -> Result { let mut metrics = ExecutionMetricsCollector::default(); let scan = table.scan(state, None, e, None).await?; - if scan.output_partitioning().partition_count() > 0 { + if scan.properties().output_partitioning().partition_count() > 0 { let plan = CoalescePartitionsExec::new(scan); let task_ctx = Arc::new(TaskContext::from(state)); let _result = collect(plan.execute(0, task_ctx)?).await?; diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs index 099f97087d..0be14d59b0 100644 --- a/crates/sql/src/planner.rs +++ b/crates/sql/src/planner.rs @@ -155,6 +155,18 @@ mod tests { fn get_window_meta(&self, _name: &str) -> Option> { None } + + fn udfs_names(&self) -> Vec { + Vec::new() + } + + fn udafs_names(&self) -> Vec { + Vec::new() + } + + fn udwfs_names(&self) -> Vec { + Vec::new() + } } fn create_table_source(fields: Vec) -> Arc { From 55441d72f89a85904bbddaf3016a96b341bd0a74 Mon Sep 17 00:00:00 2001 From: Luis Alberto Santos Date: Sun, 7 Apr 2024 12:25:08 +0200 Subject: [PATCH 2/5] impl add row_counts --- crates/core/src/operations/transaction/state.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index e6f48f4ec5..60bfcbccf4 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -218,8 +218,15 @@ impl<'a> PruningStatistics for AddContainer<'a> { /// as an `Option`. /// /// Note: the returned array must contain `num_containers()` rows - fn row_counts(&self, column: &Column) -> Option { - todo!() + fn row_counts(&self, _column: &Column) -> Option { + let values = self.inner.iter().map(|add| { + if let Ok(Some(statistics)) = add.get_stats() { + ScalarValue::UInt64(Some(statistics.num_records as u64)) + } else { + ScalarValue::UInt64(None) + } + }); + ScalarValue::iter_to_array(values).ok() } // This function is required since DataFusion 35.0, but is implemented as a no-op From 30c52fe9408de8d96b4618c336c2202e896a967b Mon Sep 17 00:00:00 2001 From: Luis Alberto Santos Date: Sun, 7 Apr 2024 15:58:07 +0200 Subject: [PATCH 3/5] fix test_expr_sql --- crates/core/src/delta_datafusion/expr.rs | 78 ++++++++++++++++++------ 1 file changed, 59 insertions(+), 19 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 8db4567ec0..0b0e295658 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -429,7 +429,11 @@ mod test { use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; use datafusion_common::{Column, ScalarValue, ToDFSchema}; - use datafusion_expr::{col, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_expr::expr::ScalarFunction; + use datafusion_expr::{ + col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition, + }; + use datafusion_functions::core::arrow_cast; use datafusion_functions::encoding::expr_fn::decode; use datafusion_functions_array::expr_fn::cardinality; @@ -553,13 +557,24 @@ mod test { // String expression that we output must be parsable for conflict resolution. let tests = vec![ - simple!( - Expr::Cast(Cast { + ParseTest { + expr: Expr::Cast(Cast { expr: Box::new(lit(1_i64)), data_type: ArrowDataType::Int32 }), - "arrow_cast(1, 'Int32')".to_string() - ), + expected: "arrow_cast(1, 'Int32')".to_string(), + override_expected_expr: Some( + datafusion_expr::Expr::ScalarFunction( + ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(arrow_cast()), + args: vec![ + lit(ScalarValue::Int64(Some(1))), + lit(ScalarValue::Utf8(Some("Int32".into()))) + ] + } + ) + ), + }, simple!( Expr::Column(Column::from_qualified_name_ignore_case("Value3")).eq(lit(3_i64)), "Value3 = 3".to_string() @@ -638,9 +653,8 @@ mod test { substring(col("modified"), lit(0_i64), lit(4_i64)).eq(lit("2021")), "substr(modified, 0, 4) = '2021'".to_string() ), - simple!( - col("value") - .cast_to( + ParseTest { + expr: col("value").cast_to( &arrow_schema::DataType::Utf8, &table .snapshot() @@ -654,8 +668,23 @@ mod test { ) .unwrap() .eq(lit("1")), - "arrow_cast(value, 'Utf8') = '1'".to_string() - ), + expected: "arrow_cast(value, 'Utf8') = '1'".to_string(), + override_expected_expr: Some( + datafusion_expr::Expr::BinaryExpr(BinaryExpr { + left: Box::new(datafusion_expr::Expr::ScalarFunction( + ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(arrow_cast()), + args: vec![ + col("value"), + lit(ScalarValue::Utf8(Some("Utf8".into()))) + ] + } + )), + op: datafusion_expr::Operator::Eq, + right: Box::new(lit(ScalarValue::Utf8(Some("1".into())))) + }) + ), + }, simple!( col("_struct").field("a").eq(lit(20_i64)), "_struct['a'] = 20".to_string() @@ -676,11 +705,16 @@ mod test { expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))), expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(), override_expected_expr: Some(col("_timestamp_ntz").gt( - datafusion_expr::Expr::Cast( Cast { - expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), - data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None) - } - ))), + datafusion_expr::Expr::ScalarFunction( + ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(arrow_cast()), + args: vec![ + lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))), + lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, None)".into()))) + ] + } + ) + )), }, ParseTest { expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond( @@ -689,10 +723,16 @@ mod test { ))), expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, Some(\"UTC\"))')".to_string(), override_expected_expr: Some(col("_timestamp").gt( - datafusion_expr::Expr::Cast( Cast { - expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), - data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into())) - }))), + datafusion_expr::Expr::ScalarFunction( + ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(arrow_cast()), + args: vec![ + lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))), + lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, Some(\"UTC\"))".into()))) + ] + } + ) + )), }, ]; From d81fb16e122fb2876406bd9ee60958c29ff59da3 Mon Sep 17 00:00:00 2001 From: Luis Alberto Santos Date: Fri, 26 Apr 2024 18:43:09 +0200 Subject: [PATCH 4/5] fix test_delete_nested --- Cargo.toml | 22 +++--- .../src/delta_datafusion/find_files/mod.rs | 7 +- crates/core/src/delta_datafusion/mod.rs | 68 ++++++++++++++++--- crates/core/src/operations/delete.rs | 11 ++- .../core/src/operations/transaction/state.rs | 2 +- crates/core/src/operations/update.rs | 10 +-- crates/core/src/operations/write.rs | 12 ++-- 7 files changed, 87 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 932685c2cf..ba72bab789 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,5 @@ [workspace] -members = [ - "crates/*", - "delta-inspect", - "python", -] +members = ["crates/*", "delta-inspect", "python"] exclude = ["proofs"] resolver = "2" @@ -46,14 +42,14 @@ object_store = { version = "0.9" } parquet = { version = "51" } # datafusion -datafusion = { version = "37" } -datafusion-expr = { version = "37" } -datafusion-common = { version = "37" } -datafusion-proto = { version = "37" } -datafusion-sql = { version = "37" } -datafusion-physical-expr = { version = "37" } -datafusion-functions = { version = "37" } -datafusion-functions-array = { version = "37" } +datafusion = { version = "37.1" } +datafusion-expr = { version = "37.1" } +datafusion-common = { version = "37.1" } +datafusion-proto = { version = "37.1" } +datafusion-sql = { version = "37.1" } +datafusion-physical-expr = { version = "37.1" } +datafusion-functions = { version = "37.1" } +datafusion-functions-array = { version = "37.1" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs index 65d113ee5b..2e8d26dee3 100644 --- a/crates/core/src/delta_datafusion/find_files/mod.rs +++ b/crates/core/src/delta_datafusion/find_files/mod.rs @@ -17,7 +17,6 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, Phy use datafusion::prelude::SessionContext; use datafusion_common::{DFSchemaRef, Result, ToDFSchema}; use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode}; -use datafusion_physical_expr::create_physical_expr; use lazy_static::lazy_static; use crate::delta_datafusion::find_files::logical::FindFilesNode; @@ -29,6 +28,8 @@ use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTableError; +use super::create_physical_expr_fix; + pub mod logical; pub mod physical; @@ -160,8 +161,8 @@ async fn scan_table_by_files( let input_schema = scan.logical_schema.as_ref().to_owned(); let input_dfschema = input_schema.clone().try_into()?; - let predicate_expr = create_physical_expr( - &Expr::IsTrue(Box::new(expression.clone())), + let predicate_expr = create_physical_expr_fix( + Expr::IsTrue(Box::new(expression.clone())), &input_dfschema, state.execution_props(), )?; diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 81d3d2c296..04f13f5199 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -57,7 +57,7 @@ use datafusion::physical_plan::{ Statistics, }; use datafusion_common::scalar::ScalarValue; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult, ToDFSchema, @@ -65,9 +65,14 @@ use datafusion_common::{ use datafusion_expr::expr::ScalarFunction; use datafusion_expr::logical_plan::CreateExternalTable; use datafusion_expr::utils::conjunction; -use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility}; +use datafusion_expr::{ + col, Expr, Extension, GetFieldAccess, GetIndexedField, LogicalPlan, + TableProviderFilterPushDown, Volatility, +}; +use datafusion_functions::expr_fn::get_field; +use datafusion_functions_array::extract::{array_element, array_slice}; use datafusion_physical_expr::execution_props::ExecutionProps; -use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; +use datafusion_physical_expr::PhysicalExpr; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use datafusion_sql::planner::ParserOptions; @@ -247,7 +252,7 @@ pub(crate) fn files_matching_predicate<'a>( if let Some(Some(predicate)) = (!filters.is_empty()).then_some(conjunction(filters.iter().cloned())) { - let expr = logical_expr_to_physical_expr(&predicate, snapshot.arrow_schema()?.as_ref()); + let expr = logical_expr_to_physical_expr(predicate, snapshot.arrow_schema()?.as_ref()); let pruning_predicate = PruningPredicate::try_new(expr, snapshot.arrow_schema()?)?; Ok(Either::Left( snapshot @@ -527,7 +532,7 @@ impl<'a> DeltaScanBuilder<'a> { let logical_filter = self .filter - .map(|expr| logical_expr_to_physical_expr(&expr, &logical_schema)); + .map(|expr| logical_expr_to_physical_expr(expr, &logical_schema)); // Perform Pruning of files to scan let files = match self.files { @@ -1038,12 +1043,57 @@ pub(crate) fn to_correct_scalar_value( } pub(crate) fn logical_expr_to_physical_expr( - expr: &Expr, + expr: Expr, schema: &ArrowSchema, ) -> Arc { let df_schema = schema.clone().to_dfschema().unwrap(); let execution_props = ExecutionProps::new(); - create_physical_expr(expr, &df_schema, &execution_props).unwrap() + create_physical_expr_fix(expr, &df_schema, &execution_props).unwrap() +} + +// TODO This should be removed after datafusion v38 +pub(crate) fn create_physical_expr_fix( + expr: Expr, + input_dfschema: &DFSchema, + execution_props: &ExecutionProps, +) -> Result, DataFusionError> { + // Support Expr::struct by rewriting expressions. + let expr = expr + .transform_up(&|expr| { + // see https://github.com/apache/datafusion/issues/10181 + // This is part of the function rewriter code in DataFusion inlined here temporarily + Ok(match expr { + Expr::GetIndexedField(GetIndexedField { + expr, + field: GetFieldAccess::NamedStructField { name }, + }) => { + let name = Expr::Literal(name); + Transformed::yes(get_field(*expr, name)) + } + // expr[idx] ==> array_element(expr, idx) + Expr::GetIndexedField(GetIndexedField { + expr, + field: GetFieldAccess::ListIndex { key }, + }) => Transformed::yes(array_element(*expr, *key)), + + // expr[start, stop, stride] ==> array_slice(expr, start, stop, stride) + Expr::GetIndexedField(GetIndexedField { + expr, + field: + GetFieldAccess::ListRange { + start, + stop, + stride, + }, + }) => Transformed::yes(array_slice(*expr, *start, *stop, *stride)), + + _ => Transformed::no(expr), + }) + }) + .unwrap() + .data; + + datafusion_physical_expr::create_physical_expr(&expr, input_dfschema, execution_props) } pub(crate) async fn execute_plan_to_batch( @@ -1478,8 +1528,8 @@ pub(crate) async fn find_files_scan<'a>( let input_schema = scan.logical_schema.as_ref().to_owned(); let input_dfschema = input_schema.clone().try_into()?; - let predicate_expr = create_physical_expr( - &Expr::IsTrue(Box::new(expression.clone())), + let predicate_expr = create_physical_expr_fix( + Expr::IsTrue(Box::new(expression.clone())), &input_dfschema, state.execution_props(), )?; diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 6fb76891b1..8d13d51b4e 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -23,7 +23,6 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH}; use crate::logstore::LogStoreRef; use datafusion::execution::context::{SessionContext, SessionState}; -use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::Expr; @@ -37,7 +36,8 @@ use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{ - find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext, + create_physical_expr_fix, find_files, register_store, DataFusionMixins, DeltaScanBuilder, + DeltaSessionContext, }; use crate::errors::DeltaResult; use crate::kernel::{Action, Add, Remove}; @@ -148,11 +148,8 @@ async fn excute_non_empty_expr( // Apply the negation of the filter and rewrite files let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone())))); - let predicate_expr = create_physical_expr( - &negated_expression, - &input_dfschema, - state.execution_props(), - )?; + let predicate_expr = + create_physical_expr_fix(negated_expression, &input_dfschema, state.execution_props())?; let filter: Arc = Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index 60bfcbccf4..cf8255d4bf 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -144,7 +144,7 @@ impl<'a> AddContainer<'a> { /// so evaluating expressions is inexact. However, excluded files are guaranteed (for a correct log) /// to not contain matches by the predicate expression. pub fn predicate_matches(&self, predicate: Expr) -> DeltaResult> { - let expr = logical_expr_to_physical_expr(&predicate, &self.schema); + let expr = logical_expr_to_physical_expr(predicate, &self.schema); let pruning_predicate = PruningPredicate::try_new(expr, self.schema.clone())?; Ok(self .inner diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index b0406fa9a5..9f4f6d51a3 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -34,7 +34,6 @@ use datafusion::{ use datafusion_common::{Column, DFSchema, ScalarValue}; use datafusion_expr::{case, col, lit, when, Expr}; use datafusion_physical_expr::{ - create_physical_expr, expressions::{self}, PhysicalExpr, }; @@ -49,8 +48,8 @@ use super::{ transaction::{CommitBuilder, CommitProperties}, }; use crate::delta_datafusion::{ - expr::fmt_expr_to_sql, physical::MetricObserverExec, DataFusionMixins, DeltaColumn, - DeltaSessionContext, + create_physical_expr_fix, expr::fmt_expr_to_sql, physical::MetricObserverExec, + DataFusionMixins, DeltaColumn, DeltaSessionContext, }; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::kernel::{Action, Remove}; @@ -265,7 +264,8 @@ async fn execute( let predicate_null = when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?; - let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?; + let predicate_expr = + create_physical_expr_fix(predicate_null, &input_dfschema, execution_props)?; expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); let projection_predicate: Arc = @@ -312,7 +312,7 @@ async fn execute( let expr = case(col("__delta_rs_update_predicate")) .when(lit(true), expr.to_owned()) .otherwise(col(column.to_owned()))?; - let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?; + let predicate_expr = create_physical_expr_fix(expr, &input_dfschema, execution_props)?; map.insert(column.name.clone(), expressions.len()); let c = "__delta_rs_".to_string() + &column.name; expressions.push((predicate_expr, c.clone())); diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index c750c23da5..475abefbe2 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -34,7 +34,6 @@ use arrow_array::RecordBatch; use arrow_cast::can_cast_types; use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef}; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; -use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan}; use datafusion_common::DFSchema; @@ -49,7 +48,9 @@ use super::writer::{DeltaWriter, WriterConfig}; use super::CreateBuilder; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::expr::parse_predicate_expression; -use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; +use crate::delta_datafusion::{ + create_physical_expr_fix, find_files, register_store, DeltaScanBuilder, +}; use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType}; @@ -478,11 +479,8 @@ async fn execute_non_empty_expr( // Apply the negation of the filter and rewrite files let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone())))); - let predicate_expr = create_physical_expr( - &negated_expression, - &input_dfschema, - state.execution_props(), - )?; + let predicate_expr = + create_physical_expr_fix(negated_expression, &input_dfschema, state.execution_props())?; let filter: Arc = Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); From ce7c9e67acad4b11dd2ea2316e4adbc257af94b3 Mon Sep 17 00:00:00 2001 From: Luis Alberto Santos Date: Fri, 26 Apr 2024 18:46:14 +0200 Subject: [PATCH 5/5] removed unwrap --- crates/core/src/delta_datafusion/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 04f13f5199..178133f54d 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1089,8 +1089,7 @@ pub(crate) fn create_physical_expr_fix( _ => Transformed::no(expr), }) - }) - .unwrap() + })? .data; datafusion_physical_expr::create_physical_expr(&expr, input_dfschema, execution_props)