Skip to content

Commit

Permalink
chore: add config option for allowing bounded use of sort-preserving …
Browse files Browse the repository at this point in the history
…operators (#7164)
  • Loading branch information
Christopher M. Wolff committed Aug 1, 2023
1 parent 82a809b commit cca5225
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 4 deletions.
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,12 @@ config_namespace! {
/// ```
pub repartition_sorts: bool, default = true

/// When true, DataFusion will opportunistically remove sorts by replacing
/// `RepartitionExec` with `SortPreservingRepartitionExec`, and
/// `CoalescePartitionsExec` with `SortPreservingMergeExec`,
/// even when the query is bounded.
pub bounded_order_preserving_variants: bool, default = false

/// When set to true, the logical plan optimizer will produce warning
/// messages if any optimization rules produce errors and then proceed to the next
/// rule. When set to false, any rules that produce errors will cause the query to fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use super::utils::is_repartition;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_physical_expr::utils::ordering_satisfy;

Expand Down Expand Up @@ -204,6 +205,10 @@ fn get_updated_plan(
/// If this replacement is helpful for removing a `SortExec`, it updates the plan.
/// Otherwise, it leaves the plan unchanged.
///
/// Note: this optimizer sub-rule will only produce `SortPreservingRepartitionExec`s
/// if the query is bounded or if the config option `bounded_order_preserving_variants`
/// is set to `true`.
///
/// The algorithm flow is simply like this:
/// 1. Visit nodes of the physical plan bottom-up and look for `SortExec` nodes.
/// 1_1. During the traversal, build an `ExecTree` to keep track of operators
Expand Down Expand Up @@ -232,6 +237,7 @@ pub(crate) fn replace_with_order_preserving_variants(
// a `SortExec` from the plan. If this flag is `false`, this replacement
// should only be made to fix the pipeline (streaming).
is_spm_better: bool,
config: &ConfigOptions,
) -> Result<Transformed<OrderPreservationContext>> {
let plan = &requirements.plan;
let ordering_onwards = &requirements.ordering_onwards;
Expand All @@ -243,11 +249,13 @@ pub(crate) fn replace_with_order_preserving_variants(
};
// For unbounded cases, replace with the order-preserving variant in
// any case, as doing so helps fix the pipeline.
let is_unbounded = unbounded_output(plan);
// Also do the replacement if opted-in via config options.
let use_order_preserving_variant =
config.optimizer.bounded_order_preserving_variants || unbounded_output(plan);
let updated_sort_input = get_updated_plan(
exec_tree,
is_spr_better || is_unbounded,
is_spm_better || is_unbounded,
is_spr_better || use_order_preserving_variant,
is_spm_better || use_order_preserving_variant,
)?;
// If this sort is unnecessary, we should remove it and update the plan:
if ordering_satisfy(
Expand All @@ -270,6 +278,8 @@ pub(crate) fn replace_with_order_preserving_variants(
mod tests {
use super::*;

use crate::prelude::SessionConfig;

use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
Expand Down Expand Up @@ -299,8 +309,17 @@ mod tests {
/// `$EXPECTED_PLAN_LINES`: input plan
/// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan
/// `$PLAN`: the plan to optimized
/// `$ALLOW_BOUNDED`: whether to allow the plan to be optimized for bounded cases
macro_rules! assert_optimized {
($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
assert_optimized!(
$EXPECTED_PLAN_LINES,
$EXPECTED_OPTIMIZED_PLAN_LINES,
$PLAN,
false
);
};
($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $ALLOW_BOUNDED: expr) => {
let physical_plan = $PLAN;
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand All @@ -317,8 +336,9 @@ mod tests {

// Run the rule top-down
// let optimized_physical_plan = physical_plan.transform_down(&replace_repartition_execs)?;
let config = SessionConfig::new().with_bounded_order_preserving_variants($ALLOW_BOUNDED);
let plan_with_pipeline_fixer = OrderPreservationContext::new(physical_plan);
let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false))?;
let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options()))?;
let optimized_physical_plan = parallel.plan;

// Get string representation of the plan
Expand Down Expand Up @@ -751,6 +771,34 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_with_bounded_input() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = csv_exec_sorted(&schema, sort_exprs, false);
let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);

let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

let expected_input = vec![
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

// End test cases
// Start test helpers

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
plan_with_pipeline_fixer,
false,
true,
config,
)
})?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
datafusion.optimizer.allow_symmetric_joins_without_pruning true
datafusion.optimizer.bounded_order_preserving_variants false
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.hash_join_single_partition_threshold 1048576
Expand Down
13 changes: 13 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ impl SessionConfig {
self.options.optimizer.repartition_sorts
}

/// Remove sorts by replacing with order-preserving variants of operators,
/// even when query is bounded?
pub fn bounded_order_preserving_variants(&self) -> bool {
self.options.optimizer.bounded_order_preserving_variants
}

/// Are statistics collected during execution?
pub fn collect_statistics(&self) -> bool {
self.options.execution.collect_statistics
Expand Down Expand Up @@ -215,6 +221,13 @@ impl SessionConfig {
self
}

/// Enables or disables the use of order-preserving variants of `CoalescePartitions`
/// and `RepartitionExec` operators, even when the query is bounded
pub fn with_bounded_order_preserving_variants(mut self, enabled: bool) -> Self {
self.options.optimizer.bounded_order_preserving_variants = enabled;
self
}

/// Enables or disables the use of pruning predicate for parquet readers to skip row groups
pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
self.options.execution.parquet.pruning = enabled;
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. |
| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level |
| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` |
| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. |
| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail |
| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
Expand Down

0 comments on commit cca5225

Please sign in to comment.