diff --git a/Cargo.toml b/Cargo.toml index 4cc6bad1b9..e5057e8d5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,14 +45,14 @@ object_store = { version = "0.9" } parquet = { version = "51" } # datafusion -datafusion = { version = "37.1" } -datafusion-expr = { version = "37.1" } -datafusion-common = { version = "37.1" } -datafusion-proto = { version = "37.1" } -datafusion-sql = { version = "37.1" } -datafusion-physical-expr = { version = "37.1" } -datafusion-functions = { version = "37.1" } -datafusion-functions-array = { version = "37.1" } +datafusion = { version = "38.0" } +datafusion-expr = { version = "38.0" } +datafusion-common = { version = "38.0" } +datafusion-proto = { version = "38.0" } +datafusion-sql = { version = "38.0" } +datafusion-physical-expr = { version = "38.0" } +datafusion-functions = { version = "38.0" } +datafusion-functions-array = { version = "38.0" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs index bb178a192d..2465e23d94 100644 --- a/crates/benchmarks/src/bin/merge.rs +++ b/crates/benchmarks/src/bin/merge.rs @@ -7,9 +7,10 @@ use arrow::datatypes::Schema as ArrowSchema; use arrow_array::{RecordBatch, StringArray, UInt32Array}; use chrono::Duration; use clap::{command, Args, Parser, Subcommand}; +use datafusion::functions::expr_fn::random; use datafusion::{datasource::MemTable, prelude::DataFrame}; use datafusion_common::DataFusionError; -use datafusion_expr::{cast, col, lit, random}; +use datafusion_expr::{cast, col, lit}; use deltalake_core::protocol::SaveMode; use deltalake_core::{ arrow::{ diff --git a/crates/core/src/delta_datafusion/cdf/scan_utils.rs b/crates/core/src/delta_datafusion/cdf/scan_utils.rs index 434afa4f74..b7c890a7b1 100644 --- a/crates/core/src/delta_datafusion/cdf/scan_utils.rs +++ b/crates/core/src/delta_datafusion/cdf/scan_utils.rs @@ -80,6 +80,7 @@ pub fn create_partition_values( last_modified: 
chrono::Utc.timestamp_nanos(0), version: None, }, + statistics: None, partition_values: new_part_values.clone(), extensions: None, range: None, diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 41e6a84b4f..0c2e198f17 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -429,10 +429,11 @@ mod test { use datafusion_common::{Column, ScalarValue, ToDFSchema}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{ - col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition, + col, lit, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition, }; use datafusion_functions::core::arrow_cast; use datafusion_functions::encoding::expr_fn::decode; + use datafusion_functions::expr_fn::substring; use datafusion_functions_array::expr_fn::cardinality; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 9c87411973..6a1ed118f4 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -699,11 +699,11 @@ impl TableProvider for DeltaTable { Ok(Arc::new(scan)) } - fn supports_filter_pushdown( + fn supports_filters_pushdown( &self, - _filter: &Expr, - ) -> DataFusionResult { - Ok(TableProviderFilterPushDown::Inexact) + _filter: &[&Expr], + ) -> DataFusionResult> { + Ok(vec![TableProviderFilterPushDown::Inexact]) } fn statistics(&self) -> Option { @@ -778,11 +778,11 @@ impl TableProvider for DeltaTableProvider { Ok(Arc::new(scan)) } - fn supports_filter_pushdown( + fn supports_filters_pushdown( &self, - _filter: &Expr, - ) -> DataFusionResult { - Ok(TableProviderFilterPushDown::Inexact) + _filter: &[&Expr], + ) -> DataFusionResult> { + Ok(vec![TableProviderFilterPushDown::Inexact]) } fn statistics(&self) -> Option { @@ -990,6 +990,7 @@ pub(crate) fn partitioned_file_from_action( 
..action.try_into().unwrap() }, partition_values, + statistics: None, range: None, extensions: None, } @@ -1426,14 +1427,7 @@ impl TreeNodeVisitor for FindFilesExprProperties { | Expr::TryCast(_) => (), Expr::ScalarFunction(ScalarFunction { func_def, .. }) => { let v = match func_def { - datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(), datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility, - datafusion_expr::ScalarFunctionDefinition::Name(n) => { - self.result = Err(DeltaTableError::Generic(format!( - "Cannot determine volatility of find files predicate function {n}", - ))); - return Ok(TreeNodeRecursion::Stop); - } }; if v > Volatility::Immutable { self.result = Err(DeltaTableError::Generic(format!( @@ -1900,6 +1894,7 @@ mod tests { version: None, }, partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(), + statistics: None, range: None, extensions: None, }; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 6c783bc9b4..efc54c1869 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -502,7 +502,7 @@ impl MergeOperation { relation: Some(TableReference::Bare { table }), name, } => { - if table.eq(alias) { + if table.as_ref() == alias { Column { relation: Some(r), name, @@ -863,8 +863,8 @@ async fn try_construct_early_filter( table_snapshot: &DeltaTableState, session_state: &SessionState, source: &LogicalPlan, - source_name: &TableReference<'_>, - target_name: &TableReference<'_>, + source_name: &TableReference, + target_name: &TableReference, ) -> DeltaResult> { let table_metadata = table_snapshot.metadata(); let partition_columns = &table_metadata.partition_columns; @@ -1324,9 +1324,9 @@ async fn execute( let plan = projection.into_unoptimized_plan(); let mut fields: Vec = plan .schema() - .fields() + .columns() .iter() - .map(|f| col(f.qualified_column())) + .map(|f| col(f.clone())) 
.collect(); fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name))); diff --git a/crates/sql/src/logical_plan.rs b/crates/sql/src/logical_plan.rs index 164462a90c..dacc41901a 100644 --- a/crates/sql/src/logical_plan.rs +++ b/crates/sql/src/logical_plan.rs @@ -1,7 +1,7 @@ use std::fmt::{self, Debug, Display}; use std::sync::Arc; -use datafusion_common::{DFSchema, DFSchemaRef, OwnedTableReference}; +use datafusion_common::{DFSchema, DFSchemaRef, TableReference}; use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{Expr, UserDefinedLogicalNodeCore}; @@ -107,7 +107,7 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { #[derive(Clone, PartialEq, Eq, Hash)] pub struct Vacuum { /// A reference to the table being vacuumed - pub table: OwnedTableReference, + pub table: TableReference, /// The retention threshold. pub retention_hours: Option, /// Return a list of up to 1000 files to be deleted.
@@ -117,7 +117,7 @@ pub struct Vacuum { } impl Vacuum { - pub fn new(table: OwnedTableReference, retention_hours: Option, dry_run: bool) -> Self { + pub fn new(table: TableReference, retention_hours: Option, dry_run: bool) -> Self { Self { table, retention_hours, @@ -133,13 +133,13 @@ impl Vacuum { #[derive(Clone, PartialEq, Eq, Hash)] pub struct DescribeHistory { /// A reference to the table - pub table: OwnedTableReference, + pub table: TableReference, /// Schema for commit provenence information pub schema: DFSchemaRef, } impl DescribeHistory { - pub fn new(table: OwnedTableReference) -> Self { + pub fn new(table: TableReference) -> Self { Self { table, // TODO: add proper schema @@ -153,13 +153,13 @@ impl DescribeHistory { #[derive(Clone, PartialEq, Eq, Hash)] pub struct DescribeDetails { /// A reference to the table - pub table: OwnedTableReference, + pub table: TableReference, /// Schema for commit provenence information pub schema: DFSchemaRef, } impl DescribeDetails { - pub fn new(table: OwnedTableReference) -> Self { + pub fn new(table: TableReference) -> Self { Self { table, // TODO: add proper schema @@ -172,13 +172,13 @@ impl DescribeDetails { #[derive(Clone, PartialEq, Eq, Hash)] pub struct DescribeFiles { /// A reference to the table - pub table: OwnedTableReference, + pub table: TableReference, /// Schema for commit provenence information pub schema: DFSchemaRef, } impl DescribeFiles { - pub fn new(table: OwnedTableReference) -> Self { + pub fn new(table: TableReference) -> Self { Self { table, // TODO: add proper schema diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs index 0be14d59b0..baab05f9f6 100644 --- a/crates/sql/src/planner.rs +++ b/crates/sql/src/planner.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use datafusion_common::{OwnedTableReference, Result as DFResult}; +use datafusion_common::{Result as DFResult, TableReference}; use datafusion_expr::logical_plan::{Extension, LogicalPlan}; use datafusion_sql::planner::{ 
object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel, @@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { fn vacuum_to_plan(&self, vacuum: VacuumStatement) -> DFResult { let table_ref = self.object_name_to_table_reference(vacuum.table)?; let plan = DeltaStatement::Vacuum(Vacuum::new( - table_ref.to_owned_reference(), + table_ref.clone(), vacuum.retention_hours, vacuum.dry_run, )); @@ -65,8 +65,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult { let table_ref = self.object_name_to_table_reference(describe.table)?; - let plan = - DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.to_owned_reference())); + let plan = DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone())); Ok(LogicalPlan::Extension(Extension { node: Arc::new(plan), })) @@ -75,7 +74,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { pub(crate) fn object_name_to_table_reference( &self, object_name: ObjectName, - ) -> DFResult { + ) -> DFResult { object_name_to_table_reference(object_name, self.options.enable_ident_normalization) } }