diff --git a/Cargo.toml b/Cargo.toml index 7aef936189bc8..fb85665e1ad69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,7 +161,7 @@ hex = { version = "0.4.3" } indexmap = "2.12.1" insta = { version = "1.46.0", features = ["glob", "filters"] } itertools = "0.14" -liblzma = { version = "0.4.4", features = ["static"] } +liblzma = { version = "0.4.5", features = ["static"] } log = "^0.4" num-traits = { version = "0.2" } object_store = { version = "0.12.4", default-features = false } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc7d534776d7e..b006d8ae39a9c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -802,10 +802,17 @@ impl DefaultPhysicalPlanner { )); } } - return internal_err!( + debug!( "Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences.iter().map(|s| format!("\n\t- {s}")).join("") ); + + //influx: temporarily remove error and only log so that we can find a + //reproducer in production + // return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences + // .iter() + // .map(|s| format!("\n\t- {s}")) + // .join("")); } let groups = self.create_grouping_physical_expr( @@ -4207,6 +4214,8 @@ digraph { } #[tokio::test] + // Ignored due to disabling the physical schema check skip. + #[ignore] async fn test_aggregate_schema_mismatch_metadata() { let logical_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); @@ -4227,6 +4236,8 @@ digraph { } #[tokio::test] + // Ignored due to disabling the physical schema check skip. + #[ignore] async fn test_aggregate_schema_mismatch_field_count() { let logical_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); @@ -4247,6 +4258,8 @@ digraph { } #[tokio::test] + // Ignored due to disabling the physical schema check skip. 
+ #[ignore] async fn test_aggregate_schema_mismatch_field_name() { let logical_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); @@ -4268,6 +4281,8 @@ digraph { } #[tokio::test] + // Ignored due to disabling the physical schema check skip. + #[ignore] async fn test_aggregate_schema_mismatch_field_type() { let logical_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); @@ -4286,6 +4301,8 @@ digraph { } #[tokio::test] + // Ignored due to disabling the physical schema check skip. + #[ignore] async fn test_aggregate_schema_mismatch_field_nullability() { let logical_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); @@ -4304,6 +4321,8 @@ digraph { } #[tokio::test] + // Ignored due to disabling the physical schema check skip. + #[ignore] async fn test_aggregate_schema_mismatch_field_metadata() { let logical_schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)])); @@ -4324,6 +4343,8 @@ digraph { } #[tokio::test] + // Ignored due to disabling the physical schema check skip. + #[ignore] async fn test_aggregate_schema_mismatch_multiple() { let logical_schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), diff --git a/datafusion/physical-expr/src/equivalence/properties/union.rs b/datafusion/physical-expr/src/equivalence/properties/union.rs index d77129472a8ba..702688011668d 100644 --- a/datafusion/physical-expr/src/equivalence/properties/union.rs +++ b/datafusion/physical-expr/src/equivalence/properties/union.rs @@ -67,16 +67,43 @@ fn calculate_union_binary( }) .collect::>(); + // TEMP HACK WORKAROUND + // Revert code from https://github.com/apache/datafusion/pull/12562 + // Context: https://github.com/apache/datafusion/issues/13748 + // Context: https://github.com/influxdata/influxdb_iox/issues/13038 + // Next, calculate valid orderings for the union by searching for prefixes // in both sides. 
- let mut orderings = UnionEquivalentOrderingBuilder::new(); - orderings.add_satisfied_orderings(&lhs, &rhs)?; - orderings.add_satisfied_orderings(&rhs, &lhs)?; - let orderings = orderings.build(); + let mut orderings = vec![]; + for ordering in lhs.normalized_oeq_class().into_iter() { + let mut ordering: Vec = ordering.into(); + + // Progressively shorten the ordering to search for a satisfied prefix: + while !rhs.ordering_satisfy(ordering.clone())? { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + for ordering in rhs.normalized_oeq_class().into_iter() { + let mut ordering: Vec = ordering.into(); + + // Progressively shorten the ordering to search for a satisfied prefix: + while !lhs.ordering_satisfy(ordering.clone())? { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } let mut eq_properties = EquivalenceProperties::new(lhs.schema); eq_properties.add_constants(constants)?; eq_properties.add_orderings(orderings); + Ok(eq_properties) } @@ -122,6 +149,7 @@ struct UnionEquivalentOrderingBuilder { orderings: Vec, } +#[expect(unused)] impl UnionEquivalentOrderingBuilder { fn new() -> Self { Self { orderings: vec![] } @@ -504,6 +532,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"] fn test_union_equivalence_properties_constants_fill_gaps() -> Result<()> { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -579,6 +608,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"] fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() -> Result<()> { let schema = create_test_schema().unwrap(); @@ -607,6 +637,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order 
calculation / exponential planning"] fn test_union_equivalence_properties_constants_gap_fill_symmetric() -> Result<()> { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -658,6 +689,7 @@ mod tests { } #[test] + #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"] fn test_union_equivalence_properties_constants_middle_desc() -> Result<()> { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index bff33a281556d..8eb00327143dc 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string}; use crate::PhysicalOptimizerRule; @@ -136,6 +138,14 @@ pub fn check_plan_sanity( plan.required_input_ordering(), plan.required_input_distribution(), ) { + // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492 + if child.as_any().downcast_ref::<UnionExec>().is_some() { + continue; + } + if child.as_any().downcast_ref::<SortExec>().is_some() { + continue; + } + let child_eq_props = child.equivalence_properties(); if let Some(sort_req) = sort_req { let sort_req = sort_req.into_single(); diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 4f32b6176ec39..1774a8ea95e53 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ 
b/datafusion/physical-plan/src/joins/cross_join.rs @@ -25,6 +25,7 @@ use super::utils::{ OnceAsync, OnceFut, StatefulStreamResult, adjust_right_output_partitioning, reorder_output_after_swap, }; +use crate::coop::cooperative; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ @@ -332,7 +333,7 @@ impl ExecutionPlan for CrossJoinExec { })?; if enforce_batch_size_in_joins { - Ok(Box::pin(CrossJoinStream { + Ok(Box::pin(cooperative(CrossJoinStream { schema: Arc::clone(&self.schema), left_fut, right: stream, @@ -341,9 +342,9 @@ impl ExecutionPlan for CrossJoinExec { state: CrossJoinStreamState::WaitBuildSide, left_data: RecordBatch::new_empty(self.left().schema()), batch_transformer: BatchSplitter::new(batch_size), - })) + }))) } else { - Ok(Box::pin(CrossJoinStream { + Ok(Box::pin(cooperative(CrossJoinStream { schema: Arc::clone(&self.schema), left_fut, right: stream, @@ -352,7 +353,7 @@ impl ExecutionPlan for CrossJoinExec { state: CrossJoinStreamState::WaitBuildSide, left_data: RecordBatch::new_empty(self.left().schema()), batch_transformer: NoopBatchTransformer::new(), - })) + }))) } } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 91fc1ee4436ee..0a45887eff3e4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -22,6 +22,7 @@ use std::sync::{Arc, OnceLock}; use std::{any::Any, vec}; use crate::ExecutionPlanProperties; +use crate::coop::cooperative; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, @@ -1061,7 +1062,7 @@ impl ExecutionPlan for HashJoinExec { .map(|(_, right_expr)| Arc::clone(right_expr)) .collect::>(); - Ok(Box::pin(HashJoinStream::new( + Ok(Box::pin(cooperative(HashJoinStream::new( 
partition, self.schema(), on_right, @@ -1079,7 +1080,7 @@ impl ExecutionPlan for HashJoinExec { self.right.output_ordering().is_some(), build_accumulator, self.mode, - ))) + )))) } fn metrics(&self) -> Option { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 44637321a7e35..fa435c0d815b1 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -29,6 +29,7 @@ use super::utils::{ reorder_output_after_swap, swap_join_projection, }; use crate::common::can_project; +use crate::coop::cooperative; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::joins::SharedBitmapBuilder; use crate::joins::utils::{ @@ -529,7 +530,7 @@ impl ExecutionPlan for NestedLoopJoinExec { None => self.column_indices.clone(), }; - Ok(Box::pin(NestedLoopJoinStream::new( + Ok(Box::pin(cooperative(NestedLoopJoinStream::new( self.schema(), self.filter.clone(), self.join_type, @@ -538,7 +539,7 @@ impl ExecutionPlan for NestedLoopJoinExec { column_indices_after_projection, metrics, batch_size, - ))) + )))) } fn metrics(&self) -> Option { diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index ae7a5fa764bcc..82929107806de 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -23,6 +23,7 @@ use std::any::Any; use std::fmt::Formatter; use std::sync::Arc; +use crate::coop::cooperative; use crate::execution_plan::{EmissionType, boundedness_from_children}; use crate::expressions::PhysicalSortExpr; use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; @@ -497,7 +498,7 @@ impl ExecutionPlan for SortMergeJoinExec { .register(context.memory_pool()); // create join stream - Ok(Box::pin(SortMergeJoinStream::try_new( + 
Ok(Box::pin(cooperative(SortMergeJoinStream::try_new( context.session_config().spill_compression(), Arc::clone(&self.schema), self.sort_options.clone(), @@ -512,7 +513,7 @@ impl ExecutionPlan for SortMergeJoinExec { SortMergeJoinMetrics::new(partition, &self.metrics), reservation, context.runtime_env(), - )?)) + )?))) } fn metrics(&self) -> Option { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1f6bc703a0300..a54f930114c60 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -33,6 +33,7 @@ use std::task::{Context, Poll}; use std::vec; use crate::common::SharedMemoryReservation; +use crate::coop::cooperative; use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; use crate::joins::stream_join_utils::{ PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics, @@ -534,7 +535,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { } if enforce_batch_size_in_joins { - Ok(Box::pin(SymmetricHashJoinStream { + Ok(Box::pin(cooperative(SymmetricHashJoinStream { left_stream, right_stream, schema: self.schema(), @@ -552,9 +553,9 @@ impl ExecutionPlan for SymmetricHashJoinExec { state: SHJStreamState::PullRight, reservation, batch_transformer: BatchSplitter::new(batch_size), - })) + }))) } else { - Ok(Box::pin(SymmetricHashJoinStream { + Ok(Box::pin(cooperative(SymmetricHashJoinStream { left_stream, right_stream, schema: self.schema(), @@ -572,7 +573,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { state: SHJStreamState::PullRight, reservation, batch_transformer: NoopBatchTransformer::new(), - })) + }))) } }