@@ -103,9 +103,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
(Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// However, because we also use the analyzer to canonicalize queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
// Must come before EliminateSubqueryAliases.
RemoveSubquerySorts,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
@@ -307,6 +309,32 @@ object RemoveRedundantProject extends Rule[LogicalPlan] {
}
}

/**
* Remove [[Sort]] in subqueries that do not affect the set of rows produced, only their
* order. Subqueries produce unordered sets of rows so sorting their output is unnecessary.
*/
object RemoveSubquerySorts extends Rule[LogicalPlan] {

/**
* Removes all [[Sort]] operators from a plan that are accessible from the root operator via
* 0 or more [[Project]], [[Filter]] or [[View]] operators.
*/
private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = {
plan match {
case Sort(_, _, child) => removeTopLevelSorts(child)
case Project(fields, child) => Project(fields, removeTopLevelSorts(child))
case Filter(condition, child) => Filter(condition, removeTopLevelSorts(child))
case View(tbl, output, child) => View(tbl, output, removeTopLevelSorts(child))
case _ => plan
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Subquery(child) => Subquery(removeTopLevelSorts(child))
case SubqueryAlias(name, child) => SubqueryAlias(name, removeTopLevelSorts(child))
Member

SubqueryAlias is not the subquery you want. It is just an alias for a query, table, or view. For example:

Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str_sort").orderBy('int.asc).as('df1)
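
To make the concern concrete, here is a minimal sketch using toy stand-ins for the logical operators. `Plan`, `Relation`, and the simplified `Sort`, `Project`, and `SubqueryAlias` below are hypothetical and are not Catalyst's real classes; `rule` only imitates the `SubqueryAlias` case of the proposed rule. It suggests that an alias placed directly above an explicit `orderBy` would lose its sort.

```scala
object AliasSortDemo {
  // Toy stand-ins for Catalyst operators (assumption: heavily simplified, not the real API).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Sort(order: String, child: Plan) extends Plan
  case class Project(fields: Seq[String], child: Plan) extends Plan
  case class SubqueryAlias(alias: String, child: Plan) extends Plan

  // Same shape as removeTopLevelSorts in the diff, restricted to the toy operators.
  def removeTopLevelSorts(plan: Plan): Plan = plan match {
    case Sort(_, child) => removeTopLevelSorts(child)
    case Project(fields, child) => Project(fields, removeTopLevelSorts(child))
    case other => other
  }

  // Imitates a top-down transform that treats every alias as a subquery.
  def rule(plan: Plan): Plan = plan match {
    case SubqueryAlias(alias, child) => SubqueryAlias(alias, rule(removeTopLevelSorts(child)))
    case Sort(order, child) => Sort(order, rule(child))
    case Project(fields, child) => Project(fields, rule(child))
    case leaf => leaf
  }

  // Plan shape for an aliased, explicitly ordered DataFrame: alias directly above Sort.
  val aliasedDataFrame: Plan = SubqueryAlias("df1", Sort("int ASC", Relation("local")))
}
```

In this toy model, `AliasSortDemo.rule(AliasSortDemo.aliasedDataFrame)` yields the alias directly over the relation, so the user's explicit ordering is gone even though no SQL subquery was written.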

Contributor Author

Thanks! I've been trying to understand the role of Subquery and SubqueryAlias. My confusion is that subqueries do seem to get planned as SubqueryAlias operators, e.g.:

scala> spark.sql("SELECT count(*) from (SELECT id FROM dft ORDER BY id)").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('count(1), None)]
+- 'SubqueryAlias __auto_generated_subquery_name
   +- 'Sort ['id ASC NULLS FIRST], true
      +- 'Project ['id]
         +- 'UnresolvedRelation `dft`

In the example you give, I (personally) think it's still reasonable to drop the ordering, but I understand that it might surprise some users. It wouldn't be hard to skip the root if it's a subquery, but what do you propose for detecting subqueries if my method isn't right?

Member

Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str_sort").orderBy('int.asc).as('df1)

Before entering the optimizer, we get rid of SubqueryAlias with the rule EliminateSubqueryAliases. Basically, it is a no-op after query analysis. The name is a little confusing, I have to admit.

Contributor Author

Yep, that's why I added the new rule just before EliminateSubqueryAliases (which runs in the optimizer, as part of the 'finish analysis' batch). After EliminateSubqueryAliases there doesn't seem to be any way to detect subqueries.

Another approach, I suppose, would be to handle this like SparkPlan's requiredChildOrdering: if a parent doesn't require any ordering of its child, and the child is a Sort node, the child Sort should be dropped. That seems like a more fundamental change, though.
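
A rough sketch of that alternative, under stated assumptions: the operators below are hypothetical toy classes rather than Catalyst's, and the `orderMatters` flag is an invented stand-in for an ordering requirement passed down from the parent. It also assumes that aggregates are order-insensitive and that a kept Sort fully defines its output order, neither of which holds in every real case.

```scala
object OrderingRequirementSketch {
  // Toy operators (assumption: simplified stand-ins, not Catalyst's logical plan classes).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Sort(order: String, child: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Aggregate(groupBy: String, child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan

  // orderMatters = true means some ancestor can observe the row order of this node's output.
  def prune(plan: Plan, orderMatters: Boolean): Plan = plan match {
    case Sort(order, child) =>
      // Assumption: a kept Sort defines the order itself, so its input order is irrelevant.
      if (orderMatters) Sort(order, prune(child, orderMatters = false))
      else prune(child, orderMatters = false)
    case Filter(cond, child) =>
      // Filter preserves row order, so it passes the parent's requirement through.
      Filter(cond, prune(child, orderMatters))
    case Limit(n, child) =>
      // Which rows survive a limit depends on their order, so the child's order matters.
      Limit(n, prune(child, orderMatters = true))
    case Aggregate(groupBy, child) =>
      // Assumption: only order-insensitive aggregate functions are used.
      Aggregate(groupBy, prune(child, orderMatters = false))
    case leaf: Relation => leaf
  }
}
```

Unlike the subquery-based rule, this sketch would also drop a Sort sitting directly below an Aggregate, which is one reason it amounts to a broader change in behavior.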

}
}

/**
* Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
*/
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, SimpleAnalyzer}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

class RemoveSubquerySortsSuite extends PlanTest {

private object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
RemoveSubquerySorts,
EliminateSubqueryAliases) :: Nil
}

private val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

private def analyzeAndCompare(plan: LogicalPlan, correct: LogicalPlan): Unit = {
// We can't use the implicit analyze method that tests usually use for 'plan',
// because it explicitly calls EliminateSubqueryAliases.
comparePlans(Optimize.execute(SimpleAnalyzer.execute(plan)), correct.analyze)
}

test("Remove top-level sort") {
val query = testRelation.orderBy('a.asc).subquery('x)
analyzeAndCompare(query, testRelation)
}

test("Remove sort behind filter and project") {
val query = testRelation.orderBy('a.asc).where('a.attr > 10).select('b).subquery('x)
analyzeAndCompare(query, testRelation.where('a.attr > 10).select('b))
}

test("Remove sort below subquery that is not at root") {
val query = testRelation.orderBy('a.asc).subquery('x).groupBy('a)(sum('b))
analyzeAndCompare(query, testRelation.groupBy('a)(sum('b)))
}

test("Sorts with limits must not be removed from subqueries") {
val query = testRelation.orderBy('a.asc).limit(10).subquery('x)
analyzeAndCompare(query, testRelation.orderBy('a.asc).limit(10))
}

test("Remove more than one sort") {
val query = testRelation.orderBy('a.asc).orderBy('b.desc).subquery('x)
analyzeAndCompare(query, testRelation)
}

test("Nested subqueries") {
val query = testRelation.orderBy('a.asc).subquery('x).orderBy('b.desc).subquery('y)
analyzeAndCompare(query, testRelation)
}

test("Sorts below non-project / filter operators don't get removed") {
val query = testRelation.orderBy('a.asc).groupBy('a)(sum('b)).subquery('x)
analyzeAndCompare(query, testRelation.orderBy('a.asc).groupBy('a)(sum('b)))
}
}
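
The expectations in this suite follow from where the sort-removal walk stops. As a minimal illustration, the sketch below re-creates that walk over hypothetical toy operators (not Catalyst's classes): it descends through Sort and Filter only, so a Sort under a Limit or an Aggregate is left alone.

```scala
object SortBarrierSketch {
  // Toy operators (assumption: simplified stand-ins for the real logical plan nodes).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Sort(order: String, child: Plan) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan
  case class Aggregate(groupBy: String, child: Plan) extends Plan

  def removeTopLevelSorts(plan: Plan): Plan = plan match {
    case Sort(_, child) => removeTopLevelSorts(child)
    case Filter(cond, child) => Filter(cond, removeTopLevelSorts(child))
    // Limit, Aggregate and leaves act as barriers: the walk stops and keeps the subtree as is.
    case other => other
  }
}
```

Stacked sorts collapse because each Sort case recurses into its child, while a Limit above a Sort is returned unchanged, matching the "Sorts with limits must not be removed from subqueries" test.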