From 23afe8660155246ab1b46c464f8de5927fdc4775 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 6 Jan 2025 22:09:44 -0500 Subject: [PATCH 1/7] chore: deprecate `ValuesExec` in favour of `MemoryExec` --- datafusion/core/src/physical_planner.rs | 2 + datafusion/physical-plan/src/memory.rs | 198 +++++++++++++++++++++++- datafusion/physical-plan/src/values.rs | 18 ++- 3 files changed, 214 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 47b31d2f4e2d..d389058cec80 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -54,6 +54,7 @@ use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::unnest::UnnestExec; +#[allow(deprecated)] // TODO: Remove in favour of MemoryExec use crate::physical_plan::values::ValuesExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ @@ -466,6 +467,7 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; + #[allow(deprecated)] // TODO: Remove in favour of MemoryExec let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?; Arc::new(value_exec) } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 521008ce9b02..123d67ad47fc 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -24,14 +24,17 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::{ - common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, Statistics, + common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; use crate::execution_plan::{Boundedness, EmissionType}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use datafusion_common::{internal_err, project_schema, Result}; +use arrow_array::RecordBatchOptions; +use arrow_schema::Schema; +use datafusion_common::{internal_err, plan_err, project_schema, Result, ScalarValue}; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::ProjectionMapping; @@ -174,6 +177,93 @@ impl MemoryExec { }) } + /// Create a new execution plan from a list of constant values (`ValuesExec`) + pub fn try_new_as_values( + schema: SchemaRef, + data: Vec>>, + ) -> Result { + if data.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + let n_row = data.len(); + let n_col = schema.fields().len(); + + let placeholder_schema = Arc::new(Schema::empty()); + let placeholder_batch = RecordBatch::try_new_with_options( + placeholder_schema.clone(), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), + )?; + + // Evaluate each column + let arrays = (0..n_col) + .map(|j| { + (0..n_row) + .map(|i| { + let expr = &data[i][j]; + let result = expr.evaluate(&placeholder_batch)?; + + match result { + ColumnarValue::Scalar(scalar) => Ok(scalar), + ColumnarValue::Array(array) if array.len() == 1 => { + ScalarValue::try_from_array(&array, 0) + } + ColumnarValue::Array(_) => { + plan_err!("Cannot have array values in a values list") + } + } + }) + .collect::>>() + .and_then(ScalarValue::iter_to_array) + }) + .collect::>>()?; + + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + arrays, + &RecordBatchOptions::new().with_row_count(Some(n_row)), + )?; + + let partitions = vec![batch]; + Self::try_new_from_batches(Arc::clone(&schema), partitions) + } + + /// Create a new plan using the provided schema and batches. + /// + /// Errors if any of the batches don't match the provided schema, or if no + /// batches are provided. + pub fn try_new_from_batches( + schema: SchemaRef, + batches: Vec, + ) -> Result { + if batches.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + for batch in &batches { + let batch_schema = batch.schema(); + if batch_schema != schema { + return plan_err!( + "Batch has invalid schema. Expected: {}, got: {}", + schema, + batch_schema + ); + } + } + + let cache = Self::compute_properties_as_value(Arc::clone(&schema)); + Ok(Self { + partitions: vec![batches], + schema: Arc::clone(&schema), + projected_schema: Arc::clone(&schema), + projection: None, + sort_information: vec![], + cache, + show_sizes: true, + }) + } + /// Set `show_sizes` to determine whether to display partition sizes pub fn with_show_sizes(mut self, show_sizes: bool) -> Self { self.show_sizes = show_sizes; @@ -293,6 +383,15 @@ impl MemoryExec { Boundedness::Bounded, ) } + + fn compute_properties_as_value(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } } /// Iterator over batches @@ -696,3 +795,96 @@ mod lazy_memory_tests { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::expressions::lit; + use crate::test::{self, make_partition}; + + use arrow_schema::{DataType, Field}; + use datafusion_common::stats::{ColumnStatistics, Precision}; + + #[tokio::test] + async fn values_empty_case() -> Result<()> { + let schema = test::aggr_test_schema(); + let empty = MemoryExec::try_new_as_values(schema, vec![]); + assert!(empty.is_err()); + Ok(()) + } + + #[test] + fn new_exec_with_batches() { + let batch = make_partition(7); + let schema = batch.schema(); + let batches = vec![batch.clone(), batch]; + let _exec = MemoryExec::try_new_from_batches(schema, batches).unwrap(); + } + + #[test] + fn new_exec_with_batches_empty() { + let batch = make_partition(7); + let schema = batch.schema(); + let _ = MemoryExec::try_new_from_batches(schema, Vec::new()).unwrap_err(); + } + + #[test] + fn new_exec_with_batches_invalid_schema() { + let batch = make_partition(7); + let batches = vec![batch.clone(), batch]; + + let invalid_schema = Arc::new(Schema::new(vec![ + Field::new("col0", DataType::UInt32, false), + Field::new("col1", DataType::Utf8, false), + ])); + let _ = MemoryExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); + } + + // Test issue: https://github.com/apache/datafusion/issues/8763 + #[test] + fn new_exec_with_non_nullable_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let _ = MemoryExec::try_new_as_values(Arc::clone(&schema), vec![vec![lit(1u32)]]) + .unwrap(); + // Test that a null value is rejected + let _ = MemoryExec::try_new_as_values( + schema, + vec![vec![lit(ScalarValue::UInt32(None))]], + ) + .unwrap_err(); + } + + #[test] + fn values_stats_with_nulls_only() -> Result<()> { + let data = vec![ + vec![lit(ScalarValue::Null)], + vec![lit(ScalarValue::Null)], + vec![lit(ScalarValue::Null)], + ]; + let rows = data.len(); + let values = MemoryExec::try_new_as_values( + Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])), + data, + )?; + + assert_eq!( + values.statistics()?, + Statistics { + num_rows: Precision::Exact(rows), + total_byte_size: Precision::Exact(8), // not important + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(rows), // there are only nulls + distinct_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + },], + } + ); + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 5089b1e626d4..88a66c6eaf40 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -34,6 +34,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; /// Execution plan for values list based relation (produces constant rows) +#[deprecated(since = "44.0.0", note = "Use `MemoryExec::try_new_as_values` instead")] #[derive(Debug, Clone)] pub struct ValuesExec { /// The schema @@ -44,6 +45,7 @@ pub struct ValuesExec { cache: PlanProperties, } +#[allow(deprecated)] impl ValuesExec { /// Create a new values exec from data as expr pub fn try_new( @@ -117,6 +119,7 @@ impl ValuesExec { } let cache = Self::compute_properties(Arc::clone(&schema)); + #[allow(deprecated)] Ok(ValuesExec { schema, data: batches, @@ -126,6 +129,7 @@ impl ValuesExec { /// Provides the data pub fn data(&self) -> Vec { + #[allow(deprecated)] self.data.clone() } @@ -140,6 +144,7 @@ impl ValuesExec { } } +#[allow(deprecated)] impl DisplayAs for ValuesExec { fn fmt_as( &self, @@ -154,6 +159,7 @@ impl DisplayAs for ValuesExec { } } +#[allow(deprecated)] impl ExecutionPlan for ValuesExec { fn name(&self) -> &'static str { "ValuesExec" @@ -165,6 +171,7 @@ impl ExecutionPlan for ValuesExec { } fn properties(&self) -> &PlanProperties { + #[allow(deprecated)] &self.cache } @@ -176,6 +183,7 @@ impl ExecutionPlan for ValuesExec { self: Arc, _: Vec>, ) -> Result> { + #[allow(deprecated)] ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone()) .map(|e| Arc::new(e) as _) } @@ -194,6 +202,7 @@ impl ExecutionPlan for ValuesExec { Ok(Box::pin(MemoryStream::try_new( self.data(), + #[allow(deprecated)] Arc::clone(&self.schema), None, )?)) @@ -203,6 +212,7 @@ impl ExecutionPlan for ValuesExec { let batch = self.data(); Ok(common::compute_record_batch_statistics( &[batch], + #[allow(deprecated)] &self.schema, None, )) @@ -221,6 +231,7 @@ mod tests { #[tokio::test] async fn values_empty_case() -> Result<()> { let schema = test::aggr_test_schema(); + #[allow(deprecated)] let empty = ValuesExec::try_new(schema, vec![]); assert!(empty.is_err()); Ok(()) @@ -231,7 +242,7 @@ mod tests { let batch = make_partition(7); let schema = batch.schema(); let batches = vec![batch.clone(), batch]; - + #[allow(deprecated)] let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap(); } @@ -239,6 +250,7 @@ mod tests { fn new_exec_with_batches_empty() { let batch = make_partition(7); let schema = batch.schema(); + #[allow(deprecated)] let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err(); } @@ -251,6 +263,7 @@ mod tests { Field::new("col0", DataType::UInt32, false), Field::new("col1", DataType::Utf8, false), ])); + #[allow(deprecated)] let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); } @@ -262,8 +275,10 @@ mod tests { DataType::UInt32, false, )])); + #[allow(deprecated)] let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap(); // Test that a null value is rejected + #[allow(deprecated)] let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) .unwrap_err(); } @@ -276,6 +291,7 @@ mod tests { vec![lit(ScalarValue::Null)], ]; let rows = data.len(); + #[allow(deprecated)] let values = ValuesExec::try_new( Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])), data, From 2f19a54bb98e2ddc03d45879a56f66380f0bf88a Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Mon, 6 Jan 2025 22:55:58 -0500 Subject: [PATCH 2/7] clippy fix --- datafusion/physical-plan/src/memory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 123d67ad47fc..5a1e21863a8c 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -191,7 +191,7 @@ impl MemoryExec { let placeholder_schema = Arc::new(Schema::empty()); let placeholder_batch = RecordBatch::try_new_with_options( - placeholder_schema.clone(), + Arc::clone(&placeholder_schema), vec![], &RecordBatchOptions::new().with_row_count(Some(1)), )?; From eaaf2237724d8e0d7828a089c950f9d00ede4a5a Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 7 Jan 2025 10:09:36 -0500 Subject: [PATCH 3/7] Update datafusion/physical-plan/src/values.rs Co-authored-by: Andrew Lamb --- datafusion/physical-plan/src/values.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 88a66c6eaf40..a30b8981fdd8 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -34,7 +34,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; /// Execution plan for values list based relation (produces constant rows) -#[deprecated(since = "44.0.0", note = "Use `MemoryExec::try_new_as_values` instead")] +#[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new_as_values` instead")] #[derive(Debug, Clone)] pub struct ValuesExec { /// The schema From 388bc5d699dd0594dd3abdac4f51f7450ad6da31 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 7 Jan 2025 21:16:32 -0500 Subject: [PATCH 4/7] change to memoryexec --- datafusion/core/src/physical_planner.rs | 6 ++---- datafusion/sqllogictest/test_files/insert_to_external.slt | 2 +- datafusion/sqllogictest/test_files/order.slt | 4 ++-- datafusion/sqllogictest/test_files/select.slt | 6 +++--- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d389058cec80..6a4f379e4d6d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -54,8 +54,6 @@ use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::unnest::UnnestExec; -#[allow(deprecated)] // TODO: Remove in favour of MemoryExec -use crate::physical_plan::values::ValuesExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, @@ -467,8 +465,8 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - #[allow(deprecated)] // TODO: Remove in favour of MemoryExec - let value_exec = ValuesExec::try_new(SchemaRef::new(exec_schema), exprs)?; + let value_exec = + MemoryExec::try_new_as_values(SchemaRef::new(exec_schema), exprs)?; Arc::new(value_exec) } LogicalPlan::EmptyRelation(EmptyRelation { diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index edfc2ee75bd7..2a2aecf2f2cc 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -128,7 +128,7 @@ physical_plan 01)DataSinkExec: sink=CsvSink(file_groups=[]) 02)--SortExec: expr=[a@0 ASC NULLS LAST, b@1 DESC], preserve_partitioning=[false] 03)----ProjectionExec: expr=[column1@0 as a, column2@1 as b] -04)------ValuesExec +04)------MemoryExec: partitions=1, partition_sizes=[1] query I INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9), (7,10), (3, 3), (2, 4), (1, 5); diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index a46040aa532e..8d96fe47f6b3 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -786,7 +786,7 @@ physical_plan 08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 09)----------------AggregateExec: mode=Partial, gby=[t@0 as t], aggr=[] 10)------------------ProjectionExec: expr=[column1@0 as t] -11)--------------------ValuesExec +11)--------------------MemoryExec: partitions=1, partition_sizes=[1] 12)------ProjectionExec: expr=[1 as m, t@0 as t] 13)--------AggregateExec: mode=FinalPartitioned, gby=[t@0 as t], aggr=[] 14)----------CoalesceBatchesExec: target_batch_size=8192 @@ -794,7 +794,7 @@ physical_plan 16)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 17)----------------AggregateExec: mode=Partial, gby=[t@0 as t], aggr=[] 18)------------------ProjectionExec: expr=[column1@0 as t] -19)--------------------ValuesExec +19)--------------------MemoryExec: partitions=1, partition_sizes=[1] ##### # Multi column sorting with lists diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index c687429ae6ec..a127463c2b27 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -424,19 +424,19 @@ query TT EXPLAIN VALUES (1, 'a', -1, 1.1),(NULL, 'b', -3, 0.5) ---- logical_plan Values: (Int64(1), Utf8("a"), Int64(-1), Float64(1.1)), (Int64(NULL), Utf8("b"), Int64(-3), Float64(0.5)) -physical_plan ValuesExec +physical_plan MemoryExec: partitions=1, partition_sizes=[1] query TT EXPLAIN VALUES ('1'::float) ---- logical_plan Values: (Float32(1) AS Utf8("1")) -physical_plan ValuesExec +physical_plan MemoryExec: partitions=1, partition_sizes=[1] query TT EXPLAIN VALUES (('1'||'2')::int unsigned) ---- logical_plan Values: (UInt32(12) AS Utf8("1") || Utf8("2")) -physical_plan ValuesExec +physical_plan MemoryExec: partitions=1, partition_sizes=[1] # all where empty From dc1952a35d33c96004c03b1f87732004393c0265 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Wed, 8 Jan 2025 16:39:58 -0500 Subject: [PATCH 5/7] Update datafusion/physical-plan/src/memory.rs Co-authored-by: Jay Zhan --- datafusion/physical-plan/src/memory.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 5a1e21863a8c..28783ebcf358 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -189,6 +189,8 @@ impl MemoryExec { let n_row = data.len(); let n_col = schema.fields().len(); + // We have this single row batch as a placeholder to satisfy evaluation argument + // and generate a single output row let placeholder_schema = Arc::new(Schema::empty()); let placeholder_batch = RecordBatch::try_new_with_options( Arc::clone(&placeholder_schema), From 577b47f9592af48211b5ef9479001e3e546019c8 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Thu, 9 Jan 2025 22:36:50 -0500 Subject: [PATCH 6/7] use compute properties --- datafusion/physical-plan/src/memory.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 28783ebcf358..6c878fe5297b 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -254,9 +254,10 @@ impl MemoryExec { } } - let cache = Self::compute_properties_as_value(Arc::clone(&schema)); + let partitions = vec![batches]; + let cache = Self::compute_properties(Arc::clone(&schema), &[], &partitions); Ok(Self { - partitions: vec![batches], + partitions, schema: Arc::clone(&schema), projected_schema: Arc::clone(&schema), projection: None, @@ -385,15 +386,6 @@ impl MemoryExec { Boundedness::Bounded, ) } - - fn compute_properties_as_value(schema: SchemaRef) -> PlanProperties { - PlanProperties::new( - EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, - Boundedness::Bounded, - ) - } } /// Iterator over batches From 8be2ccc9d56fe59b01c0d6372a8dd1afc4f434a5 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 10 Jan 2025 00:18:15 -0500 Subject: [PATCH 7/7] clippy fix --- datafusion/physical-expr-common/src/physical_expr.rs | 4 +--- datafusion/physical-expr-common/src/sort_expr.rs | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index c2e892d63da0..e90f9c32ee87 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -159,9 +159,7 @@ pub trait DynEq { impl DynEq for T { fn dyn_eq(&self, other: &dyn Any) -> bool { - other - .downcast_ref::() - .map_or(false, |other| other == self) + other.downcast_ref::() == Some(self) } } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 0d7501610662..de8c07030ce5 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -292,7 +292,7 @@ impl PhysicalSortRequirement { pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool { self.expr.eq(&other.expr) && other.options.map_or(true, |other_opts| { - self.options.map_or(false, |opts| opts == other_opts) + self.options.as_ref() == Some(&other_opts) }) }