diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 7c8ce316f9647..89442a70283f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -66,44 +66,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Plans special cases of limit operators. */ object SpecialLimits extends Strategy { - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ReturnAnswer(rootPlan) => rootPlan match { - case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) => - if (limit < conf.topKSortFallbackThreshold) { + private def decideTopRankNode(limit: Int, child: LogicalPlan): Seq[SparkPlan] = { + if (limit < conf.topKSortFallbackThreshold) { + child match { + case Sort(order, true, child) => TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil - } else { - GlobalLimitExec(limit, - LocalLimitExec(limit, planLater(s)), - orderedLimit = true) :: Nil - } - case Limit(IntegerLiteral(limit), p@Project(projectList, Sort(order, true, child))) => - if (limit < conf.topKSortFallbackThreshold) { + case Project(projectList, Sort(order, true, child)) => TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil - } else { - GlobalLimitExec(limit, - LocalLimitExec(limit, planLater(p)), - orderedLimit = true) :: Nil - } + } + } else { + GlobalLimitExec(limit, + LocalLimitExec(limit, planLater(child)), + orderedLimit = true) :: Nil + } + } + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ReturnAnswer(rootPlan) => rootPlan match { + case Limit(IntegerLiteral(limit), s @ Sort(order, true, child)) => + decideTopRankNode(limit, s) + case Limit(IntegerLiteral(limit), p @ Project(projectList, Sort(order, true, child))) => + decideTopRankNode(limit, p) case Limit(IntegerLiteral(limit), child) => CollectLimitExec(limit, planLater(child)) :: Nil case other => planLater(other) :: Nil } - case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) => - if (limit < conf.topKSortFallbackThreshold) { - TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil - } else { - GlobalLimitExec(limit, - LocalLimitExec(limit, planLater(s)), - orderedLimit = true) :: Nil - } - case Limit(IntegerLiteral(limit), p@Project(projectList, Sort(order, true, child))) => - if (limit < conf.topKSortFallbackThreshold) { - TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil - } else { - GlobalLimitExec(limit, - LocalLimitExec(limit, planLater(p)), - orderedLimit = true) :: Nil - } + case Limit(IntegerLiteral(limit), s @ Sort(order, true, child)) => + decideTopRankNode(limit, s) + case Limit(IntegerLiteral(limit), p @ Project(projectList, Sort(order, true, child))) => + decideTopRankNode(limit, p) case _ => Nil } }