diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs index de8f7b36a3a7..9c76f6ab6f58 100644 --- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs @@ -20,9 +20,10 @@ //! Note these tests are not in the same module as the optimizer pass because //! they rely on `DataSourceExec` which is in the core crate. +use insta::assert_snapshot; use std::sync::Arc; -use crate::physical_optimizer::test_utils::{parquet_exec, trim_plan_display}; +use crate::physical_optimizer::test_utils::parquet_exec; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; @@ -43,22 +44,16 @@ use datafusion_physical_plan::ExecutionPlan; /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected macro_rules! assert_optimized { - ($EXPECTED_LINES: expr, $PLAN: expr) => { - let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); - + ($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => { // run optimizer let optimizer = CombinePartialFinalAggregate {}; let config = ConfigOptions::new(); let optimized = optimizer.optimize($PLAN, &config)?; // Now format correctly let plan = displayable(optimized.as_ref()).indent(true).to_string(); - let actual_lines = trim_plan_display(&plan); + let actual_lines = plan.trim(); - assert_eq!( - &expected_lines, &actual_lines, - "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", - expected_lines, actual_lines - ); + assert_snapshot!(actual_lines, @ $EXPECTED_LINES); }; } @@ -144,13 +139,15 @@ fn aggregations_not_combined() -> datafusion_common::Result<()> { aggr_expr, ); // should not combine the Partial/Final AggregateExecs - let expected = &[ - "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet", - ]; - assert_optimized!(expected, plan); + assert_optimized!( + plan, + @ r" + AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet + " + ); let aggr_expr1 = vec![count_expr(lit(1i8), "COUNT(1)", &schema)]; let aggr_expr2 = vec![count_expr(lit(1i8), "COUNT(2)", &schema)]; @@ -165,13 +162,14 @@ fn aggregations_not_combined() -> datafusion_common::Result<()> { aggr_expr2, ); // should not combine the Partial/Final AggregateExecs - let expected = &[ - "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]", - "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet", - ]; - - assert_optimized!(expected, plan); + assert_optimized!( + plan, + @ r" + AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)] + AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet + " + ); Ok(()) } @@ -191,12 +189,13 @@ fn aggregations_combined() -> datafusion_common::Result<()> { aggr_expr, ); // should combine the Partial/Final AggregateExecs to the Single AggregateExec - let expected = &[ - "AggregateExec: mode=Single, gby=[], 
aggr=[COUNT(1)]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet", - ]; - - assert_optimized!(expected, plan); + assert_optimized!( + plan, + @ " + AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet + " + ); Ok(()) } @@ -224,12 +223,13 @@ fn aggregations_with_group_combined() -> datafusion_common::Result<()> { let plan = final_aggregate_exec(partial_agg, final_group_by, aggr_expr); // should combine the Partial/Final AggregateExecs to the Single AggregateExec - let expected = &[ - "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet", - ]; - - assert_optimized!(expected, plan); + assert_optimized!( + plan, + @ r" + AggregateExec: mode=Single, gby=[c@2 as c], aggr=[Sum(b)] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet + " + ); Ok(()) } @@ -265,11 +265,12 @@ fn aggregations_with_limit_combined() -> datafusion_common::Result<()> { let plan: Arc = final_agg; // should combine the Partial/Final AggregateExecs to a Single AggregateExec // with the final limit preserved - let expected = &[ - "AggregateExec: mode=Single, gby=[c@2 as c], aggr=[], lim=[5]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet", - ]; - - assert_optimized!(expected, plan); + assert_optimized!( + plan, + @ r" + AggregateExec: mode=Single, gby=[c@2 as c], aggr=[], lim=[5] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c], file_type=parquet + " + ); Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs index 409d392f7819..ad15d6803413 100644 --- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs @@ -17,11 +17,12 @@ //! 
Integration tests for [`LimitedDistinctAggregation`] physical optimizer rule +use insta::assert_snapshot; use std::sync::Arc; use crate::physical_optimizer::test_utils::{ - assert_plan_matches_expected, build_group_by, mock_data, parquet_exec_with_sort, - schema, TestAggregate, + build_group_by, get_optimized_plan, mock_data, parquet_exec_with_sort, schema, + TestAggregate, }; use arrow::datatypes::DataType; @@ -39,16 +40,12 @@ use datafusion_physical_plan::{ ExecutionPlan, }; -async fn assert_results_match_expected( - plan: Arc, - expected: &str, -) -> Result<()> { +async fn run_plan_and_format(plan: Arc) -> Result { let cfg = SessionConfig::new().with_target_partitions(1); let ctx = SessionContext::new_with_config(cfg); let batches = collect(plan, ctx.task_ctx()).await?; let actual = format!("{}", pretty_format_batches(&batches)?); - assert_eq!(actual, expected); - Ok(()) + Ok(actual) } #[tokio::test] @@ -77,27 +74,33 @@ async fn test_partial_final() -> Result<()> { Arc::new(final_agg), 4, // fetch ); - // expected to push the limit to the Partial and Final AggregateExecs - let expected = [ - "LocalLimitExec: fetch=4", - "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[], lim=[4]", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[], lim=[4]", - "DataSourceExec: partitions=1, partition_sizes=[1]", - ]; let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 1 | -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - assert_results_match_expected(plan, expected).await?; + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=4 + AggregateExec: mode=Final, gby=[a@0 as a], aggr=[], lim=[4] + AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[], lim=[4] + DataSourceExec: partitions=1, partition_sizes=[1] + " + ); + let expected = run_plan_and_format(plan).await?; + assert_snapshot!( + expected, + @r" + +---+ + | a | + +---+ + | 1 | + | 2 | + | | + | 4 | + +---+ + " + ); + Ok(()) } @@ -120,25 +123,31 @@ async fn test_single_local() -> Result<()> { 4, // fetch ); // expected to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=4", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", - "DataSourceExec: partitions=1, partition_sizes=[1]", - ]; let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 1 | -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - assert_results_match_expected(plan, expected).await?; + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=4 + AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4] + DataSourceExec: partitions=1, partition_sizes=[1] + " + ); + let expected = run_plan_and_format(plan).await?; + assert_snapshot!( + expected, + @r" + +---+ + | a | + +---+ + | 1 | + | 2 | + | | + | 4 | + +---+ + " + ); Ok(()) } @@ -162,24 +171,30 @@ async fn test_single_global() -> Result<()> { Some(3), // fetch ); // expected to push the skip+fetch limit to the AggregateExec - let expected = [ - "GlobalLimitExec: skip=1, fetch=3", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", - "DataSourceExec: partitions=1, partition_sizes=[1]", - ]; let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - 
assert_results_match_expected(plan, expected).await?; + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + GlobalLimitExec: skip=1, fetch=3 + AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4] + DataSourceExec: partitions=1, partition_sizes=[1] + " + ); + let expected = run_plan_and_format(plan).await?; + assert_snapshot!( + expected, + @r" + +---+ + | a | + +---+ + | 2 | + | | + | 4 | + +---+ + " + ); Ok(()) } @@ -210,26 +225,32 @@ async fn test_distinct_cols_different_than_group_by_cols() -> Result<()> { 4, // fetch ); // expected to push the limit to the outer AggregateExec only - let expected = [ - "LocalLimitExec: fetch=4", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4]", - "AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[]", - "DataSourceExec: partitions=1, partition_sizes=[1]", - ]; let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; - let expected = r#" -+---+ -| a | -+---+ -| 1 | -| 2 | -| | -| 4 | -+---+ -"# - .trim(); - assert_results_match_expected(plan, expected).await?; + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=4 + AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], lim=[4] + AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[] + DataSourceExec: partitions=1, partition_sizes=[1] + " + ); + let expected = run_plan_and_format(plan).await?; + assert_snapshot!( + expected, + @r" + +---+ + | a | + +---+ + | 1 | + | 2 | + | | + | 4 | + +---+ + " + ); Ok(()) } @@ -258,13 +279,17 @@ fn test_has_order_by() -> Result<()> { 10, // fetch ); // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], ordering_mode=Sorted", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", - ]; - let plan = Arc::new(limit_exec) as _; - assert_plan_matches_expected(&plan, &expected)?; + let plan: Arc = Arc::new(limit_exec); + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=10 + AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], ordering_mode=Sorted + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + " + ); Ok(()) } @@ -287,13 +312,17 @@ fn test_no_group_by() -> Result<()> { 10, // fetch ); // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[], aggr=[]", - "DataSourceExec: partitions=1, partition_sizes=[1]", - ]; let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=10 + AggregateExec: mode=Single, gby=[], aggr=[] + DataSourceExec: partitions=1, partition_sizes=[1] + " + ); Ok(()) } @@ -317,13 +346,17 @@ fn test_has_aggregate_expression() -> Result<()> { 10, // fetch ); // expected not to push the limit to the AggregateExec - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "DataSourceExec: partitions=1, partition_sizes=[1]", - ]; let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, 
&expected)?; + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=10 + AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)] + DataSourceExec: partitions=1, partition_sizes=[1] + " + ); Ok(()) } @@ -355,12 +388,16 @@ fn test_has_filter() -> Result<()> { ); // expected not to push the limit to the AggregateExec // TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out - let expected = [ - "LocalLimitExec: fetch=10", - "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]", - "DataSourceExec: partitions=1, partition_sizes=[1]", - ]; let plan: Arc = Arc::new(limit_exec); - assert_plan_matches_expected(&plan, &expected)?; + let formatted = get_optimized_plan(&plan)?; + let actual = formatted.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=10 + AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)] + DataSourceExec: partitions=1, partition_sizes=[1] + " + ); Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 1f8aad0f2334..6964965a6431 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -57,8 +57,9 @@ use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::union::UnionExec; -use datafusion_physical_plan::{get_plan_string, ExecutionPlan}; +use datafusion_physical_plan::{displayable, ExecutionPlan}; +use insta::assert_snapshot; use itertools::Itertools; /// Mocked UDF @@ -424,19 +425,29 @@ fn test_csv_after_projection() -> Result<()> { ], csv.clone(), )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[b@2 as b, d@0 as d]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[d, c, b], file_type=csv, has_header=false", - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[b@2 as b, d@0 as d] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[d, c, b], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = - ["DataSourceExec: file_groups={1 group: [[x]]}, projection=[b, d], file_type=csv, has_header=false"]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @"DataSourceExec: file_groups={1 group: [[x]]}, projection=[b, d], file_type=csv, has_header=false" + ); Ok(()) } @@ -452,18 +463,30 @@ fn test_memory_after_projection() -> Result<()> { ], memory.clone(), )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[d@2 as d, e@3 as e, a@1 as a]", - " DataSourceExec: partitions=0, partition_sizes=[]", - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[d@2 
as d, e@3 as e, a@1 as a] + DataSourceExec: partitions=0, partition_sizes=[] + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = ["DataSourceExec: partitions=0, partition_sizes=[]"]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @"DataSourceExec: partitions=0, partition_sizes=[]" + ); + assert_eq!( after_optimize .clone() @@ -630,22 +653,35 @@ fn test_projection_after_projection() -> Result<()> { child_projection.clone(), )?); - let initial = get_plan_string(&top_projection); - let expected_initial = [ - "ProjectionExec: expr=[new_b@3 as new_b, c@0 + new_e@1 as binary, new_b@3 as newest_b]", - " ProjectionExec: expr=[c@2 as c, e@4 as new_e, a@0 as a, b@1 as new_b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(top_projection.as_ref()) + .indent(true) + .to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[new_b@3 as new_b, c@0 + new_e@1 as binary, new_b@3 as newest_b] + ProjectionExec: expr=[c@2 as c, e@4 as new_e, a@0 as a, b@1 as new_b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(top_projection, &ConfigOptions::new())?; - let expected = [ - "ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -686,24 +722,35 @@ fn test_output_req_after_projection() -> Result<()> { sort_req.clone(), )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " OutputRequirementExec", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + OutputRequirementExec + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected: [&str; 3] = [ - "OutputRequirementExec", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @r" + 
OutputRequirementExec + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); - assert_eq!(get_plan_string(&after_optimize), expected); let expected_reqs = OrderingRequirements::new( [ PhysicalSortRequirement::new( @@ -766,23 +813,34 @@ fn test_coalesce_partitions_after_projection() -> Result<()> { ], coalesce_partitions, )?); - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]", - " CoalescePartitionsExec", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d] + CoalescePartitionsExec + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "CoalescePartitionsExec", - " ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @r" + CoalescePartitionsExec + ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -804,7 +862,7 @@ fn test_filter_after_projection() -> Result<()> { )), )); let filter = Arc::new(FilterExec::try_new(predicate, csv)?); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("a", 0)), "a_new".to_string()), (Arc::new(Column::new("b", 1)), "b".to_string()), @@ -813,23 +871,34 @@ fn test_filter_after_projection() -> Result<()> { filter.clone(), )?) 
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]", - " FilterExec: b@1 - a@0 > d@3 - a@0", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d] + FilterExec: b@1 - a@0 > d@3 - a@0 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "FilterExec: b@1 - a_new@0 > d@2 - a_new@0", - " ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @r" + FilterExec: b@1 - a_new@0 > d@2 - a_new@0 + ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -888,7 +957,7 @@ fn test_join_after_projection() -> Result<()> { None, StreamJoinPartitionMode::SinglePartition, )?); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), @@ -898,26 +967,37 @@ fn test_join_after_projection() -> Result<()> { ], join, )?) 
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, a@5 as a_from_right, c@7 as c_from_right]", - " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, a@5 as a_from_right, c@7 as c_from_right] + SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @r" + SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2 + ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let expected_filter_col_ind = vec![ ColumnIndex { @@ -1002,7 +1082,7 @@ fn test_join_after_required_projection() -> Result<()> { None, StreamJoinPartitionMode::SinglePartition, )?); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("a", 5)), "a".to_string()), (Arc::new(Column::new("b", 6)), "b".to_string()), @@ -1017,25 +1097,36 @@ fn test_join_after_required_projection() -> Result<()> { ], join, )?) 
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]", - " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e] + SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]", - " SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e] + SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -1081,27 +1172,37 @@ fn test_nested_loop_join_after_projection() -> Result<()> { None, )?) as _; - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![(col_left_c, "c".to_string())], Arc::clone(&join), )?) 
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c]", - " NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c] + NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1, projection=[c@2]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + assert_snapshot!( + actual, + @r" + NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1, projection=[c@2] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -1160,7 +1261,7 @@ fn test_hash_join_after_projection() -> Result<()> { PartitionMode::Auto, NullEquality::NullEqualsNull, )?); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("c", 2)), "c_from_left".to_string()), (Arc::new(Column::new("b", 1)), "b_from_left".to_string()), (Arc::new(Column::new("a", 0)), "a_from_left".to_string()), (Arc::new(Column::new("c", 7)), "c_from_right".to_string()), ], join.clone(), )?)
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@7 as c_from_right]", " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@7 as c_from_right] + HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); // HashJoinExec only returns the result after projection. Because there are alias columns in the projection, the ProjectionExec is not removed. - let expected = ["ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@3 as c_from_right]", " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false"]; - assert_eq!(get_plan_string(&after_optimize), expected); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@3 as c_from_right] + HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let projection = Arc::new(ProjectionExec::try_new( vec![ ( @@ -1194,10 +1312,20 @@ let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); // Compared to the previous result, this projection has no alias columns and does not change the order of the output fields, so the ProjectionExec is removed.
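For reference, the `@r"..."` argument used throughout these tests is an insta inline snapshot: the expected text lives in the source file itself, and insta strips the literal's common indentation before comparing, which is why the snapshots can be indented to match the surrounding code. A minimal, self-contained sketch of the pattern (the plan text and helper below are fabricated for illustration; only the public `insta` API is assumed):

use insta::assert_snapshot;

// Hypothetical stand-in for `displayable(plan.as_ref()).indent(true).to_string()`;
// the plan text is made up for illustration.
fn render_plan() -> String {
    "ProjectionExec: expr=[a@0 as a]\n  DataSourceExec: partitions=1, partition_sizes=[1]\n".to_string()
}

#[test]
fn inline_snapshot_sketch() {
    // On a mismatch, `cargo insta review` (or running with INSTA_UPDATE=always)
    // rewrites the `@r"..."` literal in place instead of requiring a manual edit.
    assert_snapshot!(render_plan().trim(), @r"
    ProjectionExec: expr=[a@0 as a]
      DataSourceExec: partitions=1, partition_sizes=[1]
    ");
}

This is also why the `assert_optimized!` macro in combine_partial_final_agg.rs declares its argument as `$EXPECTED_LINES: literal`: insta needs a literal token in the source so that it can rewrite it.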
- let expected = ["HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false"]; - assert_eq!(get_plan_string(&after_optimize), expected); + assert_snapshot!( + actual, + @r" + HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -1216,7 +1344,7 @@ fn test_repartition_after_projection() -> Result<()> { 6, ), )?); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("b", 1)), "b_new".to_string()), (Arc::new(Column::new("a", 0)), "a".to_string()), @@ -1224,23 +1352,32 @@ fn test_repartition_after_projection() -> Result<()> { ], repartition, )?) as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]", - " RepartitionExec: partitioning=Hash([a@0, b@1, d@3], 6), input_partitions=1", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new] + RepartitionExec: partitioning=Hash([a@0, b@1, d@3], 6), input_partitions=1 + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1", - " ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + assert_snapshot!( + actual, + @r" + RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1 + ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); assert_eq!( after_optimize @@ -1277,7 +1414,7 @@ fn test_sort_after_projection() -> Result<()> { .into(), csv, ); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("c", 2)), "c".to_string()), (Arc::new(Column::new("a", 0)), "new_a".to_string()), @@ -1286,23 +1423,32 @@ fn test_sort_after_projection() -> Result<()> { Arc::new(sort_exec), )?) 
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " SortExec: expr=[b@1 ASC, c@2 + a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + SortExec: expr=[b@1 ASC, c@2 + a@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "SortExec: expr=[b@2 ASC, c@0 + new_a@1 ASC], preserve_partitioning=[false]", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + assert_snapshot!( + actual, + @r" + SortExec: expr=[b@2 ASC, c@0 + new_a@1 ASC], preserve_partitioning=[false] + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -1322,7 +1468,7 @@ fn test_sort_preserving_after_projection() -> Result<()> { .into(), csv, ); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("c", 2)), "c".to_string()), (Arc::new(Column::new("a", 0)), "new_a".to_string()), @@ -1331,23 +1477,32 @@ fn test_sort_preserving_after_projection() -> Result<()> { Arc::new(sort_exec), )?) 
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " SortPreservingMergeExec: [b@1 ASC, c@2 + a@0 ASC]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + SortPreservingMergeExec: [b@1 ASC, c@2 + a@0 ASC] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "SortPreservingMergeExec: [b@2 ASC, c@0 + new_a@1 ASC]", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + assert_snapshot!( + actual, + @r" + SortPreservingMergeExec: [b@2 ASC, c@0 + new_a@1 ASC] + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -1356,7 +1511,7 @@ fn test_sort_preserving_after_projection() -> Result<()> { fn test_union_after_projection() -> Result<()> { let csv = create_simple_csv_exec(); let union = Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv])); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ (Arc::new(Column::new("c", 2)), "c".to_string()), (Arc::new(Column::new("a", 0)), "new_a".to_string()), @@ -1365,29 +1520,38 @@ fn test_union_after_projection() -> Result<()> { union.clone(), )?) 
as _; - let initial = get_plan_string(&projection); - let expected_initial = [ - "ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " UnionExec", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(initial, expected_initial); + let initial = displayable(projection.as_ref()).indent(true).to_string(); + let actual = initial.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + UnionExec + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "UnionExec", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - " ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + assert_snapshot!( + actual, + @r" + UnionExec + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false + " + ); Ok(()) } @@ -1418,7 +1582,7 @@ fn test_partition_col_projection_pushdown() -> Result<()> { let source = partitioned_data_source(); let partitioned_schema = source.schema(); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ ( col("string_col", partitioned_schema.as_ref())?, @@ -1439,11 +1603,17 @@ fn test_partition_col_projection_pushdown() -> Result<()> { let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as partition_col, int_col@0 as int_col]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[string_col@1 as 
string_col, partition_col@2 as partition_col, int_col@0 as int_col] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false + " + ); Ok(()) } @@ -1453,7 +1623,7 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> { let source = partitioned_data_source(); let partitioned_schema = source.schema(); - let projection = Arc::new(ProjectionExec::try_new( + let projection: Arc = Arc::new(ProjectionExec::try_new( vec![ ( col("string_col", partitioned_schema.as_ref())?, @@ -1479,11 +1649,17 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> { let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let expected = [ - "ProjectionExec: expr=[string_col@1 as string_col, CAST(partition_col@2 AS Utf8View) as partition_col, int_col@0 as int_col]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false" - ]; - assert_eq!(get_plan_string(&after_optimize), expected); + let after_optimize_string = displayable(after_optimize.as_ref()) + .indent(true) + .to_string(); + let actual = after_optimize_string.trim(); + assert_snapshot!( + actual, + @r" + ProjectionExec: expr=[string_col@1 as string_col, CAST(partition_col@2 AS Utf8View) as partition_col, int_col@0 as int_col] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, string_col, partition_col], file_type=csv, has_header=false + " + ); Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index f7d68e5d899c..5d62ea4ccb20 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use insta::assert_snapshot; use std::sync::Arc; use crate::physical_optimizer::test_utils::{ @@ -398,14 +399,6 @@ fn assert_sanity_check(plan: &Arc, is_sane: bool) { ); } -/// Check if the plan we created is as expected by comparing the plan -/// formatted as a string. -fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { - let plan_str = displayable(plan).indent(true).to_string(); - let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); - assert_eq!(actual_lines, expected_lines); -} - #[tokio::test] /// Tests that plan is valid when the sort requirements are satisfied. 
async fn test_bounded_window_agg_sort_requirement() -> Result<()> { @@ -422,11 +415,16 @@ async fn test_bounded_window_agg_sort_requirement() -> Result<()> { .into(); let sort = sort_exec(ordering.clone(), source); let bw = bounded_window_exec("c9", ordering, sort); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", - " DataSourceExec: partitions=1, partition_sizes=[0]" - ]); + let plan_str = displayable(bw.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r#" + BoundedWindowAggExec: wdw=[count: Ok(Field { name: "count", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted] + SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] + "# + ); assert_sanity_check(&bw, true); Ok(()) } @@ -445,10 +443,15 @@ async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { }, )]; let bw = bounded_window_exec("c9", sort_exprs, source); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " DataSourceExec: partitions=1, partition_sizes=[0]" - ]); + let plan_str = displayable(bw.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r#" + BoundedWindowAggExec: wdw=[count: Ok(Field { name: "count", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted] + DataSourceExec: partitions=1, partition_sizes=[0] + "# + ); // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check. 
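The `assert_sanity_check` helper used here has its body elided in the hunk context above; presumably it just runs the `SanityCheckPlan` rule and compares the outcome. A sketch under that assumption (crate paths follow the imports used elsewhere in these tests):

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;

// Hypothetical reconstruction: the rule returns Err for an invalid plan, so
// `is_sane == false` means we expect `optimize` to fail.
fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
    let checker = SanityCheckPlan::new();
    let opts = ConfigOptions::default();
    assert_eq!(checker.optimize(Arc::clone(plan), &opts).is_ok(), is_sane);
}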
assert_sanity_check(&bw, false); Ok(()) @@ -462,12 +465,14 @@ async fn test_global_limit_single_partition() -> Result<()> { let source = memory_exec(&schema); let limit = global_limit_exec(source, 0, Some(100)); - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " DataSourceExec: partitions=1, partition_sizes=[0]", - ], + let plan_str = displayable(limit.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r" + GlobalLimitExec: skip=0, fetch=100 + DataSourceExec: partitions=1, partition_sizes=[0] + " ); assert_sanity_check(&limit, true); Ok(()) @@ -481,13 +486,15 @@ async fn test_global_limit_multi_partition() -> Result<()> { let source = memory_exec(&schema); let limit = global_limit_exec(repartition_exec(source), 0, Some(100)); - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[0]", - ], + let plan_str = displayable(limit.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r" + GlobalLimitExec: skip=0, fetch=100 + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0] + " ); // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive error during sanity check. assert_sanity_check(&limit, false); @@ -501,12 +508,14 @@ async fn test_local_limit() -> Result<()> { let source = memory_exec(&schema); let limit = local_limit_exec(source, 100); - assert_plan( - limit.as_ref(), - vec![ - "LocalLimitExec: fetch=100", - " DataSourceExec: partitions=1, partition_sizes=[0]", - ], + let plan_str = displayable(limit.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r" + LocalLimitExec: fetch=100 + DataSourceExec: partitions=1, partition_sizes=[0] + " ); assert_sanity_check(&limit, true); Ok(()) @@ -540,17 +549,19 @@ async fn test_sort_merge_join_satisfied() -> Result<()> { let join_ty = JoinType::Inner; let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: partitions=1, partition_sizes=[0]", - ], + let plan_str = displayable(smj.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r" + SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] + " ); assert_sanity_check(&smj, true); Ok(()) @@ -588,16 +599,18 @@ async fn test_sort_merge_join_order_missing() -> Result<()> { let join_ty = JoinType::Inner; let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, 
on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[0]", - ], + let plan_str = displayable(smj.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r" + SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] + RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[0] + " ); // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. assert_sanity_check(&smj, false); @@ -634,17 +647,19 @@ async fn test_sort_merge_join_dist_missing() -> Result<()> { let join_ty = JoinType::Inner; let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: partitions=1, partition_sizes=[0]", - ], + let plan_str = displayable(smj.as_ref()).indent(true).to_string(); + let actual = plan_str.trim(); + assert_snapshot!( + actual, + @r" + SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)] + RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1 + SortExec: expr=[c9@0 ASC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] + RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] + " ); // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. 
     assert_sanity_check(&smj, false);
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index c91a70989be4..7fb0f795f294 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -509,13 +509,6 @@ pub fn check_integrity(context: PlanContext) -> Result
 
-pub fn trim_plan_display(plan: &str) -> Vec<&str> {
-    plan.split('\n')
-        .map(|s| s.trim())
-        .filter(|s| !s.is_empty())
-        .collect()
-}
-
 // construct a stream partition for test purposes
 #[derive(Debug)]
 pub struct TestStreamPartition {
@@ -629,25 +622,15 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> PhysicalGroupBy {
     PhysicalGroupBy::new_single(group_by_expr.clone())
 }
 
-pub fn assert_plan_matches_expected(
-    plan: &Arc<dyn ExecutionPlan>,
-    expected: &[&str],
-) -> Result<()> {
-    let expected_lines: Vec<&str> = expected.to_vec();
+pub fn get_optimized_plan(plan: &Arc<dyn ExecutionPlan>) -> Result<String> {
     let config = ConfigOptions::new();
     let optimized =
         LimitedDistinctAggregation::new().optimize(Arc::clone(plan), &config)?;
 
     let optimized_result = displayable(optimized.as_ref()).indent(true).to_string();
-    let actual_lines = trim_plan_display(&optimized_result);
-
-    assert_eq!(
-        &expected_lines, &actual_lines,
-        "\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n"
-    );
-    Ok(())
+    Ok(optimized_result)
 }
 
 /// Describe the type of aggregate being tested
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 729542d27e3f..fbe7e3a00f54 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use insta::assert_snapshot;
+
 use datafusion::assert_batches_eq;
 use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use datafusion::test_util::register_unbounded_file_with_ordering;
@@ -62,28 +64,21 @@ async fn join_change_in_planner() -> Result<()> {
     let dataframe = ctx.sql(sql).await?;
     let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
-    let expected = {
-        [
-            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
-            "  CoalesceBatchesExec: target_batch_size=8192",
-            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            // "        DataSourceExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], file_type=csv, has_header=false",
-            "  CoalesceBatchesExec: target_batch_size=8192",
-            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            // "        DataSourceExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], file_type=csv, has_header=false"
-        ]
-    };
-    let mut actual: Vec<&str> = formatted.trim().lines().collect();
-    // Remove CSV lines
-    actual.remove(4);
-    actual.remove(7);
+    let actual = formatted.trim();
 
-    assert_eq!(
-        expected,
-        actual[..],
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    assert_snapshot!(
+        actual,
+        @r"
+    SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST
+          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+            StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST
+          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+            StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
+    "
     );
     Ok(())
 }
@@ -130,28 +125,21 @@ async fn join_no_order_on_filter() -> Result<()> {
     let dataframe = ctx.sql(sql).await?;
     let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
-    let expected = {
-        [
-            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a3@0 AS Int64) > CAST(a3@1 AS Int64) + 3 AND CAST(a3@0 AS Int64) < CAST(a3@1 AS Int64) + 10",
-            "  CoalesceBatchesExec: target_batch_size=8192",
-            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            // "        DataSourceExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], file_type=csv, has_header=false",
-            "  CoalesceBatchesExec: target_batch_size=8192",
-            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            // "        DataSourceExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], file_type=csv, has_header=false"
-        ]
-    };
-    let mut actual: Vec<&str> = formatted.trim().lines().collect();
-    // Remove CSV lines
-    actual.remove(4);
-    actual.remove(7);
+    let actual = formatted.trim();
 
-    assert_eq!(
-        expected,
-        actual[..],
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    assert_snapshot!(
+        actual,
+        @r"
+    SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a3@0 AS Int64) > CAST(a3@1 AS Int64) + 3 AND CAST(a3@0 AS Int64) < CAST(a3@1 AS Int64) + 10
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8
+          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+            StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8
+          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+            StreamingTableExec: partition_sizes=1, projection=[a1, a2, a3], infinite_source=true, output_ordering=[a1@0 ASC NULLS LAST]
+    "
     );
     Ok(())
 }
@@ -180,28 +168,21 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
     let dataframe = ctx.sql(sql).await?;
     let physical_plan = dataframe.create_physical_plan().await?;
     let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
-    let expected = {
-        [
-            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
-            "  CoalesceBatchesExec: target_batch_size=8192",
-            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            // "        DataSourceExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], file_type=csv, has_header=false",
-            "  CoalesceBatchesExec: target_batch_size=8192",
-            "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            // "        DataSourceExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], file_type=csv, has_header=false"
-        ]
-    };
-    let mut actual: Vec<&str> = formatted.trim().lines().collect();
-    // Remove CSV lines
-    actual.remove(4);
-    actual.remove(7);
+    let actual = formatted.trim();
 
-    assert_eq!(
-        expected,
-        actual[..],
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    assert_snapshot!(
+        actual,
+        @r"
+    SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8
+          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+            StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8
+          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+            StreamingTableExec: partition_sizes=1, projection=[a1, a2], infinite_source=true
+    "
     );
     Ok(())
 }
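
Note on the pattern: every hunk above makes the same mechanical change, replacing a hand-maintained array of expected plan lines (plus helpers such as `trim_plan_display` and index-based `actual.remove(...)` filtering) with an inline `insta::assert_snapshot!`. Below is a minimal, self-contained sketch of that pattern, assuming only the `insta` dev-dependency; the `rendered` string is a hard-coded stand-in copied from the diff, not output produced by actually building a plan:

```rust
// Sketch of the inline-snapshot pattern adopted in this diff. In the real
// tests, `rendered` comes from
// `displayable(plan.as_ref()).indent(true).to_string()`; here it is
// hard-coded so the example runs on its own.
use insta::assert_snapshot;

#[test]
fn inline_snapshot_sketch() {
    let rendered =
        "GlobalLimitExec: skip=0, fetch=100\n  DataSourceExec: partitions=1, partition_sizes=[0]";

    // The `@r"..."` literal is the snapshot, stored in the source file itself.
    // insta strips the common leading indentation of the literal before
    // comparing, so the expected lines can be indented to match the code.
    assert_snapshot!(rendered.trim(), @r"
    GlobalLimitExec: skip=0, fetch=100
      DataSourceExec: partitions=1, partition_sizes=[0]
    ");
}
```

When a plan's display output changes, the inline literal can be rewritten in place by insta's tooling (e.g. `cargo insta review`, or running the tests with `INSTA_UPDATE=always`) instead of editing expected-line arrays by hand, which is also what makes helpers like `trim_plan_display` and `assert_plan_matches_expected` unnecessary.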