diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
index 11f1d8751d83c..20733b65692fc 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -138,8 +138,35 @@ fn update_sort_ctx_children_data(
 /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
 /// attribute stores whether the plan is a `CoalescePartitionsExec` or is
 /// connected to a `CoalescePartitionsExec` via its children.
+///
+/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
+///
+/// This requires that a bottom-up traversal was previously performed,
+/// updating the children.
 pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
 
+/// Discovers the linked Coalesce->Sort cascades.
+///
+/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
+/// remove the linked coalesces in the subplan. Afterwards, an SPM is added
+/// at the root of the subplan (just after the sort) in order to parallelize sorts.
+/// Refer to [`parallelize_sorts`] for more details on sort parallelization.
+///
+/// Example of linked Coalesce->Sort:
+/// ```text
+/// SortExec         ctx.data=false (to halt remove_bottleneck_in_subplan)
+///   ...nodes...    ctx.data=true (i.e. are linked in cascade)
+///     Coalesce     ctx.data=true (i.e. is a coalesce)
+/// ```
+///
+/// The link should not be continued (and the coalesce not removed) if the distribution
+/// changes within the Coalesce->Sort cascade. Example:
+/// ```text
+/// SortExec          ctx.data=false (to halt remove_bottleneck_in_subplan)
+///   AggregateExec   ctx.data=false (to stop the link)
+///     ...nodes...   ctx.data=true (i.e. are linked in cascade)
+///       Coalesce    ctx.data=true (i.e. is a coalesce)
+/// ```
 fn update_coalesce_ctx_children(
     coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
 ) {
@@ -316,8 +343,19 @@ fn replace_with_partial_sort(
 /// are transformed into
 /// ```text
 /// "SortPreservingMergeExec: \[a@0 ASC\]",
-/// "  ...nodes..."
-/// "    SortExec: expr=\[a@0 ASC\]",
+/// "  SortExec: expr=\[a@0 ASC\]",
+/// "    ...nodes..."
+/// "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+/// ```
+/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
+/// By performing sorting in parallel, we can increase performance in some scenarios.
+///
+/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
+/// which require single partitioning. Do not parallelize when the following scenario occurs:
+/// ```text
+/// "SortExec: expr=\[a@0 ASC\]",
+/// "  ...nodes requiring single partitioning..."
+/// "    CoalescePartitionsExec",
 /// "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
 /// ```
 pub fn parallelize_sorts(
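
Note on the flag propagation documented in this diff: the following is a minimal, self-contained sketch of how the `ctx.data` linkage could behave, written against a toy plan tree rather than the real DataFusion types. `Kind`, `Node`, `update_links`, `can_parallelize`, and the two builder functions are hypothetical names introduced only for illustration, and treating the sort as a hard stop inside the update function is an assumption taken from the diagrams in the doc comments, not from the function body (which this diff does not show).

```rust
// Toy model of the `ctx.data` flag described in the doc comments of this diff.
// All names here are hypothetical; none are DataFusion types or functions.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Sort,
    Coalesce,
    // Keeps whatever distribution its input has (e.g. a filter).
    PassThrough,
    // Requires a single input partition (e.g. a final aggregate).
    RequiresSinglePartition,
    // No children (e.g. a scan).
    Leaf,
}

#[derive(Debug)]
struct Node {
    kind: Kind,
    // Mirrors `ctx.data`: true if the node is a coalesce or is linked to one.
    linked: bool,
    children: Vec<Node>,
}

fn node(kind: Kind, children: Vec<Node>) -> Node {
    Node { kind, linked: false, children }
}

// Bottom-up pass: update the children first, then the node itself.
fn update_links(n: &mut Node) {
    for child in n.children.iter_mut() {
        update_links(child);
    }
    n.linked = match n.kind {
        Kind::Coalesce => true,
        // A sort halts the tracker; a single-partition requirement breaks the link.
        Kind::Sort | Kind::RequiresSinglePartition | Kind::Leaf => false,
        Kind::PassThrough => n.children.iter().any(|c| c.linked),
    };
}

// In this model a sort can be parallelized only if a linked child reaches it.
fn can_parallelize(sort: &Node) -> bool {
    sort.kind == Kind::Sort && sort.children.iter().any(|c| c.linked)
}

// Sort -> PassThrough -> Coalesce -> Leaf (the linked example).
fn linked_cascade() -> Node {
    let coalesce = node(Kind::Coalesce, vec![node(Kind::Leaf, vec![])]);
    node(Kind::Sort, vec![node(Kind::PassThrough, vec![coalesce])])
}

// Sort -> RequiresSinglePartition -> PassThrough -> Coalesce -> Leaf
// (the example where the link must stop).
fn broken_cascade() -> Node {
    let coalesce = node(Kind::Coalesce, vec![node(Kind::Leaf, vec![])]);
    let pass = node(Kind::PassThrough, vec![coalesce]);
    node(Kind::Sort, vec![node(Kind::RequiresSinglePartition, vec![pass])])
}
```

Calling `update_links` on `linked_cascade()` leaves the sort unflagged while every node between it and the coalesce is flagged, so `can_parallelize` holds. In `broken_cascade()` the single-partition node resets the flag, so the coalesce below it would be left in place.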