From 855955c8149ec16e2b6f1ca17d80259df751cc72 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Sat, 11 May 2024 14:50:19 +0000
Subject: [PATCH 1/4] chore: upgrade to Datafusion 38

---
 Cargo.toml                                 | 16 ++++++------
 .../src/delta_datafusion/cdf/scan_utils.rs |  1 +
 crates/core/src/delta_datafusion/expr.rs   |  3 ++-
 crates/core/src/delta_datafusion/mod.rs    | 25 ++++++++-----------
 crates/core/src/operations/merge/mod.rs    | 10 ++++----
 crates/sql/src/logical_plan.rs             | 18 ++++++-------
 crates/sql/src/planner.rs                  |  8 +++---
 7 files changed, 39 insertions(+), 42 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6168a500fd..9a40fccafa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -42,14 +42,14 @@ object_store = { version = "0.9" }
 parquet = { version = "51" }
 
 # datafusion
-datafusion = { version = "37.1" }
-datafusion-expr = { version = "37.1" }
-datafusion-common = { version = "37.1" }
-datafusion-proto = { version = "37.1" }
-datafusion-sql = { version = "37.1" }
-datafusion-physical-expr = { version = "37.1" }
-datafusion-functions = { version = "37.1" }
-datafusion-functions-array = { version = "37.1" }
+datafusion = { version = "38.0" }
+datafusion-expr = { version = "38.0" }
+datafusion-common = { version = "38.0" }
+datafusion-proto = { version = "38.0" }
+datafusion-sql = { version = "38.0" }
+datafusion-physical-expr = { version = "38.0" }
+datafusion-functions = { version = "38.0" }
+datafusion-functions-array = { version = "38.0" }
 
 # serde
 serde = { version = "1.0.194", features = ["derive"] }
diff --git a/crates/core/src/delta_datafusion/cdf/scan_utils.rs b/crates/core/src/delta_datafusion/cdf/scan_utils.rs
index 434afa4f74..b7c890a7b1 100644
--- a/crates/core/src/delta_datafusion/cdf/scan_utils.rs
+++ b/crates/core/src/delta_datafusion/cdf/scan_utils.rs
@@ -80,6 +80,7 @@ pub fn create_partition_values(
                 last_modified: chrono::Utc.timestamp_nanos(0),
                 version: None,
             },
+            statistics: None,
            partition_values: new_part_values.clone(),
            extensions: None,
            range: None,
diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs
index 868969c571..38ab2f0a8d 100644
--- a/crates/core/src/delta_datafusion/expr.rs
+++ b/crates/core/src/delta_datafusion/expr.rs
@@ -429,10 +429,11 @@ mod test {
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
     use datafusion_expr::expr::ScalarFunction;
     use datafusion_expr::{
-        col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
+        col, lit, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
     };
     use datafusion_functions::core::arrow_cast;
     use datafusion_functions::encoding::expr_fn::decode;
+    use datafusion_functions::expr_fn::substring;
     use datafusion_functions_array::expr_fn::cardinality;
 
     use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index c1b6208cff..0bc602a5c1 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -697,11 +697,11 @@ impl TableProvider for DeltaTable {
         Ok(Arc::new(scan))
     }
 
-    fn supports_filter_pushdown(
+    fn supports_filters_pushdown(
         &self,
-        _filter: &Expr,
-    ) -> DataFusionResult<TableProviderFilterPushDown> {
-        Ok(TableProviderFilterPushDown::Inexact)
+        _filter: &[&Expr],
+    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact])
     }
 
     fn statistics(&self) -> Option<Statistics> {
@@ -776,11 +776,11 @@ impl TableProvider for DeltaTableProvider {
         Ok(Arc::new(scan))
     }
 
-    fn supports_filter_pushdown(
+    fn supports_filters_pushdown(
         &self,
-        _filter: &Expr,
-    ) -> DataFusionResult<TableProviderFilterPushDown> {
-        Ok(TableProviderFilterPushDown::Inexact)
+        _filter: &[&Expr],
+    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact])
     }
 
     fn statistics(&self) -> Option<Statistics> {
@@ -988,6 +988,7 @@ pub(crate) fn partitioned_file_from_action(
             ..action.try_into().unwrap()
         },
         partition_values,
+        statistics: None,
         range: None,
         extensions: None,
     }
@@ -1424,14 +1425,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
             | Expr::TryCast(_) => (),
             Expr::ScalarFunction(ScalarFunction { func_def, .. }) => {
                 let v = match func_def {
-                    datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(),
                     datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility,
-                    datafusion_expr::ScalarFunctionDefinition::Name(n) => {
-                        self.result = Err(DeltaTableError::Generic(format!(
-                            "Cannot determine volatility of find files predicate function {n}",
-                        )));
-                        return Ok(TreeNodeRecursion::Stop);
-                    }
                 };
                 if v > Volatility::Immutable {
                     self.result = Err(DeltaTableError::Generic(format!(
@@ -1898,6 +1892,7 @@ mod tests {
                 version: None,
             },
             partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
+            statistics: None,
             range: None,
             extensions: None,
         };
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index ddbe113d16..160a6c25f0 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -502,7 +502,7 @@ impl MergeOperation {
                     relation: Some(TableReference::Bare { table }),
                     name,
                 } => {
-                    if table.eq(alias) {
+                    if table.as_ref() == alias {
                         Column {
                             relation: Some(r),
                             name,
@@ -863,8 +863,8 @@ async fn try_construct_early_filter(
     table_snapshot: &DeltaTableState,
     session_state: &SessionState,
     source: &LogicalPlan,
-    source_name: &TableReference<'_>,
-    target_name: &TableReference<'_>,
+    source_name: &TableReference,
+    target_name: &TableReference,
 ) -> DeltaResult<Option<Expr>> {
     let table_metadata = table_snapshot.metadata();
     let partition_columns = &table_metadata.partition_columns;
@@ -1324,9 +1324,9 @@ async fn execute(
     let plan = projection.into_unoptimized_plan();
     let mut fields: Vec<Expr> = plan
         .schema()
-        .fields()
-        .iter()
-        .map(|f| col(f.qualified_column()))
+        .columns()
+        .iter()
+        .map(|f| col(f.clone()))
         .collect();
 
     fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name)));
diff --git a/crates/sql/src/logical_plan.rs b/crates/sql/src/logical_plan.rs
index 164462a90c..dacc41901a 100644
--- a/crates/sql/src/logical_plan.rs
+++ b/crates/sql/src/logical_plan.rs
@@ -1,7 +1,7 @@
 use std::fmt::{self, Debug, Display};
 use std::sync::Arc;
 
-use datafusion_common::{DFSchema, DFSchemaRef, OwnedTableReference};
+use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Expr, UserDefinedLogicalNodeCore};
 
@@ -107,7 +107,7 @@ impl UserDefinedLogicalNodeCore for DeltaStatement {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Vacuum {
     /// A reference to the table being vacuumed
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// The retention threshold.
     pub retention_hours: Option<i64>,
     /// Return a list of up to 1000 files to be deleted.
@@ -117,7 +117,7 @@ pub struct Vacuum {
 }
 
 impl Vacuum {
-    pub fn new(table: OwnedTableReference, retention_hours: Option<i64>, dry_run: bool) -> Self {
+    pub fn new(table: TableReference, retention_hours: Option<i64>, dry_run: bool) -> Self {
         Self {
             table,
             retention_hours,
@@ -133,13 +133,13 @@ impl Vacuum {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeHistory {
     /// A reference to the table
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// Schema for commit provenence information
     pub schema: DFSchemaRef,
 }
 
 impl DescribeHistory {
-    pub fn new(table: OwnedTableReference) -> Self {
+    pub fn new(table: TableReference) -> Self {
         Self {
             table,
             // TODO: add proper schema
@@ -153,13 +153,13 @@ impl DescribeHistory {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeDetails {
     /// A reference to the table
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// Schema for commit provenence information
     pub schema: DFSchemaRef,
 }
 
 impl DescribeDetails {
-    pub fn new(table: OwnedTableReference) -> Self {
+    pub fn new(table: TableReference) -> Self {
         Self {
             table,
             // TODO: add proper schema
@@ -172,13 +172,13 @@ impl DescribeDetails {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeFiles {
     /// A reference to the table
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// Schema for commit provenence information
     pub schema: DFSchemaRef,
 }
 
 impl DescribeFiles {
-    pub fn new(table: OwnedTableReference) -> Self {
+    pub fn new(table: TableReference) -> Self {
         Self {
             table,
             // TODO: add proper schema
diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs
index 0be14d59b0..6214125906 100644
--- a/crates/sql/src/planner.rs
+++ b/crates/sql/src/planner.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use datafusion_common::{OwnedTableReference, Result as DFResult};
+use datafusion_common::{TableReference, Result as DFResult};
 use datafusion_expr::logical_plan::{Extension, LogicalPlan};
 use datafusion_sql::planner::{
     object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel,
@@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
     fn vacuum_to_plan(&self, vacuum: VacuumStatement) -> DFResult<LogicalPlan> {
         let table_ref = self.object_name_to_table_reference(vacuum.table)?;
         let plan = DeltaStatement::Vacuum(Vacuum::new(
-            table_ref.to_owned_reference(),
+            table_ref.clone(),
             vacuum.retention_hours,
             vacuum.dry_run,
         ));
@@ -66,7 +66,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
     fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult<LogicalPlan> {
         let table_ref = self.object_name_to_table_reference(describe.table)?;
         let plan =
-            DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.to_owned_reference()));
+            DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone()));
         Ok(LogicalPlan::Extension(Extension {
             node: Arc::new(plan),
         }))
@@ -75,7 +75,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
     pub(crate) fn object_name_to_table_reference(
         &self,
         object_name: ObjectName,
-    ) -> DFResult<OwnedTableReference> {
+    ) -> DFResult<TableReference> {
         object_name_to_table_reference(object_name, self.options.enable_ident_normalization)
     }
 }
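
The largest API change in patch 1 is on TableProvider: DataFusion 38 drops the per-predicate supports_filter_pushdown hook in favor of supports_filters_pushdown, which receives every candidate predicate in one slice and returns one pushdown decision per predicate. A minimal sketch of the new shape, assuming the datafusion 38 crate; MyProvider is a hypothetical stand-in for the two delta providers patched above, not part of the patch:

    use datafusion::common::Result;
    use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

    // Hypothetical stand-in type; illustration only.
    struct MyProvider;

    impl MyProvider {
        // Mirrors the DataFusion 38 trait method: one decision per input filter.
        fn supports_filters_pushdown(
            &self,
            filters: &[&Expr],
        ) -> Result<Vec<TableProviderFilterPushDown>> {
            // Inexact: DataFusion may push each filter down to the scan, but
            // must still re-apply it afterwards.
            Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
        }
    }

The trait documents that the returned Vec should hold one entry per element of filters; the patched implementations above return a single-element Vec regardless of how many filters arrive, so repeating the decision filters.len() times, as sketched, hews closer to that contract.
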
From 2fc08efb2c0ff338f115a853c186669fd2aeb3eb Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Wed, 15 May 2024 18:28:17 +0000
Subject: [PATCH 2/4] fixup! chore: upgrade to Datafusion 38

---
 crates/benchmarks/src/bin/merge.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs
index bb178a192d..ef6a9019b5 100644
--- a/crates/benchmarks/src/bin/merge.rs
+++ b/crates/benchmarks/src/bin/merge.rs
@@ -9,7 +9,8 @@ use chrono::Duration;
 use clap::{command, Args, Parser, Subcommand};
 use datafusion::{datasource::MemTable, prelude::DataFrame};
 use datafusion_common::DataFusionError;
-use datafusion_expr::{cast, col, lit, random};
+use datafusion_expr::{cast, col, lit};
+use datafusion::functions::expr_fn::random;
 use deltalake_core::protocol::SaveMode;
 use deltalake_core::{
     arrow::{

From 68382e7f88ac6536cf4c6d54206ce40be6f18cd8 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Wed, 15 May 2024 18:28:25 +0000
Subject: [PATCH 3/4] fixup! fixup! chore: upgrade to Datafusion 38

---
 crates/benchmarks/src/bin/merge.rs | 2 +-
 crates/sql/src/planner.rs          | 5 ++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs
index ef6a9019b5..2465e23d94 100644
--- a/crates/benchmarks/src/bin/merge.rs
+++ b/crates/benchmarks/src/bin/merge.rs
@@ -7,10 +7,10 @@ use arrow::datatypes::Schema as ArrowSchema;
 use arrow_array::{RecordBatch, StringArray, UInt32Array};
 use chrono::Duration;
 use clap::{command, Args, Parser, Subcommand};
+use datafusion::functions::expr_fn::random;
 use datafusion::{datasource::MemTable, prelude::DataFrame};
 use datafusion_common::DataFusionError;
 use datafusion_expr::{cast, col, lit};
-use datafusion::functions::expr_fn::random;
 use deltalake_core::protocol::SaveMode;
 use deltalake_core::{
     arrow::{
diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs
index 6214125906..baab05f9f6 100644
--- a/crates/sql/src/planner.rs
+++ b/crates/sql/src/planner.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use datafusion_common::{TableReference, Result as DFResult};
+use datafusion_common::{Result as DFResult, TableReference};
 use datafusion_expr::logical_plan::{Extension, LogicalPlan};
 use datafusion_sql::planner::{
     object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel,
@@ -65,8 +65,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
 
     fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult<LogicalPlan> {
         let table_ref = self.object_name_to_table_reference(describe.table)?;
-        let plan =
-            DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone()));
+        let plan = DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone()));
         Ok(LogicalPlan::Extension(Extension {
             node: Arc::new(plan),
         }))
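
Patches 2 and 3 are fixups for the same upgrade: DataFusion 38 continues moving built-in scalar functions out of datafusion-expr into the datafusion-functions crate, so random() is now imported from datafusion::functions::expr_fn (patch 3 only re-sorts that import and collapses a leftover line break in planner.rs). A minimal sketch of using the relocated function alongside the expression builders that stayed put, assuming the datafusion 38 facade crate; the column name "id" is illustrative:

    use datafusion::arrow::datatypes::DataType;
    use datafusion::functions::expr_fn::random;
    use datafusion::logical_expr::{cast, col, lit};

    fn main() {
        // random() now lives in datafusion-functions; cast, col, and lit are
        // still exported by datafusion-expr.
        let expr = cast(random() * lit(10.0), DataType::Int32) + col("id");
        println!("{expr}");
    }
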
From bf9532a42d8fc165229321fb5dd0ca09add26334 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Sun, 26 May 2024 21:49:12 +0000
Subject: [PATCH 4/4] wip

---
 crates/core/src/operations/update.rs | 3 +++
 crates/core/src/table/state_arrow.rs | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 9a088c6ae9..9920c41ea8 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -250,6 +250,7 @@ async fn execute(
     let mut expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
     let scan_schema = scan.schema();
     for (i, field) in scan_schema.fields().into_iter().enumerate() {
+        println!("GIN: {:#?}", field.name());
         expressions.push((
             Arc::new(expressions::Column::new(field.name(), i)),
             field.name().to_owned(),
@@ -328,12 +329,14 @@ async fn execute(
         if !control_columns.contains(field.name()) {
             match map.get(field.name()) {
                 Some(value) => {
+                    println!("PUSH {:#?}", field.name());
                     expressions.push((
                         Arc::new(expressions::Column::new(field.name(), *value)),
                         field.name().to_owned(),
                     ));
                 }
                 None => {
+                    println!("NONE {:#?}", field.name());
                     expressions.push((
                         Arc::new(expressions::Column::new(field.name(), i)),
                         field.name().to_owned(),
diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs
index 9d23f3169f..fe35787cb4 100644
--- a/crates/core/src/table/state_arrow.rs
+++ b/crates/core/src/table/state_arrow.rs
@@ -572,7 +572,7 @@ impl DeltaTableState {
         // into StructArrays, until it is consolidated into a single array.
         columnar_stats = columnar_stats
             .into_iter()
-            .group_by(|col_stat| {
+            .chunk_by(|col_stat| {
                 if col_stat.path.len() < level {
                     col_stat.path.clone()
                 } else {
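
The final patch is the odd one out: the println! calls added to update.rs are leftover debug output, consistent with the commit's wip subject, while the state_arrow.rs hunk tracks a rename in itertools rather than in DataFusion itself — group_by became chunk_by, with unchanged semantics (consecutive elements mapping to the same key form one group). A standalone sketch of the renamed adaptor, assuming an itertools release that ships chunk_by (0.12.1 or later); the sample data is made up:

    use itertools::Itertools;

    fn main() {
        let paths = vec!["a", "a", "b", "b", "a"];
        // chunk_by groups *consecutive* runs that share a key, so "a" forms
        // two separate groups here.
        let chunks = paths.into_iter().chunk_by(|p| *p);
        for (key, run) in &chunks {
            println!("{key}: {} item(s)", run.count());
        }
    }
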