Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> {
) as Arc<dyn ExecutionPlan>;

let expected_input = [
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false], sort_prefix=[non_nullable_col@1 ASC]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice and clear addition to the explain

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just keep the common prefix count, it will simplify the display too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Careful: `expr` is unnormalized, whereas `sort_prefix` is normalized. I agree this is a bit confusing, but `normalized_common_sort_prefix` seems a bit too verbose. Any suggestions?

" SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]",
" DataSourceExec: partitions=1, partition_sizes=[0]",
];
Expand Down
59 changes: 44 additions & 15 deletions datafusion/physical-expr/src/equivalence/properties/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,22 +546,26 @@ impl EquivalenceProperties {
self.ordering_satisfy_requirement(&sort_requirements)
}

/// Checks whether the given sort requirements are satisfied by any of the
/// existing orderings.
pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
let mut eq_properties = self.clone();
// First, standardize the given requirement:
let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);

/// Returns the number of consecutive requirements (starting from the left)
/// that are satisfied by the plan ordering.
fn compute_common_sort_prefix_length(
&self,
normalized_reqs: &LexRequirement,
) -> usize {
// Check whether given ordering is satisfied by constraints first
if self.satisfied_by_constraints(&normalized_reqs) {
return true;
if self.satisfied_by_constraints(normalized_reqs) {
// If the constraints satisfy all requirements, return the full normalized requirements length
return normalized_reqs.len();
}

for normalized_req in normalized_reqs {
let mut eq_properties = self.clone();

for (i, normalized_req) in normalized_reqs.iter().enumerate() {
// Check whether given ordering is satisfied
if !eq_properties.ordering_satisfy_single(&normalized_req) {
return false;
if !eq_properties.ordering_satisfy_single(normalized_req) {
// As soon as one requirement is not satisfied, return
// how many we've satisfied so far
return i;
}
// Treat satisfied keys as constants in subsequent iterations. We
// can do this because the "next" key only matters in a lexicographical
Expand All @@ -575,10 +579,35 @@ impl EquivalenceProperties {
// From the analysis above, we know that `[a ASC]` is satisfied. Then,
// we add column `a` as constant to the algorithm state. This enables us
// to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
eq_properties = eq_properties
.with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
eq_properties = eq_properties.with_constants(std::iter::once(
ConstExpr::from(Arc::clone(&normalized_req.expr)),
));
}
true

// All requirements are satisfied.
normalized_reqs.len()
}

/// Finds the longest leading run of `reqs` that the existing ordering already satisfies.
///
/// The requirements are normalized before the comparison, so the returned
/// prefix is expressed in normalized form. The accompanying boolean is `true`
/// when the prefix spans every normalized requirement, i.e. all of the
/// requirements are satisfied.
pub fn extract_common_sort_prefix(
    &self,
    reqs: &LexRequirement,
) -> (LexRequirement, bool) {
    // Bring the requirement into its standard form before comparing.
    let normalized = self.normalize_sort_requirements(reqs);

    // Number of leading normalized requirements the ordering satisfies.
    let satisfied_count = self.compute_common_sort_prefix_length(&normalized);
    let fully_satisfied = satisfied_count == normalized.len();
    let prefix = normalized[..satisfied_count].to_vec();

    (LexRequirement::new(prefix), fully_satisfied)
}

/// Checks whether the given sort requirements are satisfied by any of the
/// existing orderings.
///
/// The requirements are normalized first. They are considered satisfied when
/// the number of leading requirements satisfied by the ordering equals the
/// total number of normalized requirements.
pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
    // First, standardize the given requirement:
    let normalized_reqs = self.normalize_sort_requirements(reqs);

    // Only the satisfied-prefix length matters here, so compare lengths
    // directly instead of going through `extract_common_sort_prefix`, which
    // would copy the prefix into a new `LexRequirement` just to drop it.
    self.compute_common_sort_prefix_length(&normalized_reqs) == normalized_reqs.len()
}

/// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique).
Expand Down
45 changes: 33 additions & 12 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,8 @@ pub struct SortExec {
preserve_partitioning: bool,
/// Fetch highest/lowest n results
fetch: Option<usize>,
/// Normalized common sort prefix between the input and the sort expressions (only used with fetch)
common_sort_prefix: LexOrdering,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, can we just keep the number of common-prefix expressions of

    /// Sort expressions
    expr: LexOrdering

? I think that would be simpler and would avoid duplication.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can do that simply, as `common_sort_prefix` is normalized and `expr` is not. It would mean we'd need to re-normalize `expr` every time we need `common_sort_prefix`.

/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
}
Expand All @@ -975,13 +977,15 @@ impl SortExec {
/// sorted output partition.
pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
let preserve_partitioning = false;
let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning);
let (cache, sort_prefix) =
Self::compute_properties(&input, expr.clone(), preserve_partitioning);
Self {
expr,
input,
metrics_set: ExecutionPlanMetricsSet::new(),
preserve_partitioning,
fetch: None,
common_sort_prefix: sort_prefix,
cache,
}
}
Expand Down Expand Up @@ -1033,6 +1037,7 @@ impl SortExec {
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
common_sort_prefix: self.common_sort_prefix.clone(),
fetch,
cache,
}
Expand Down Expand Up @@ -1066,19 +1071,21 @@ impl SortExec {
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
/// It also returns the common sort prefix between the input and the sort expressions.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
sort_exprs: LexOrdering,
preserve_partitioning: bool,
) -> PlanProperties {
) -> (PlanProperties, LexOrdering) {
// Determine execution mode:
let requirement = LexRequirement::from(sort_exprs);
let sort_satisfied = input

let (sort_prefix, sort_satisfied) = input
.equivalence_properties()
.ordering_satisfy_requirement(&requirement);
.extract_common_sort_prefix(&requirement);

// The emission type depends on whether the input is already sorted:
// - If already sorted, we can emit results in the same way as the input
// - If already fully sorted, we can emit results in the same way as the input
// - If not sorted, we must wait until all data is processed to emit results (Final)
let emission_type = if sort_satisfied {
input.pipeline_behavior()
Expand Down Expand Up @@ -1114,11 +1121,14 @@ impl SortExec {
let output_partitioning =
Self::output_partitioning_helper(input, preserve_partitioning);

PlanProperties::new(
eq_properties,
output_partitioning,
emission_type,
boundedness,
(
PlanProperties::new(
eq_properties,
output_partitioning,
emission_type,
boundedness,
),
LexOrdering::from(sort_prefix),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice additional logic in this `compute_properties` function.

)
}
}
Expand All @@ -1130,7 +1140,12 @@ impl DisplayAs for SortExec {
let preserve_partitioning = self.preserve_partitioning;
match self.fetch {
Some(fetch) => {
write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)
write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
if !self.common_sort_prefix.is_empty() {
write!(f, ", sort_prefix=[{}]", self.common_sort_prefix)
} else {
Ok(())
}
}
None => write!(f, "SortExec: expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr),
}
Expand Down Expand Up @@ -1203,10 +1218,12 @@ impl ExecutionPlan for SortExec {

trace!("End SortExec's input.execute for partition: {}", partition);

let requirement = &LexRequirement::from(self.expr.clone());

let sort_satisfied = self
.input
.equivalence_properties()
.ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
.ordering_satisfy_requirement(requirement);

match (sort_satisfied, self.fetch.as_ref()) {
(true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
Expand All @@ -1220,6 +1237,7 @@ impl ExecutionPlan for SortExec {
let mut topk = TopK::try_new(
partition,
input.schema(),
self.common_sort_prefix.clone(),
self.expr.clone(),
*fetch,
context.session_config().batch_size(),
Expand All @@ -1232,6 +1250,9 @@ impl ExecutionPlan for SortExec {
while let Some(batch) = input.next().await {
let batch = batch?;
topk.insert_batch(batch)?;
if topk.finished {
break;
}
}
topk.emit()
})
Expand Down
Loading