diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 855550dc748a6..76b9848d662c3 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -19,6 +19,9 @@ use std::fmt::Debug;
 use std::ops::Deref;
 use std::sync::Arc;
 
+use crate::physical_optimizer::sanity_checker::{
+    assert_sanity_check, assert_sanity_check_err,
+};
 use crate::physical_optimizer::test_utils::{
     check_integrity, coalesce_partitions_exec, repartition_exec, schema,
     sort_merge_join_exec, sort_preserving_merge_exec,
@@ -32,8 +35,9 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, ParquetSource};
 use datafusion_common::error::Result;
+use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::ScalarValue;
+use datafusion_common::{ColumnStatistics, ScalarValue};
 use datafusion_expr::{JoinType, Operator};
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 use datafusion_physical_expr::PhysicalExpr;
@@ -61,7 +65,9 @@ use datafusion_physical_plan::source::DataSourceExec;
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::ExecutionPlanProperties;
 use datafusion_physical_plan::PlanProperties;
-use datafusion_physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
+use datafusion_physical_plan::{
+    displayable, get_plan_string, DisplayAs, DisplayFormatType, Statistics,
+};
 
 /// Models operators like BoundedWindowExec that require an input
 /// ordering but is easy to construct
@@ -186,6 +192,43 @@ fn parquet_exec_multiple_sorted(
         .new_exec()
 }
 
+fn int64_stats() -> ColumnStatistics {
+    ColumnStatistics {
+        null_count: Precision::Absent,
+        sum_value: Precision::Absent,
+        max_value: Precision::Exact(1_000_000.into()),
+        min_value: Precision::Exact(0.into()),
+        distinct_count: Precision::Absent,
+    }
+}
+
+fn column_stats() -> Vec<ColumnStatistics> {
+    vec![
+        int64_stats(), // a
+        int64_stats(), // b
+        int64_stats(), // c
+        ColumnStatistics::default(),
+        ColumnStatistics::default(),
+    ]
+}
+
+pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
+    let mut statistics = Statistics::new_unknown(&schema());
+    statistics.num_rows = Precision::Inexact(10);
+    statistics.column_statistics = column_stats();
+
+    let config = FileScanConfig::new(
+        ObjectStoreUrl::parse("test:///").unwrap(),
+        schema(),
+        Arc::new(ParquetSource::new(Default::default())),
+    )
+    .with_file(PartitionedFile::new("x".to_string(), 10000))
+    .with_statistics(statistics);
+    assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
+
+    config.new_exec()
+}
+
 fn csv_exec() -> Arc<DataSourceExec> {
     csv_exec_with_sort(vec![])
 }
@@ -220,7 +263,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSourceExec>
-fn projection_exec_with_alias(
+pub(crate) fn projection_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {
@@ -503,6 +546,53 @@ macro_rules! assert_plan_txt {
     };
 }
 
+/// Similar to [`macro_rules! assert_optimized`], but with the following differences:
+/// * runs the EnforceDistribution optimizer only once.
+/// * then runs the EnforceSorting optimizer only once.
+/// * does not force round-robin repartitioning to be inserted.
+///
+/// It is also a simplified test helper, and does not handle joins
+/// (although it does run join key reordering, a pre-condition of distribution enforcement).
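+///
+/// A minimal usage sketch (the plan constructor and expected lines below are
+/// illustrative placeholders, not a specific test):
+///
+/// ```rust,ignore
+/// let plan = aggregate_over_union(vec![parquet_exec_with_stats(); 2]);
+/// let expected = &[
+///     "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+///     // ... remaining lines exactly as rendered by `get_plan_string` ...
+/// ];
+/// // The helper returns the optimized plan, so calls can be chained to
+/// // check idempotency:
+/// let plan = assert_optimized_without_forced_roundrobin(
+///     expected,
+///     plan,
+///     &ConfigOptions::default(),
+/// );
+/// ```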
+fn assert_optimized_without_forced_roundrobin(
+    expected: &[&str],
+    plan: Arc<dyn ExecutionPlan>,
+    config: &ConfigOptions,
+) -> Arc<dyn ExecutionPlan> {
+    // Add the ancillary output requirements operator at the start:
+    let optimizer = OutputRequirements::new_add_mode();
+    let optimized = optimizer
+        .optimize(plan, config)
+        .expect("failed to add output req node");
+
+    let optimizer = EnforceDistribution::new();
+    let optimized = optimizer
+        .optimize(optimized, config)
+        .expect("failed distribution enforcement");
+
+    let optimizer = EnforceSorting::new();
+    let optimized = optimizer
+        .optimize(optimized, config)
+        .expect("failed sorting enforcement");
+
+    // Remove the ancillary output requirements operator when done:
+    let optimizer = OutputRequirements::new_remove_mode();
+    let optimized = optimizer
+        .optimize(optimized, config)
+        .expect("failed to remove output req node");
+
+    // Format the optimized plan for comparison.
+    let actual_lines = get_plan_string(&optimized);
+
+    let expected_lines: Vec<&str> = expected.to_vec();
+    assert_eq!(
+        &expected_lines, &actual_lines,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected_lines, actual_lines
+    );
+
+    optimized
+}
+
 #[test]
 fn multi_hash_joins() -> Result<()> {
     let left = parquet_exec();
@@ -3154,3 +3244,313 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
     Ok(())
 }
+
+fn aggregate_over_union(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let plan =
+        aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate the starting plan.
+    let before = get_plan_string(&plan);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+            "  AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+            "    UnionExec",
+            "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+// Aggregate over a union,
+// with the current testing setup.
+//
+// The optimizer repartitions twice for an aggregate over a union:
+// * once before the partial aggregate.
+// * once before the final aggregate.
+#[test]
+fn repartitions_twice_for_aggregate_after_union() -> Result<()> {
+    let plan = aggregate_over_union(vec![parquet_exec(); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&plan, err);
+
+    // Test: using the `assert_optimized` macro.
+    //
+    // The updated plan (post optimization) will have added RepartitionExecs (between the union and the aggregation).
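+    //
+    // The Hash repartition satisfies the final aggregate's
+    // `Distribution::HashPartitioned` requirement on the group key, while the
+    // RoundRobinBatch repartition adds parallelism ahead of the partial aggregate.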
+    let expected = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "UnionExec",
+        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    assert_optimized!(expected, plan.clone(), true);
+
+    // Test: confirm changes are idempotent.
+    assert_optimized!(expected, plan.clone(), false);
+
+    Ok(())
+}
+
+// Aggregate over a union,
+// but with a test setup matching our real-world example.
+//
+// The optimizer still repartitions twice for an aggregate over a union.
+#[test]
+fn redo_with_new_testing_setup() -> Result<()> {
+    // Use a parquet exec with statistics.
+    let plan: Arc<dyn ExecutionPlan> =
+        aggregate_over_union(vec![parquet_exec_with_stats(); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&plan, err);
+
+    // Test: using the `assert_optimized_without_forced_roundrobin` helper.
+    // It removes the forced round-robin repartitioning,
+    // by no longer hard-coding batch_size=1.
+    //
+    // We get the same output as using the `assert_optimized` macro, for this base testing case.
+    let expected = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+        "  RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "        UnionExec",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let plan = assert_optimized_without_forced_roundrobin(
+        expected,
+        plan,
+        &ConfigOptions::default(),
+    );
+
+    // Test: confirm changes are idempotent.
+    let plan = assert_optimized_without_forced_roundrobin(
+        expected,
+        plan,
+        &ConfigOptions::default(),
+    );
+
+    // Test: plan now passes sanity check.
+    assert_sanity_check(&plan, true);
+
+    Ok(())
+}
+
+/// Same as [`aggregate_over_union`], but with a sort between the union and the aggregation.
+fn aggregate_over_sorted_union(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let sort = sort_exec(sort_key, union, false);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate the starting plan.
+    // Notice the `ordering_mode=Sorted` on the aggregations.
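+    // (`ordering_mode=Sorted` means the group-by expression matches the input sort
+    // order, so the aggregate can emit completed groups incrementally as it streams.)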
+    let before = get_plan_string(&plan);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "  AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "      UnionExec",
+            "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+/// Same starting plan as [`redo_with_new_testing_setup`],
+/// but adds a sort between the union and the aggregate. This changes the outcome:
+///
+/// * we still get repartitioning
+/// * we get another sort added
+/// * if we run only once, we get 2 additional sorts pushed down below the union
+#[test]
+fn repartitions_for_aggregate_after_sorted_union() -> Result<()> {
+    let plan = aggregate_over_sorted_union(vec![parquet_exec_with_stats(); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: SinglePartition. Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&plan, err);
+
+    // Once we add the additional sort (in the starting plan), the optimizer run:
+    // * adds 2 RepartitionExecs. Same as before.
+    // * replaces the original sort with an SPM. New change.
+    // * pushes the sort down below the union. New change.
+    // * adds another sort between the partial and final aggregates. New change.
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "          SortPreservingMergeExec: [a@0 ASC]",
+        "            UnionExec",
+        "              SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "              SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let plan = assert_optimized_without_forced_roundrobin(
+        expected_after_first_run,
+        plan,
+        &ConfigOptions::default(),
+    );
+
+    // Demonstrate: plan changes are not idempotent.
+    //
+    // After the second run, the optimizer:
+    // * replaces the SPM with a sort, and removes the sorts pushed down below the union.
+    // * swaps the relative order of the repartition and the merge: the SPM becomes a
+    //   SortExec placed above the round-robin repartition.
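+    //
+    // Net effect: the SPM and the per-input sorts from the first run collapse into a
+    // single SortExec above the repartitioned union, so the plan shape only
+    // stabilizes after a second optimizer pass.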
+    let expected_after_second_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "            UnionExec",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let plan = assert_optimized_without_forced_roundrobin(
+        expected_after_second_run,
+        plan,
+        &ConfigOptions::default(),
+    );
+
+    // Test: plan now passes sanity check.
+    assert_sanity_check(&plan, true);
+
+    Ok(())
+}
+
+/// Same as [`aggregate_over_sorted_union`], but with a projection and sort added between
+/// the union and the aggregation.
+fn aggregate_over_sorted_union_projection(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let union_projection = projection_exec_with_alias(
+        union,
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let sort = sort_exec(sort_key, union_projection, false);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate the starting plan.
+    // Notice the `ordering_mode=Sorted` on the aggregations.
+    let before = get_plan_string(&plan);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "  AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+/// Same starting plan as [`repartitions_for_aggregate_after_sorted_union`], but also adds
+/// a projection between the union and the aggregate. This changes the outcome:
+///
+/// * we still get repartitioning
+/// * we still get another sort added
+/// * we no longer get 2 additional sorts pushed down below the union, after the first run
+#[test]
+fn repartitions_for_aggregate_after_sorted_union_projection() -> Result<()> {
+    let mut config = ConfigOptions::default();
+    config
+        .execution
+        .use_row_number_estimates_to_optimize_partitioning = true;
+    config.execution.batch_size = 100;
+
+    let plan = aggregate_over_sorted_union_projection(vec![parquet_exec_with_stats(); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: SinglePartition. Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&plan, err);
+
+    // Once we add the additional projection (in the starting plan), the optimizer run:
+    // * adds 2 RepartitionExecs. Same as before.
+    // * replaces the sort (above the union) with an SPM. Same as before.
+    // * NO LONGER pushes 2 additional sorts down below the union. New change.
+    // * adds another sort between the partial and final aggregates. Same as before.
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "          SortPreservingMergeExec: [a@0 ASC]",
+        "            SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "              ProjectionExec: expr=[a@0 as a, b@1 as value]",
+        "                UnionExec",
+        "                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let plan = assert_optimized_without_forced_roundrobin(
+        expected_after_first_run,
+        plan,
+        &config,
+    );
+
+    // Demonstrate: plan changes are not idempotent.
+    //
+    // After the second run, the optimizer:
+    // * replaces the SPM with a sort.
+    let expected_after_second_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "            ProjectionExec: expr=[a@0 as a, b@1 as value]",
+        "              UnionExec",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let plan = assert_optimized_without_forced_roundrobin(
+        expected_after_second_run,
+        plan,
+        &config,
+    );
+
+    // Test: plan now passes sanity check.
+    assert_sanity_check(&plan, true);
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 4b358e47361bb..3d088c7b682d2 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -17,6 +17,12 @@
 
 use std::sync::Arc;
 
+use crate::physical_optimizer::enforce_distribution::{
+    parquet_exec_with_stats, projection_exec_with_alias,
+};
+use crate::physical_optimizer::sanity_checker::{
+    assert_sanity_check, assert_sanity_check_err,
+};
 use crate::physical_optimizer::test_utils::{
     aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
     bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
@@ -28,6 +34,7 @@ use crate::physical_optimizer::test_utils::{
     spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
 };
 
+use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
 use datafusion_physical_plan::displayable;
 use arrow::compute::SortOptions;
 use arrow::datatypes::SchemaRef;
@@ -55,6 +62,8 @@ use datafusion_physical_plan::sorts::sort::SortExec;
 
 use rstest::rstest;
 
+use super::test_utils::schema;
+
 /// Create a csv exec for tests
 fn csv_exec_ordered(
     schema: &SchemaRef,
@@ -2280,3 +2289,85 @@ async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()>
     assert_optimized!(expected_input, expected_no_change, physical_plan, true);
     Ok(())
 }
+
+fn single_partition_aggregate(
+    input: Arc<dyn ExecutionPlan>,
+    alias_pairs: Vec<(String, String)>,
+) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let group_by = alias_pairs
+        .iter()
+        .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
+        .collect::<Vec<_>>();
+    let group_by = PhysicalGroupBy::new_single(group_by);
+
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::SinglePartitioned,
+            group_by,
+            vec![],
+            vec![],
+            input,
+            schema,
+        )
+        .unwrap(),
+    )
+}
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "    CoalescePartitionsExec",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+    assert_sanity_check(&plan, true);
+
+    // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
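+    //
+    // Note: the aggregate's hash-distribution requirement was satisfied only because
+    // CoalescePartitionsExec reduced its input to a single partition; removing the
+    // coalesce leaves two partitions, which the sanity check below rejects.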
+    let optimizer = EnforceSorting::new();
+    let optimized = optimizer.optimize(plan, &Default::default())?;
+    assert_eq!(
+        get_plan_string(&optimized),
+        vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    // Plan is now invalid.
+    let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&optimized, err);
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
index a73d084a081f3..594cb5b8a7e0d 100644
--- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -388,13 +388,21 @@ fn create_test_schema2() -> SchemaRef {
 }
 
 /// Check if sanity checker should accept or reject plans.
-fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
+pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
     let sanity_checker = SanityCheckPlan::new();
     let opts = ConfigOptions::default();
-    assert_eq!(
-        sanity_checker.optimize(plan.clone(), &opts).is_ok(),
-        is_sane
-    );
+    let res = sanity_checker
+        .optimize(plan.clone(), &opts)
+        .map_err(|e| e.to_string());
+    assert_eq!(res.is_ok(), is_sane, "SanityCheck returned {:?}", res);
+}
+
+/// Assert the reason for a sanity check failure.
+pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
+    let sanity_checker = SanityCheckPlan::new();
+    let opts = ConfigOptions::default();
+    let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
+    assert!(error.message().contains(err));
+}
 
 /// Check if the plan we created is as expected by comparing the plan