Merged
42 changes: 40 additions & 2 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -138,8 +138,35 @@ fn update_sort_ctx_children_data(
/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
/// connected to a `CoalescePartitionsExec` via its children.
///
/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
///
/// This requires that a bottom-up traversal has already been performed, so that
/// the data of the children is up to date.
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;

/// Discovers the linked Coalesce->Sort cascades.
///
/// This linkage is used in [`remove_bottleneck_in_subplan`] to selectively
/// remove the linked coalesces in the subplan. Afterwards, an SPM is added
/// at the root of the subplan (just after the sort) in order to parallelize sorts.
/// Refer to [`parallelize_sorts`] for more details on sort parallelization.
///
/// Example of linked Coalesce->Sort:
/// ```text
/// SortExec       ctx.data=false (to halt remove_bottleneck_in_subplan)
///   ...nodes...  ctx.data=true (i.e. are linked in cascade)
///     Coalesce   ctx.data=true (i.e. is a coalesce)
/// ```
///
/// The link should not be continued (and the coalesce not removed) if the distribution
/// is changed between the Coalesce->Sort cascade. Example:
/// ```text
/// SortExec          ctx.data=false (to halt remove_bottleneck_in_subplan)
///   AggregateExec   ctx.data=false (to stop the link)
///     ...nodes...   ctx.data=true (i.e. are linked in cascade)
///       Coalesce    ctx.data=true (i.e. is a coalesce)
/// ```
fn update_coalesce_ctx_children(
coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
) {
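The flag propagation described in the doc comment above can be illustrated with a toy model. This is only a sketch under stated assumptions: `Kind`, `Node`, and `mark_links` are hypothetical names invented for illustration, not DataFusion types, and the "requires a single partition" check is reduced to a node kind rather than a per-child distribution requirement.

```rust
/// Toy stand-in for a physical plan node kind (hypothetical, not DataFusion's API).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Sort,
    Coalesce,
    /// A node that needs a single input partition, e.g. a final aggregate.
    NeedsSinglePartition,
    Other,
}

/// Toy plan node; `linked` plays the role of `ctx.data`.
#[derive(Debug)]
struct Node {
    kind: Kind,
    linked: bool,
    children: Vec<Node>,
}

impl Node {
    fn new(kind: Kind, children: Vec<Node>) -> Self {
        Node { kind, linked: false, children }
    }
}

/// Bottom-up pass: children are updated before their parent.
fn mark_links(node: &mut Node) {
    for child in node.children.iter_mut() {
        mark_links(child);
    }
    node.linked = match node.kind {
        // A coalesce starts a link.
        Kind::Coalesce => true,
        // A sort halts the link; a single-partition consumer breaks it.
        Kind::Sort | Kind::NeedsSinglePartition => false,
        // Any other node is linked if at least one child is linked.
        Kind::Other => node.children.iter().any(|c| c.linked),
    };
}
```

In this sketch a leaf `Other` node is never linked, because it has no children to inherit the flag from.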
@@ -316,8 +343,19 @@ fn replace_with_partial_sort(
/// are transformed into
/// ```text
 /// "SortPreservingMergeExec: \[a@0 ASC\]",
-/// "  ...nodes..."
-/// "    SortExec: expr=\[a@0 ASC\]",
+/// "  SortExec: expr=\[a@0 ASC\]",
+/// "    ...nodes..."
 /// "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
///
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
/// which require single partitioning. Do not parallelize when the following scenario occurs:
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// "  ...nodes requiring single partitioning..."
/// "    CoalescePartitionsExec",
/// "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
pub fn parallelize_sorts(
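As a rough illustration of the rewrite that the `parallelize_sorts` doc comment describes, the sketch below models a plan as a linear chain from root to leaf. `Op` and `parallelize_chain` are hypothetical names used only here; the real rule works on plan trees and is not reproduced. The sketch assumes that the first coalesce below the sort is the one to remove, and that a node requiring a single partition between the two blocks the rewrite.

```rust
/// Toy operator in a linear plan, listed from root to leaf (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    SortPreservingMerge,
    Sort,
    Coalesce,
    Repartition,
    /// A node that requires a single input partition.
    NeedsSinglePartition,
    Other,
}

/// Returns the rewritten chain, or an unchanged copy if the rewrite does not apply.
fn parallelize_chain(plan: &[Op]) -> Vec<Op> {
    // Only chains rooted at a sort are candidates.
    if plan.first() != Some(&Op::Sort) {
        return plan.to_vec();
    }
    // Walk below the sort looking for a linked coalesce.
    let mut coalesce_at = None;
    for (i, op) in plan.iter().enumerate().skip(1) {
        match op {
            Op::Coalesce => {
                coalesce_at = Some(i);
                break;
            }
            // Another sort halts the search; a single-partition
            // consumer breaks the link, so the coalesce must stay.
            Op::Sort | Op::NeedsSinglePartition => break,
            _ => {}
        }
    }
    match coalesce_at {
        Some(i) => {
            // Drop the coalesce and merge the sorted partitions on top.
            let mut out = vec![Op::SortPreservingMerge];
            out.extend(
                plan.iter()
                    .enumerate()
                    .filter(|(j, _)| *j != i)
                    .map(|(_, op)| *op),
            );
            out
        }
        None => plan.to_vec(),
    }
}
```

With this model, a chain `Sort, Other, Coalesce, Repartition` becomes `SortPreservingMerge, Sort, Other, Repartition`, while a chain with `NeedsSinglePartition` between the sort and the coalesce is returned unchanged.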