diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index b78b0ab77d3f6..bc202e4d53b5c 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -453,7 +453,7 @@ impl TestConfig { &self, expected_lines: &[&str], plan: Arc, - optimizers_to_run: Vec, + optimizers_to_run: &[Run], ) -> Result> { let expected_lines: Vec<&str> = expected_lines.to_vec(); @@ -632,16 +632,8 @@ fn multi_hash_joins() -> Result<()> { }; let test_config = TestConfig::default(); - test_config.run( - &expected, - top_join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected, - top_join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?; } JoinType::RightSemi | JoinType::RightAnti => {} } @@ -706,16 +698,8 @@ fn multi_hash_joins() -> Result<()> { }; let test_config = TestConfig::default(); - test_config.run( - &expected, - top_join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected, - top_join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?; } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {} } @@ -772,16 +756,8 @@ fn multi_joins_after_alias() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - top_join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - top_join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?; // Join on (a2 == c) let top_join_on = vec![( @@ -807,16 +783,8 @@ fn multi_joins_after_alias() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - top_join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - top_join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -870,16 +838,8 @@ fn multi_joins_after_multi_alias() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - top_join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - top_join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, top_join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -919,16 +879,8 @@ fn join_after_agg_alias() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -981,16 +933,8 @@ fn hash_join_key_ordering() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1116,16 +1060,8 @@ fn multi_hash_join_key_ordering() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - filter_top_join.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - filter_top_join, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, filter_top_join.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, filter_top_join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1508,7 +1444,7 @@ fn multi_smj_joins() -> Result<()> { ], }; // TODO(wiedld): show different test result if enforce sorting first. - test_config.run(&expected, top_join.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs @@ -1566,7 +1502,7 @@ fn multi_smj_joins() -> Result<()> { test_config.run( &expected_first_sort_enforcement, top_join, - SORT_DISTRIB_DISTRIB.into(), + &SORT_DISTRIB_DISTRIB, )?; match join_type { @@ -1625,11 +1561,7 @@ fn multi_smj_joins() -> Result<()> { _ => unreachable!() }; // TODO(wiedld): show different test result if enforce sorting first. - test_config.run( - &expected, - top_join.clone(), - DISTRIB_DISTRIB_SORT.into(), - )?; + test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs @@ -1680,7 +1612,7 @@ fn multi_smj_joins() -> Result<()> { test_config.run( &expected_first_sort_enforcement, top_join, - SORT_DISTRIB_DISTRIB.into(), + &SORT_DISTRIB_DISTRIB, )?; } _ => {} @@ -1760,7 +1692,7 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - test_config.run(expected, join.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, join.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -1787,11 +1719,7 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - test_config.run( - expected_first_sort_enforcement, - join, - SORT_DISTRIB_DISTRIB.into(), - )?; + test_config.run(expected_first_sort_enforcement, join, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1825,7 +1753,7 @@ fn merge_does_not_need_sort() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run(expected, exec.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, exec.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: // @@ -1839,11 +1767,7 @@ fn merge_does_not_need_sort() -> Result<()> { " CoalesceBatchesExec: target_batch_size=4096", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - test_config.run( - expected_first_sort_enforcement, - exec, - SORT_DISTRIB_DISTRIB.into(), - )?; + test_config.run(expected_first_sort_enforcement, exec, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1886,16 +1810,8 @@ fn union_to_interleave() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1941,16 +1857,8 @@ fn union_not_to_interleave() -> Result<()> { // TestConfig: Prefer existing union. let test_config = TestConfig::default().with_prefer_existing_union(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1969,16 +1877,8 @@ fn added_repartition_to_single_partition() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - &expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(&expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -1998,16 +1898,8 @@ fn repartition_deepest_node() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2027,16 +1919,8 @@ fn repartition_unsorted_limit() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2059,16 +1943,8 @@ fn repartition_sorted_limit() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2096,16 +1972,8 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2136,16 +2004,8 @@ fn repartition_ignores_limit() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2165,16 +2025,8 @@ fn repartition_ignores_union() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2196,16 +2048,8 @@ fn repartition_through_sort_preserving_merge() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2232,7 +2076,7 @@ fn repartition_ignores_sort_preserving_merge() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -2240,11 +2084,7 @@ fn repartition_ignores_sort_preserving_merge() -> Result<()> { " CoalescePartitionsExec", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - test_config.run( - expected_first_sort_enforcement, - plan, - SORT_DISTRIB_DISTRIB.into(), - )?; + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2270,7 +2110,7 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; // test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -2280,11 +2120,7 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - test_config.run( - expected_first_sort_enforcement, - plan, - SORT_DISTRIB_DISTRIB.into(), - )?; + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2315,16 +2151,8 @@ fn repartition_does_not_destroy_sort() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2366,16 +2194,8 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2409,7 +2229,7 @@ fn repartition_transitively_with_projection() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -2420,11 +2240,7 @@ fn repartition_transitively_with_projection() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - test_config.run( - expected_first_sort_enforcement, - plan, - SORT_DISTRIB_DISTRIB.into(), - )?; + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2458,16 +2274,8 @@ fn repartition_ignores_transitively_with_projection() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2501,16 +2309,8 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2534,7 +2334,7 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -2545,11 +2345,7 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - test_config.run( - expected_first_sort_enforcement, - plan, - SORT_DISTRIB_DISTRIB.into(), - )?; + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2587,7 +2383,7 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -2598,11 +2394,7 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - test_config.run( - expected_first_sort_enforcement, - plan, - SORT_DISTRIB_DISTRIB.into(), - )?; + test_config.run(expected_first_sort_enforcement, plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2627,13 +2419,9 @@ fn parallelization_single_partition() -> Result<()> { test_config.run( &expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = [ @@ -2642,16 +2430,8 @@ fn parallelization_single_partition() -> Result<()> { " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - test_config.run( - &expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2684,12 +2464,12 @@ fn parallelization_multiple_files() -> Result<()> { test_config_concurrency_3.run( &expected_with_3_target_partitions, // same if distribution enforced before sort. plan.clone(), - DISTRIB_DISTRIB_SORT.into(), + &DISTRIB_DISTRIB_SORT, )?; test_config_concurrency_3.run( &expected_with_3_target_partitions, plan.clone(), - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &SORT_DISTRIB_DISTRIB, )?; let expected_with_8_target_partitions = [ @@ -2701,12 +2481,12 @@ fn parallelization_multiple_files() -> Result<()> { test_config_concurrency_8.run( &expected_with_8_target_partitions, plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + &DISTRIB_DISTRIB_SORT, )?; test_config_concurrency_8.run( &expected_with_8_target_partitions, plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &SORT_DISTRIB_DISTRIB, )?; Ok(()) @@ -2760,16 +2540,8 @@ fn parallelization_compressed_csv() -> Result<()> { let test_config = TestConfig::default() .with_query_execution_partitions(2) .with_prefer_repartition_file_scans(10); - test_config.run( - expected, - plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; } Ok(()) } @@ -2795,13 +2567,9 @@ fn parallelization_two_partitions() -> Result<()> { test_config.run( &expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = [ @@ -2811,16 +2579,8 @@ fn parallelization_two_partitions() -> Result<()> { // Plan already has two partitions " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - test_config.run( - &expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2846,13 +2606,9 @@ fn parallelization_two_partitions_into_four() -> Result<()> { test_config.run( &expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(&expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = [ @@ -2862,16 +2618,8 @@ fn parallelization_two_partitions_into_four() -> Result<()> { // Multiple source files splitted across partitions " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - test_config.run( - &expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - &expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(&expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(&expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2900,13 +2648,9 @@ fn parallelization_sorted_limit() -> Result<()> { test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = &[ @@ -2917,16 +2661,8 @@ fn parallelization_sorted_limit() -> Result<()> { // Doesn't parallelize for SortExec without preserve_partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - test_config.run( - expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -2963,13 +2699,9 @@ fn parallelization_limit_with_filter() -> Result<()> { test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = &[ @@ -2984,16 +2716,8 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - test_config.run( - expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3030,13 +2754,9 @@ fn parallelization_ignores_limit() -> Result<()> { test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = &[ @@ -3055,16 +2775,8 @@ fn parallelization_ignores_limit() -> Result<()> { " LocalLimitExec: fetch=100", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - test_config.run( - expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3089,13 +2801,9 @@ fn parallelization_union_inputs() -> Result<()> { test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = &[ @@ -3107,16 +2815,8 @@ fn parallelization_union_inputs() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - test_config.run( - expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3148,28 +2848,16 @@ fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = &[ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - test_config.run( - expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3204,7 +2892,7 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), + &DISTRIB_DISTRIB_SORT, )?; let expected_parquet_first_sort_enforcement = &[ // no SPM @@ -3218,7 +2906,7 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { test_config.run( expected_parquet_first_sort_enforcement, plan_parquet, - SORT_DISTRIB_DISTRIB.into(), + &SORT_DISTRIB_DISTRIB, )?; // Test: with csv @@ -3228,7 +2916,7 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - test_config.run(expected_csv, plan_csv.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; let expected_csv_first_sort_enforcement = &[ // no SPM "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", @@ -3241,7 +2929,7 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { test_config.run( expected_csv_first_sort_enforcement, plan_csv.clone(), - SORT_DISTRIB_DISTRIB.into(), + &SORT_DISTRIB_DISTRIB, )?; Ok(()) @@ -3276,29 +2964,17 @@ fn parallelization_does_not_benefit() -> Result<()> { test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; // Test: with csv let expected_csv = &[ "SortRequiredExec: [c@2 ASC]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - test_config.run( - expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3342,13 +3018,9 @@ fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> test_config.run( expected_parquet, plan_parquet.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_parquet, - plan_parquet, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + &DISTRIB_DISTRIB_SORT, )?; + test_config.run(expected_parquet, plan_parquet, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3389,16 +3061,8 @@ fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; let test_config = TestConfig::default(); - test_config.run( - expected_csv, - plan_csv.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected_csv, - plan_csv, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected_csv, plan_csv.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected_csv, plan_csv, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3424,16 +3088,8 @@ fn remove_redundant_roundrobins() -> Result<()> { ]; let test_config = TestConfig::default(); - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3462,16 +3118,8 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3496,16 +3144,8 @@ fn preserve_ordering_through_repartition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3530,7 +3170,7 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - test_config.run(expected, physical_plan.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -3543,7 +3183,7 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { test_config.run( expected_first_sort_enforcement, physical_plan, - SORT_DISTRIB_DISTRIB.into(), + &SORT_DISTRIB_DISTRIB, )?; Ok(()) @@ -3568,16 +3208,8 @@ fn no_need_for_sort_after_filter() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3607,7 +3239,7 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - test_config.run(expected, physical_plan.clone(), DISTRIB_DISTRIB_SORT.into())?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ @@ -3621,7 +3253,7 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { test_config.run( expected_first_sort_enforcement, physical_plan, - SORT_DISTRIB_DISTRIB.into(), + &SORT_DISTRIB_DISTRIB, )?; Ok(()) @@ -3643,16 +3275,8 @@ fn do_not_preserve_ordering_through_repartition3() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3751,16 +3375,8 @@ fn do_not_add_unnecessary_hash() -> Result<()> { " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3792,16 +3408,8 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3820,16 +3428,8 @@ fn optimize_away_unnecessary_repartition() -> Result<()> { &["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"]; let test_config = TestConfig::default(); - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) } @@ -3856,16 +3456,8 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let test_config = TestConfig::default(); - test_config.run( - expected, - physical_plan.clone(), - DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. - )?; - test_config.run( - expected, - physical_plan, - SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. - )?; + test_config.run(expected, physical_plan.clone(), &DISTRIB_DISTRIB_SORT)?; + test_config.run(expected, physical_plan, &SORT_DISTRIB_DISTRIB)?; Ok(()) }