datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSource
.build()
}

fn projection_exec_with_alias(
pub(crate) fn projection_exec_with_alias(
input: Arc<dyn ExecutionPlan>,
alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
66 changes: 65 additions & 1 deletion datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -17,11 +17,16 @@

use std::sync::Arc;

use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
use crate::physical_optimizer::sanity_checker::{
assert_sanity_check, assert_sanity_check_err,
};
use crate::physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
coalesce_partitions_exec, create_test_schema, create_test_schema2,
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_stats,
repartition_exec, schema, single_partitioned_aggregate, sort_exec, sort_expr,
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
union_exec, RequirementsTestExec,
@@ -3346,3 +3351,62 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_preserve_needed_coalesce() -> Result<()> {
// Input to EnforceSorting, from our test case.
let plan = projection_exec_with_alias(
union_exec(vec![parquet_exec_with_stats(); 2]),
vec![
("a".to_string(), "a".to_string()),
("b".to_string(), "value".to_string()),
],
);
let plan = Arc::new(CoalescePartitionsExec::new(plan));
let schema = schema();
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}]);
let plan: Arc<dyn ExecutionPlan> =
single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
let plan = sort_exec(sort_key, plan);

// Starting plan: as in our test case.
assert_eq!(
get_plan_string(&plan),
vec![
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
Contributor:
Did you intentionally put SinglePartitioned aggregate mode? The input is only 1 partition, so why don't you use AggregateMode::Single? I'm unsure if this is a valid case @wiedld @alamb

Contributor:
When I update the mode to Single, there seems to be no problem.
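
For reference, a minimal sketch of that change against the `single_partitioned_aggregate` helper added in this PR (the helper itself appears below in test_utils.rs). Only the mode argument differs; this fragment is illustrative rather than part of the change:

    // Same construction as in single_partitioned_aggregate(), but with
    // AggregateMode::Single: the whole aggregation runs in one operator over a
    // single input partition, matching the CoalescePartitionsExec child here.
    Arc::new(
        AggregateExec::try_new(
            AggregateMode::Single, // was: AggregateMode::SinglePartitioned
            group_by,
            vec![],
            vec![],
            input,
            schema,
        )
        .unwrap(),
    )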

@wiedld (author), Feb 28, 2025:
Thank you for taking the time, @berkaysynnada.

The test case merged here is a minimal reproducer. The mode=SinglePartitioned comes from the actual plans we have running and failing in production: influxdata#58 (comment)

A simplified view of it is:

"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
"  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
"    CoalescePartitionsExec",
"      ProjectionExec: expr=[a@0 as a, b@1 as value]",
"        UnionExec",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"

The coalesce gets removed for the mode=SinglePartitioned. Do you think that the bug is elsewhere?

Contributor:
    /// Applies the entire logical aggregation operation in a single operator,
    /// as opposed to Partial / Final modes which apply the logical aggregation using
    /// two operators.
    ///
    /// This mode requires that the input is partitioned by group key (like
    /// FinalPartitioned)
    SinglePartitioned,

It means that when an AggregateExec is SinglePartitioned, its input should be multi-partitioned (the wording is a bit confusing -- SinglePartitioned means a single layer of aggregation, but its input is partitioned). However, in your example plan, the AggregateExec has a single input partition (it sits above a CoalescePartitionsExec). So what I was trying to say is that your initial plan should never arise under normal conditions. I'd also like to point out that EnforceSorting can take plans that are invalid in terms of ordering and make them valid (while expecting valid distribution), and EnforceDistribution does the same for distribution (while expecting valid ordering). In this reproducer, however, you are giving EnforceSorting a plan that is already invalid in terms of distribution.
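
To make the contract concrete, a rough sketch of the comparison the sanity checker performs (illustrative only; `aggregate` stands for the SinglePartitioned AggregateExec node in the optimized plan, and `required_input_distribution` / `output_partitioning` are the existing ExecutionPlan and PlanProperties accessors):

    // For AggregateMode::SinglePartitioned, required_input_distribution()
    // asks for HashPartitioned on the group keys. Once EnforceSorting removes
    // the CoalescePartitionsExec, the child only reports UnknownPartitioning(2),
    // which is the mismatch asserted in the new test.
    let required = aggregate.required_input_distribution(); // one entry per child
    let provided = aggregate.children()[0].properties().output_partitioning();
    println!("required: {:?}, provided: {}", required[0], provided);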

Contributor:
> SinglePartitioned means Single layer of aggregation, but its input is Partitioned). However, in your example plan, AggregateExec has a single partition (as it is above CoalescePartitions).

I see -- so you are saying that if the input to the AggregateExec has only a single partition, then it should never be SinglePartitioned.

So you are saying that the input plan is not valid and thus the error is elsewhere (whatever generated this plan).

I will make a PR to clarify the documentation as I agree it is quite confusing

@wiedld (author), Feb 28, 2025:

> However, in this reproducer, you are giving an invalid plan to the EnforceSorting in terms of distribution.

> However, in your example plan, AggregateExec has a single partition (as it is above CoalescePartitions)

I think this means that the insertion of the coalesce in the first place (during the enforce distribution) is the bug. Since the input into the enforce distribution optimizer is:

     ...nodes...
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[sum(Value)]
          AggregateExec: mode=Partial, gby=[date_bin_wallclock(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time@0, 0) as time], aggr=[sum(Value)]
            SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[false]
              ProjectionExec: expr=[time@1 as time, f@0 as Value]
                UnionExec. **multi-partitions**
                  ...multiple nodes...

Our Union outputs multiple partitions, and the AggregateExec mode=Partial can take multiple partitions. Therefore the enforce distribution should not have inserted the coalesce?

I'll start by updating the reproducer test cases. Thank you.
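
For illustration, a sketch (not part of this PR) of that point using the helpers added here: a Partial-mode aggregate placed directly on the multi-partition union passes the sanity check, because Partial imposes no distribution requirement on its input. The `partial_aggregate` binding and the use of the test schema's `a`/`b` columns (instead of the production `time`/`Value` columns) are assumptions for the example:

    let input = projection_exec_with_alias(
        union_exec(vec![parquet_exec_with_stats(); 2]),
        vec![
            ("a".to_string(), "a".to_string()),
            ("b".to_string(), "value".to_string()),
        ],
    );
    let group_by = PhysicalGroupBy::new_single(vec![(
        col("a", &input.schema()).unwrap(),
        "a".to_string(),
    )]);
    let partial_aggregate: Arc<dyn ExecutionPlan> = Arc::new(
        AggregateExec::try_new(
            AggregateMode::Partial, // no distribution requirement on the input
            group_by,
            vec![],
            vec![],
            input,
            schema(),
        )
        .unwrap(),
    );
    // No CoalescePartitionsExec is needed below the Partial aggregate.
    assert_sanity_check(&partial_aggregate, true);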

@wiedld (author):

In a previous reproducer, I showed how the enforce distribution was inserting the coalesce.
See here: influxdata#58 (comment)

On the latest main, we no longer have the coalesce being inserted.
See here: #14949 (comment)

@wiedld (author):

I'm going to revert this PR, with this test case.

Contributor:

> Our Union outputs multiple partitions, and the AggregateExec mode=Partial can take multiple partitions. Therefore the enforce distribution should not have inserted the coalesce?

It might insert a coalesce, but then it has to update AggregateExec: mode=FinalPartitioned to AggregateExec: mode=Final
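
In plan terms, a sketch of the internally consistent shape being described (illustrative, not optimizer output; the elided expressions follow the plan quoted above):

    AggregateExec: mode=Final, gby=[time@0 as time], aggr=[sum(Value)]
      CoalescePartitionsExec
        AggregateExec: mode=Partial, gby=[date_bin_wallclock(...) as time], aggr=[sum(Value)]
          ...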

" CoalescePartitionsExec",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
);
// Test: plan is valid.
assert_sanity_check(&plan, true);

// EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(plan, &Default::default())?;
assert_eq!(
get_plan_string(&optimized),
vec![
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
);

// Bug: Plan is now invalid.
let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
assert_sanity_check_err(&optimized, err);

Ok(())
}
10 changes: 9 additions & 1 deletion datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef {
}

/// Check if sanity checker should accept or reject plans.
fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
assert_eq!(
@@ -397,6 +397,14 @@ fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
);
}

/// Assert reason for sanity check failure.
pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
assert!(error.message().contains(err));
}

/// Check if the plan we created is as expected by comparing the plan
/// formatted as a string.
fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {
65 changes: 64 additions & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -30,9 +30,10 @@ use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
use datafusion_common::{JoinType, Result};
use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -102,6 +103,44 @@ pub fn schema() -> SchemaRef {
]))
}

fn int64_stats() -> ColumnStatistics {
ColumnStatistics {
null_count: Precision::Absent,
sum_value: Precision::Absent,
max_value: Precision::Exact(1_000_000.into()),
min_value: Precision::Exact(0.into()),
distinct_count: Precision::Absent,
}
}

fn column_stats() -> Vec<ColumnStatistics> {
vec![
int64_stats(), // a
int64_stats(), // b
int64_stats(), // c
ColumnStatistics::default(),
ColumnStatistics::default(),
]
}

/// Create parquet datasource exec using schema from [`schema`].
pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
let mut statistics = Statistics::new_unknown(&schema());
statistics.num_rows = Precision::Inexact(10);
statistics.column_statistics = column_stats();

let config = FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema(),
Arc::new(ParquetSource::new(Default::default())),
)
.with_file(PartitionedFile::new("x".to_string(), 10000))
.with_statistics(statistics);
assert_eq!(config.statistics.num_rows, Precision::Inexact(10));

config.build()
}

pub fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -522,6 +561,30 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> Physica
PhysicalGroupBy::new_single(group_by_expr.clone())
}

/// Create an [`AggregateExec`] with [`AggregateMode::SinglePartitioned`],
/// grouping on the given `(column, alias)` pairs.
pub(crate) fn single_partitioned_aggregate(
input: Arc<dyn ExecutionPlan>,
alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
let schema = schema();
let group_by = alias_pairs
.iter()
.map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(group_by);

Arc::new(
AggregateExec::try_new(
AggregateMode::SinglePartitioned,
group_by,
vec![],
vec![],
input,
schema,
)
.unwrap(),
)
}

pub fn assert_plan_matches_expected(
plan: &Arc<dyn ExecutionPlan>,
expected: &[&str],