chore: upgrade to Datafusion 38 #2499

Closed · wants to merge 5 commits
16 changes: 8 additions & 8 deletions Cargo.toml
@@ -45,14 +45,14 @@ object_store = { version = "0.9" }
 parquet = { version = "51" }

 # datafusion
-datafusion = { version = "37.1" }
-datafusion-expr = { version = "37.1" }
-datafusion-common = { version = "37.1" }
-datafusion-proto = { version = "37.1" }
-datafusion-sql = { version = "37.1" }
-datafusion-physical-expr = { version = "37.1" }
-datafusion-functions = { version = "37.1" }
-datafusion-functions-array = { version = "37.1" }
+datafusion = { version = "38.0" }
+datafusion-expr = { version = "38.0" }
+datafusion-common = { version = "38.0" }
+datafusion-proto = { version = "38.0" }
+datafusion-sql = { version = "38.0" }
+datafusion-physical-expr = { version = "38.0" }
+datafusion-functions = { version = "38.0" }
+datafusion-functions-array = { version = "38.0" }

 # serde
 serde = { version = "1.0.194", features = ["derive"] }
3 changes: 2 additions & 1 deletion crates/benchmarks/src/bin/merge.rs
@@ -7,9 +7,10 @@ use arrow::datatypes::Schema as ArrowSchema;
 use arrow_array::{RecordBatch, StringArray, UInt32Array};
 use chrono::Duration;
 use clap::{command, Args, Parser, Subcommand};
+use datafusion::functions::expr_fn::random;
 use datafusion::{datasource::MemTable, prelude::DataFrame};
 use datafusion_common::DataFusionError;
-use datafusion_expr::{cast, col, lit, random};
+use datafusion_expr::{cast, col, lit};
 use deltalake_core::protocol::SaveMode;
 use deltalake_core::{
     arrow::{
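DataFusion 38 continues moving built-in scalar functions out of `datafusion_expr` and into the `datafusion-functions` crates, which is why `random` changes import path above. A minimal sketch of the new-style usage, assuming the v38 crates from this PR's Cargo.toml (`random_percent` is a hypothetical helper, not code from the PR):

use arrow::datatypes::DataType;
use datafusion::functions::expr_fn::random; // re-exported from datafusion-functions in v38
use datafusion_expr::{cast, lit, Expr};

/// Build `CAST(random() * 100.0 AS BIGINT)` with the DataFusion 38 imports.
fn random_percent() -> Expr {
    cast(random() * lit(100.0), DataType::Int64)
}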
1 change: 1 addition & 0 deletions crates/core/src/delta_datafusion/cdf/scan_utils.rs
@@ -80,6 +80,7 @@ pub fn create_partition_values<F: FileAction>(
             last_modified: chrono::Utc.timestamp_nanos(0),
             version: None,
         },
+        statistics: None,
         partition_values: new_part_values.clone(),
         extensions: None,
         range: None,
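The extra field exists because DataFusion 38's `PartitionedFile` gained an optional per-file `statistics` member, so every struct-literal construction needs one more line. A minimal sketch of a full construction under v38 (a hypothetical helper, with `object_store` 0.9 as in the Cargo.toml above):

use chrono::TimeZone;
use datafusion::datasource::listing::PartitionedFile;
use object_store::{path::Path, ObjectMeta};

/// Hypothetical helper: wrap a bare object-store path in a PartitionedFile.
fn partitioned_file(location: &str, size: usize) -> PartitionedFile {
    PartitionedFile {
        object_meta: ObjectMeta {
            location: Path::from(location),
            last_modified: chrono::Utc.timestamp_nanos(0),
            size,
            e_tag: None,
            version: None,
        },
        partition_values: vec![],
        range: None,
        statistics: None, // new in DataFusion 38; None preserves the old behavior
        extensions: None,
    }
}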
3 changes: 2 additions & 1 deletion crates/core/src/delta_datafusion/expr.rs
@@ -429,10 +429,11 @@ mod test {
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
     use datafusion_expr::expr::ScalarFunction;
     use datafusion_expr::{
-        col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
+        col, lit, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
     };
     use datafusion_functions::core::arrow_cast;
     use datafusion_functions::encoding::expr_fn::decode;
+    use datafusion_functions::expr_fn::substring;
     use datafusion_functions_array::expr_fn::cardinality;

     use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
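Same migration pattern as `random` above: `substring` is no longer re-exported by `datafusion_expr` and now comes from `datafusion_functions::expr_fn`. A minimal sketch assuming the v38 crates (`name_prefix` is a hypothetical expression, not the PR's code):

use datafusion_expr::{col, lit, Expr};
use datafusion_functions::expr_fn::substring;

/// Build `substring(name, 1, 3)` with the DataFusion 38 import path.
fn name_prefix() -> Expr {
    substring(col("name"), lit(1i64), lit(3i64))
}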
25 changes: 10 additions & 15 deletions crates/core/src/delta_datafusion/mod.rs
@@ -699,11 +699,11 @@ impl TableProvider for DeltaTable {
         Ok(Arc::new(scan))
     }

-    fn supports_filter_pushdown(
+    fn supports_filters_pushdown(
         &self,
-        _filter: &Expr,
-    ) -> DataFusionResult<TableProviderFilterPushDown> {
-        Ok(TableProviderFilterPushDown::Inexact)
+        _filter: &[&Expr],
+    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact])
     }

     fn statistics(&self) -> Option<Statistics> {
@@ -778,11 +778,11 @@ impl TableProvider for DeltaTableProvider {
         Ok(Arc::new(scan))
     }

-    fn supports_filter_pushdown(
+    fn supports_filters_pushdown(
         &self,
-        _filter: &Expr,
-    ) -> DataFusionResult<TableProviderFilterPushDown> {
-        Ok(TableProviderFilterPushDown::Inexact)
+        _filter: &[&Expr],
+    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact])
     }

     fn statistics(&self) -> Option<Statistics> {
@@ -990,6 +990,7 @@ pub(crate) fn partitioned_file_from_action(
             ..action.try_into().unwrap()
         },
         partition_values,
+        statistics: None,
         range: None,
         extensions: None,
     }
@@ -1426,14 +1427,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
             | Expr::TryCast(_) => (),
             Expr::ScalarFunction(ScalarFunction { func_def, .. }) => {
                 let v = match func_def {
-                    datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(),
                     datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility,
-                    datafusion_expr::ScalarFunctionDefinition::Name(n) => {
-                        self.result = Err(DeltaTableError::Generic(format!(
-                            "Cannot determine volatility of find files predicate function {n}",
-                        )));
-                        return Ok(TreeNodeRecursion::Stop);
-                    }
                 };
                 if v > Volatility::Immutable {
                     self.result = Err(DeltaTableError::Generic(format!(
@@ -1900,6 +1894,7 @@ mod tests {
             version: None,
         },
         partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
+        statistics: None,
         range: None,
         extensions: None,
     };
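The pushdown change is DataFusion 38's move from a per-filter callback to a batched one: `supports_filters_pushdown` receives all candidate filters at once and must return one verdict per filter, in order. A minimal sketch of the expected shape (a free function standing in for the trait method body, not the PR's code):

use datafusion_expr::{Expr, TableProviderFilterPushDown};

/// One verdict per incoming filter, in the same order as `filters`.
/// `Inexact` means DataFusion will still re-apply the filter after the scan.
fn pushdown_verdicts(filters: &[&Expr]) -> Vec<TableProviderFilterPushDown> {
    filters
        .iter()
        .map(|_| TableProviderFilterPushDown::Inexact)
        .collect()
}

Note that the trait expects the returned vector to match the length of the input slice; a fixed single-element `vec![...]` like the one in this hunk only lines up when exactly one filter is passed in.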
10 changes: 5 additions & 5 deletions crates/core/src/operations/merge/mod.rs
@@ -502,7 +502,7 @@ impl MergeOperation {
                 relation: Some(TableReference::Bare { table }),
                 name,
             } => {
-                if table.eq(alias) {
+                if table.as_ref() == alias {
                     Column {
                         relation: Some(r),
                         name,
@@ -863,8 +863,8 @@ async fn try_construct_early_filter(
     table_snapshot: &DeltaTableState,
     session_state: &SessionState,
     source: &LogicalPlan,
-    source_name: &TableReference<'_>,
-    target_name: &TableReference<'_>,
+    source_name: &TableReference,
+    target_name: &TableReference,
 ) -> DeltaResult<Option<Expr>> {
     let table_metadata = table_snapshot.metadata();
     let partition_columns = &table_metadata.partition_columns;
@@ -1324,9 +1324,9 @@ async fn execute(
     let plan = projection.into_unoptimized_plan();
     let mut fields: Vec<Expr> = plan
         .schema()
-        .fields()
+        .columns()
         .iter()
-        .map(|f| col(f.qualified_column()))
+        .map(|f| col(f.clone()))
         .collect();

     fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name)));
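The last hunk reflects DataFusion 38's `DFSchema` rework: `fields()` no longer yields qualifier-carrying `DFField`s, so the qualified column list now comes from `columns()` directly. A minimal sketch of the new idiom (hypothetical helper, assuming the v38 crates):

use datafusion_expr::{col, Expr, LogicalPlan};

/// One column expression per field of the plan's schema, DataFusion 38 style:
/// `DFSchema::columns()` returns already-qualified `Column`s.
fn projection_exprs(plan: &LogicalPlan) -> Vec<Expr> {
    plan.schema()
        .columns()
        .iter()
        .map(|c| col(c.clone()))
        .collect()
}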
3 changes: 3 additions & 0 deletions crates/core/src/operations/update.rs
@@ -269,6 +269,7 @@ async fn execute(
     let mut expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
     let scan_schema = scan.schema();
     for (i, field) in scan_schema.fields().into_iter().enumerate() {
+        println!("GIN: {:#?}", field.name());
         expressions.push((
             Arc::new(expressions::Column::new(field.name(), i)),
             field.name().to_owned(),
@@ -348,12 +349,14 @@
         if !control_columns.contains(field.name()) {
             match map.get(field.name()) {
                 Some(value) => {
+                    println!("PUSH {:#?}", field.name());
                     expressions.push((
                         Arc::new(expressions::Column::new(field.name(), *value)),
                         field.name().to_owned(),
                     ));
                 }
                 None => {
+                    println!("NONE {:#?}", field.name());
                     expressions.push((
                         Arc::new(expressions::Column::new(field.name(), i)),
                         field.name().to_owned(),
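Both hunks sit in the loop that pairs every scan column with a physical `Column` expression for a downstream projection (the `println!` lines are temporary debug output added by this PR). A minimal sketch of that pattern in isolation, assuming v38 (`identity_projection` is a hypothetical helper):

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::PhysicalExpr;

/// Identity projection: one physical `Column` expression per input field,
/// keyed by name and ordinal, in the shape `ProjectionExec` expects.
fn identity_projection(schema: &SchemaRef) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
    schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| {
            (
                Arc::new(Column::new(field.name(), i)) as Arc<dyn PhysicalExpr>,
                field.name().to_owned(),
            )
        })
        .collect()
}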
18 changes: 9 additions & 9 deletions crates/sql/src/logical_plan.rs
@@ -1,7 +1,7 @@
 use std::fmt::{self, Debug, Display};
 use std::sync::Arc;

-use datafusion_common::{DFSchema, DFSchemaRef, OwnedTableReference};
+use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Expr, UserDefinedLogicalNodeCore};

@@ -107,7 +107,7 @@ impl UserDefinedLogicalNodeCore for DeltaStatement {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct Vacuum {
     /// A reference to the table being vacuumed
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// The retention threshold.
     pub retention_hours: Option<i32>,
     /// Return a list of up to 1000 files to be deleted.
@@ -117,7 +117,7 @@ pub struct Vacuum {
 }

 impl Vacuum {
-    pub fn new(table: OwnedTableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
+    pub fn new(table: TableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
         Self {
             table,
             retention_hours,
@@ -133,13 +133,13 @@ impl Vacuum {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeHistory {
     /// A reference to the table
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// Schema for commit provenence information
     pub schema: DFSchemaRef,
 }

 impl DescribeHistory {
-    pub fn new(table: OwnedTableReference) -> Self {
+    pub fn new(table: TableReference) -> Self {
         Self {
             table,
             // TODO: add proper schema
@@ -153,13 +153,13 @@ impl DescribeHistory {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeDetails {
     /// A reference to the table
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// Schema for commit provenence information
     pub schema: DFSchemaRef,
 }

 impl DescribeDetails {
-    pub fn new(table: OwnedTableReference) -> Self {
+    pub fn new(table: TableReference) -> Self {
         Self {
             table,
             // TODO: add proper schema
@@ -172,13 +172,13 @@ impl DescribeDetails {
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub struct DescribeFiles {
     /// A reference to the table
-    pub table: OwnedTableReference,
+    pub table: TableReference,
     /// Schema for commit provenence information
     pub schema: DFSchemaRef,
 }

 impl DescribeFiles {
-    pub fn new(table: OwnedTableReference) -> Self {
+    pub fn new(table: TableReference) -> Self {
         Self {
             table,
             // TODO: add proper schema
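The rename is mechanical: `OwnedTableReference` was an alias for `TableReference<'static>`, and DataFusion 38 drops the lifetime parameter so `TableReference` itself is now an owned type. A minimal sketch (hypothetical node, assuming v38):

use datafusion_common::TableReference;

/// Hypothetical plan node: stores the reference by value under DataFusion 38.
struct VacuumNode {
    table: TableReference,
}

fn main() {
    // No lifetime parameter and no `to_owned_reference()` call needed:
    // `TableReference` owns its parts, so a plain `clone()` is cheap.
    let table = TableReference::bare("my_table");
    let node = VacuumNode { table: table.clone() };
    assert_eq!(node.table.table(), "my_table");
}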
9 changes: 4 additions & 5 deletions crates/sql/src/planner.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;

-use datafusion_common::{OwnedTableReference, Result as DFResult};
+use datafusion_common::{Result as DFResult, TableReference};
 use datafusion_expr::logical_plan::{Extension, LogicalPlan};
 use datafusion_sql::planner::{
     object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel,
@@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
     fn vacuum_to_plan(&self, vacuum: VacuumStatement) -> DFResult<LogicalPlan> {
         let table_ref = self.object_name_to_table_reference(vacuum.table)?;
         let plan = DeltaStatement::Vacuum(Vacuum::new(
-            table_ref.to_owned_reference(),
+            table_ref.clone(),
             vacuum.retention_hours,
             vacuum.dry_run,
         ));
@@ -65,8 +65,7 @@

     fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult<LogicalPlan> {
         let table_ref = self.object_name_to_table_reference(describe.table)?;
-        let plan =
-            DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.to_owned_reference()));
+        let plan = DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.clone()));
         Ok(LogicalPlan::Extension(Extension {
             node: Arc::new(plan),
         }))
@@ -75,7 +74,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
     pub(crate) fn object_name_to_table_reference(
         &self,
         object_name: ObjectName,
-    ) -> DFResult<OwnedTableReference> {
+    ) -> DFResult<TableReference> {
         object_name_to_table_reference(object_name, self.options.enable_ident_normalization)
     }
 }