diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 236636ac7ea11..8c63012c6814d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
-import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE, JSON_TO_STRUCT, LIKE_FAMLIY, PYTHON_UDF, REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE, SCALA_UDF}
@@ -117,13 +117,38 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * do not add a subquery that might have an expensive computation
    */
   private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
-    val ret = plan match {
-      case PhysicalOperation(_, filters, child) if child.isInstanceOf[LeafNode] =>
-        filters.forall(isSimpleExpression) &&
-          filters.exists(isLikelySelective)
+    def isSelective(
+        p: LogicalPlan,
+        predicateReference: AttributeSet,
+        hasHitFilter: Boolean,
+        hasHitSelectiveFilter: Boolean): Boolean = p match {
+      case Project(projectList, child) =>
+        if (hasHitFilter) {
+          // We need to make sure all expressions referenced by filter predicates are simple
+          // expressions.
+          val referencedExprs = projectList.filter(predicateReference.contains)
+          referencedExprs.forall(isSimpleExpression) &&
+            isSelective(
+              child,
+              referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
+              hasHitFilter,
+              hasHitSelectiveFilter)
+        } else {
+          assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
+          isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+        }
+      case Filter(condition, child) =>
+        isSimpleExpression(condition) && isSelective(
+          child,
+          predicateReference ++ condition.references,
+          hasHitFilter = true,
+          hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
+      case _: LeafNode => hasHitSelectiveFilter
       case _ => false
     }
-    !plan.isStreaming && ret
+
+    !plan.isStreaming &&
+      isSelective(plan, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false)
  }
 
   private def isSimpleExpression(e: Expression): Boolean = {
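A note on the hunk above: `isSelectiveFilterOverScan` no longer relies on `PhysicalOperation` and instead walks the Project/Filter stack itself, so a likely-selective filter still counts even when a Project sits between it and the scan. A minimal, self-contained sketch of that walk, under the assumption that boolean flags can stand in for the simple/selective expression checks; the `Proj`/`Filt`/`Leaf` case classes are toy stand-ins, not Catalyst's plan nodes:

```scala
object IsSelectiveSketch extends App {
  sealed trait Plan
  case class Proj(referencedExprsSimple: Boolean, child: Plan) extends Plan
  case class Filt(simple: Boolean, selective: Boolean, child: Plan) extends Plan
  case object Leaf extends Plan

  def isSelective(p: Plan, hitFilter: Boolean, hitSelective: Boolean): Boolean = p match {
    // Once a filter has been seen, the projected expressions it references must stay simple.
    case Proj(simple, child) =>
      (!hitFilter || simple) && isSelective(child, hitFilter, hitSelective)
    // A filter must itself be simple; remember whether any filter so far was selective.
    case Filt(simple, selective, child) =>
      simple && isSelective(child, hitFilter = true, hitSelective || selective)
    // Accept only if some selective filter was found on the way down to the leaf.
    case Leaf => hitSelective
  }

  // A selective Filter under a Project over a scan: rejected by the old
  // PhysicalOperation-based check, accepted by the new walk.
  assert(isSelective(
    Proj(referencedExprsSimple = true, Filt(simple = true, selective = true, Leaf)),
    hitFilter = false, hitSelective = false))
}
```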
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
index 2195eef2fc93b..fde22b249b690 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushdownPredicatesAndPruneColumnsForCTEDef.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, Literal, Or, SubqueryExpression}
-import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.CTE
@@ -69,7 +69,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
       }
       gatherPredicatesAndAttributes(child, cteMap)
 
-    case ScanOperation(projects, predicates, ref: CTERelationRef) =>
+    case PhysicalOperation(projects, predicates, ref: CTERelationRef) =>
       val (cteDef, precedence, preds, attrs) = cteMap(ref.cteId)
       val attrMapping = ref.output.zip(cteDef.output).map{ case (r, d) => r -> d }.toMap
       val newPredicates = if (isTruePredicate(preds)) {
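For readers mapping the rename: `PhysicalOperation` (like `ScanOperation` before it) returns the collected filter conditions already broken into conjuncts via `splitConjunctivePredicates`, which is what lets the CTE rule above treat `predicates` as a flat sequence. A self-contained illustration of that splitting step, with a toy expression type standing in for Catalyst's `And`:

```scala
object SplitConjunctsSketch extends App {
  sealed trait Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Pred(name: String) extends Expr

  // Recursively flattens nested ANDs into a sequence of conjuncts, mirroring
  // the behavior of PredicateHelper.splitConjunctivePredicates.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other => Seq(other)
  }

  // (a AND b) AND c  ==>  Seq(a, b, c)
  assert(splitConjuncts(And(And(Pred("a"), Pred("b")), Pred("c"))) ==
    Seq(Pred("a"), Pred("b"), Pred("c")))
}
```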
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index bf3fced0ae0fd..74085436870e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -21,7 +21,7 @@ import scala.annotation.tailrec
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.planning.{NodeWithOnlyDeterministicProjectAndFilter, PhysicalOperation}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 
@@ -82,7 +82,8 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
       // Find if the input plans are eligible for star join detection.
       // An eligible plan is a base table access with valid statistics.
       val foundEligibleJoin = input.forall {
-        case PhysicalOperation(_, _, t: LeafNode) if t.stats.rowCount.isDefined => true
+        case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode)
+          if t.stats.rowCount.isDefined => true
         case _ => false
       }
 
@@ -177,7 +178,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
   private def isUnique(
       column: Attribute,
       plan: LogicalPlan): Boolean = plan match {
-    case PhysicalOperation(_, _, t: LeafNode) =>
+    case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode) =>
       val leafCol = findLeafNodeCol(column, plan)
       leafCol match {
         case Some(col) if t.outputSet.contains(col) =>
@@ -212,7 +213,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
   private def findLeafNodeCol(
       column: Attribute,
       plan: LogicalPlan): Option[Attribute] = plan match {
-    case pl @ PhysicalOperation(_, _, _: LeafNode) =>
+    case pl @ NodeWithOnlyDeterministicProjectAndFilter(_: LeafNode) =>
       pl match {
         case t: LeafNode if t.outputSet.contains(column) =>
           Option(column)
@@ -233,7 +234,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
   private def hasStatistics(
       column: Attribute,
       plan: LogicalPlan): Boolean = plan match {
-    case PhysicalOperation(_, _, t: LeafNode) =>
+    case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode) =>
       val leafCol = findLeafNodeCol(column, plan)
       leafCol match {
         case Some(col) if t.outputSet.contains(col) =>
@@ -296,7 +297,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
    */
   private def getTableAccessCardinality(
       input: LogicalPlan): Option[BigInt] = input match {
-    case PhysicalOperation(_, cond, t: LeafNode) if t.stats.rowCount.isDefined =>
+    case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode) if t.stats.rowCount.isDefined =>
      if (conf.cboEnabled && input.stats.rowCount.isDefined) {
        Option(input.stats.rowCount.get)
      } else {
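`NodeWithOnlyDeterministicProjectAndFilter`, used throughout the hunks above and defined in patterns.scala below, returns whatever node it stops at, so call sites still need the `t: LeafNode` type test (plus the statistics guard). A toy version showing that contract; `Pr`/`Fi`/`Lf` are stand-ins, not Catalyst classes:

```scala
object DeterministicWrapperSketch extends App {
  sealed trait P
  case class Pr(deterministic: Boolean, child: P) extends P
  case class Fi(deterministic: Boolean, child: P) extends P
  case class Lf(name: String) extends P

  // Strips deterministic Project/Filter wrappers; any other node (including a
  // non-deterministic wrapper) is returned as-is, to be rejected by the caller's
  // type test.
  object OnlyDetProjectAndFilter {
    def unapply(p: P): Option[P] = p match {
      case Pr(true, child) => unapply(child)
      case Fi(true, child) => unapply(child)
      case other => Some(other)
    }
  }

  // Deterministic wrappers are looked through, all the way down to the leaf.
  assert(Pr(true, Fi(true, Lf("t"))) match {
    case OnlyDetProjectAndFilter(_: Lf) => true
    case _ => false
  })
  // A non-deterministic filter stops the descent, so the leaf type test fails.
  assert(!(Fi(deterministic = false, Lf("t")) match {
    case OnlyDetProjectAndFilter(_: Lf) => true
    case _ => false
  }))
}
```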
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 4e12b811acd1b..72546ea73dd9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -29,7 +29,14 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 
-trait OperationHelper extends PredicateHelper {
+/**
+ * A pattern that matches any number of project or filter operations even if they are
+ * non-deterministic, as long as they satisfy the requirements of CollapseProject and
+ * CombineFilters. All filter operators are collected and their conditions are broken up
+ * and returned together with the top project operator. [[Alias Aliases]] are
+ * in-lined/substituted if necessary.
+ */
+object PhysicalOperation extends AliasHelper with PredicateHelper {
   import org.apache.spark.sql.catalyst.optimizer.CollapseProject.canCollapseExpressions
 
   type ReturnType =
@@ -43,16 +50,6 @@ trait OperationHelper extends PredicateHelper {
     Some((fields.getOrElse(child.output), filters, child))
   }
 
-  /**
-   * This legacy mode is for PhysicalOperation which has been there for years and we want to be
-   * extremely safe to not change its behavior. There are two differences when legacy mode is off:
-   * 1. We postpone the deterministic check to the very end (calling `canCollapseExpressions`),
-   *    so that it's more likely to collect more projects and filters.
-   * 2. We follow CollapseProject and only collect adjacent projects if they don't produce
-   *    repeated expensive expressions.
-   */
-  protected def legacyMode: Boolean
-
   /**
    * Collects all adjacent projects and filters, in-lining/substituting aliases if necessary.
    * Here are two examples for alias in-lining/substitution.
@@ -73,31 +70,27 @@ trait OperationHelper extends PredicateHelper {
     def empty: IntermediateType = (None, Nil, plan, AttributeMap.empty)
 
     plan match {
-      case Project(fields, child) if !legacyMode || fields.forall(_.deterministic) =>
+      case Project(fields, child) =>
         val (_, filters, other, aliases) = collectProjectsAndFilters(child, alwaysInline)
-        if (legacyMode || canCollapseExpressions(fields, aliases, alwaysInline)) {
+        if (canCollapseExpressions(fields, aliases, alwaysInline)) {
           val replaced = fields.map(replaceAliasButKeepName(_, aliases))
           (Some(replaced), filters, other, getAliasMap(replaced))
         } else {
           empty
         }
 
-      case Filter(condition, child) if !legacyMode || condition.deterministic =>
+      case Filter(condition, child) =>
         val (fields, filters, other, aliases) = collectProjectsAndFilters(child, alwaysInline)
-        val canIncludeThisFilter = if (legacyMode) {
-          true
-        } else {
-          // When collecting projects and filters, we effectively push down filters through
-          // projects. We need to meet the following conditions to do so:
-          //   1) no Project collected so far or the collected Projects are all deterministic
-          //   2) the collected filters and this filter are all deterministic, or this is the
-          //      first collected filter.
-          //   3) this filter does not repeat any expensive expressions from the collected
-          //      projects.
-          fields.forall(_.forall(_.deterministic)) && {
-            filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
-          } && canCollapseExpressions(Seq(condition), aliases, alwaysInline)
-        }
+        // When collecting projects and filters, we effectively push down filters through
+        // projects. We need to meet the following conditions to do so:
+        //   1) no Project collected so far or the collected Projects are all deterministic
+        //   2) the collected filters and this filter are all deterministic, or this is the
+        //      first collected filter.
+        //   3) this filter does not repeat any expensive expressions from the collected
+        //      projects.
+        val canIncludeThisFilter = fields.forall(_.forall(_.deterministic)) && {
+          filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
+        } && canCollapseExpressions(Seq(condition), aliases, alwaysInline)
         if (canIncludeThisFilter) {
           val replaced = replaceAlias(condition, aliases)
           (fields, filters ++ splitConjunctivePredicates(replaced), other, aliases)
@@ -112,24 +105,12 @@ trait OperationHelper extends PredicateHelper {
   }
 }
 
-/**
- * A pattern that matches any number of project or filter operations on top of another relational
- * operator. All filter operators are collected and their conditions are broken up and returned
- * together with the top project operator.
- * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if
- * necessary.
- */
-object PhysicalOperation extends OperationHelper {
-  override protected def legacyMode: Boolean = true
-}
-
-/**
- * A variant of [[PhysicalOperation]]. It matches any number of project or filter
- * operations even if they are non-deterministic, as long as they satisfy the
- * requirement of CollapseProject and CombineFilters.
- */
-object ScanOperation extends OperationHelper {
-  override protected def legacyMode: Boolean = false
+object NodeWithOnlyDeterministicProjectAndFilter {
+  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+    case Project(projectList, child) if projectList.forall(_.deterministic) => unapply(child)
+    case Filter(cond, child) if cond.deterministic => unapply(child)
+    case _ => Some(plan)
+  }
 }
 
 /**
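The scaladoc above mentions alias in-lining. When `PhysicalOperation` pushes a filter below a `Project`, a predicate written against a projected alias is rewritten against the alias's definition (`replaceAlias`). A deliberately simplified model of that substitution, using strings in place of Catalyst expressions; `inlineAliases` is a hypothetical helper for illustration only:

```scala
object AliasInlineSketch extends App {
  // Textual stand-in for replaceAlias: substitute each alias name in the
  // predicate with the expression the alias was defined as.
  def inlineAliases(condition: String, aliases: Map[String, String]): String =
    aliases.foldLeft(condition) { case (cond, (name, expr)) => cond.replace(name, expr) }

  // SELECT a + b AS x ... WHERE x > 1 becomes predicate (a + b) > 1 on the base relation.
  assert(inlineAliases("x > 1", Map("x" -> "(a + b)")) == "(a + b) > 1")
}
```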
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/PhysicalOperationSuite.scala
similarity index 88%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/PhysicalOperationSuite.scala
index eb3899c9187db..3d3f4c4c448b4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/PhysicalOperationSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.DoubleType
 
-class ScanOperationSuite extends SparkFunSuite {
+class PhysicalOperationSuite extends SparkFunSuite {
   private val relation = TestRelations.testRelation2
   private val colA = relation.output(0)
   private val colB = relation.output(1)
@@ -34,7 +34,7 @@ class ScanOperationSuite extends SparkFunSuite {
   test("Project with a non-deterministic field and a deterministic child Filter") {
     val project1 = Project(Seq(colB, aliasR), Filter(EqualTo(colA, Literal(1)), relation))
     project1 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
+      case PhysicalOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colB)
         assert(projects(1) === aliasR)
@@ -46,7 +46,7 @@ class ScanOperationSuite extends SparkFunSuite {
   test("Project with all deterministic fields but a non-deterministic child Filter") {
     val project2 = Project(Seq(colA, colB), Filter(EqualTo(aliasR, Literal(1)), relation))
     project2 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
+      case PhysicalOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
         assert(projects(1) === colB)
@@ -58,7 +58,7 @@ class ScanOperationSuite extends SparkFunSuite {
   test("Project which has the same non-deterministic expression with its child Project") {
     val project3 = Project(Seq(colA, colR), Project(Seq(colA, aliasR), relation))
     project3 match {
-      case ScanOperation(projects, filters, _: Project) =>
+      case PhysicalOperation(projects, filters, _: Project) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
         assert(projects(1) === colR)
@@ -70,7 +70,7 @@ class ScanOperationSuite extends SparkFunSuite {
   test("Project which has different non-deterministic expressions with its child Project") {
     val project4 = Project(Seq(colA, aliasId), Project(Seq(colA, aliasR), relation))
     project4 match {
-      case ScanOperation(projects, _, _: LocalRelation) =>
+      case PhysicalOperation(projects, _, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
         assert(projects(1) === aliasId)
@@ -81,7 +81,7 @@ class ScanOperationSuite extends SparkFunSuite {
   test("Filter with non-deterministic Project") {
     val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
     filter1 match {
-      case ScanOperation(projects, filters, _: Filter) =>
+      case PhysicalOperation(projects, filters, _: Filter) =>
         assert(projects.size === 2)
         assert(filters.isEmpty)
       case _ => assert(false)
@@ -92,7 +92,7 @@ class ScanOperationSuite extends SparkFunSuite {
     val filter2 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
       Project(Seq(colA, colB), relation))
     filter2 match {
-      case ScanOperation(projects, filters, _: LocalRelation) =>
+      case PhysicalOperation(projects, filters, _: LocalRelation) =>
         assert(projects.size === 2)
         assert(projects(0) === colA)
         assert(projects(1) === colB)
@@ -105,7 +105,7 @@ class ScanOperationSuite extends SparkFunSuite {
   test("Deterministic filter which has a non-deterministic child Filter") {
     val filter3 = Filter(EqualTo(colA, Literal(1)), Filter(EqualTo(aliasR, Literal(1)), relation))
     filter3 match {
-      case ScanOperation(projects, filters, _: Filter) =>
+      case PhysicalOperation(projects, filters, _: Filter) =>
         assert(filters.isEmpty)
       case _ => assert(false)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a9d5c6da3844c..0216503fba0f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
@@ -318,7 +318,7 @@ object DataSourceStrategy
   extends Strategy with Logging with CastSupport with PredicateHelper with SQLConfHelper {
 
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
@@ -326,7 +326,7 @@ object DataSourceStrategy
         (requestedColumns, allPredicates, _) =>
           toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
 
-    case ScanOperation(projects, filters,
+    case PhysicalOperation(projects, filters,
       l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
       pruneFilterProject(
         l,
        projects,
@@ -334,7 +334,7 @@ object DataSourceStrategy
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
 
-    case ScanOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
       pruneFilterProject(
         l,
         projects,
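`pruneFilterProject` (unchanged by this patch) consumes the `(projects, filters, relation)` triple that `PhysicalOperation` produces and decides which predicates the source evaluates during the scan versus which remain in a post-scan `Filter`. The core of that decision is a partition of the predicate list; a hedged sketch where `canPushDown` is a stand-in for the per-source translation check, not Spark's actual API:

```scala
object PruneFilterSketch extends App {
  // Partition predicates into those the data source can evaluate during the scan
  // and those that must stay in a Filter above it.
  def splitPredicates[T](predicates: Seq[T])(canPushDown: T => Boolean): (Seq[T], Seq[T]) =
    predicates.partition(canPushDown)

  // A plain comparison is pushable; a predicate wrapping a UDF is not.
  val (pushed, postScan) = splitPredicates(Seq("a = 1", "udf(b) > 0"))(!_.contains("udf"))
  assert(pushed == Seq("a = 1") && postScan == Seq("udf(b) > 0"))
}
```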
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 9356e46a69187..4995a0d6cd4f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
@@ -144,7 +144,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
   }
 
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case ScanOperation(projects, filters,
+    case PhysicalOperation(projects, filters,
       l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 01b0ae451b2a9..c7b09904df41d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
-import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, Limit, LimitAndOffset, LocalLimit, LogicalPlan, Offset, OffsetAndLimit, Project, Sample, Sort}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
@@ -97,7 +97,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   private def rewriteAggregate(agg: Aggregate): LogicalPlan = agg.child match {
-    case ScanOperation(project, Nil, holder @ ScanBuilderHolder(_, _,
+    case PhysicalOperation(project, Nil, holder @ ScanBuilderHolder(_, _,
       r: SupportsPushDownAggregates)) if CollapseProject.canCollapseExpressions(
       agg.aggregateExpressions, project, alwaysInline = true) =>
       val aliasMap = getAliasMap(project)
@@ -342,7 +342,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   def pruneColumns(plan: LogicalPlan): LogicalPlan = plan.transform {
-    case ScanOperation(project, filters, sHolder: ScanBuilderHolder) =>
+    case PhysicalOperation(project, filters, sHolder: ScanBuilderHolder) =>
       // column pruning
       val normalizedProjects = DataSourceStrategy
         .normalizeExprs(project, sHolder.output)
@@ -382,7 +382,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   def pushDownSample(plan: LogicalPlan): LogicalPlan = plan.transform {
     case sample: Sample => sample.child match {
-      case ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty =>
+      case PhysicalOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty =>
         val tableSample = TableSampleInfo(
           sample.lowerBound,
           sample.upperBound,
@@ -401,18 +401,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   private def pushDownLimit(plan: LogicalPlan, limit: Int): (LogicalPlan, Boolean) = plan match {
-    case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty =>
+    case operation @ PhysicalOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty =>
       val (isPushed, isPartiallyPushed) = PushDownUtils.pushLimit(sHolder.builder, limit)
       if (isPushed) {
         sHolder.pushedLimit = Some(limit)
       }
       (operation, isPushed && !isPartiallyPushed)
 
-    case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
+    case s @ Sort(order, _, operation @ PhysicalOperation(project, Nil, sHolder: ScanBuilderHolder))
       // Without building the Scan, we do not know the resulting column names after aggregate
       // push-down, and thus can't push down Top-N which needs to know the ordering column names.
       // TODO: we can support simple cases like GROUP BY columns directly and ORDER BY the same
       // columns, which we know the resulting column names: the original table columns.
-      if sHolder.pushedAggregate.isEmpty && filter.isEmpty &&
+      if sHolder.pushedAggregate.isEmpty &&
        CollapseProject.canCollapseExpressions(order, project, alwaysInline = true) =>
      val aliasMap = getAliasMap(project)
      val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
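One behavioral note on the last hunk above: the `Sort` case previously bound the extractor's filter position to `filter` and guarded with `filter.isEmpty`; it now matches `Nil` directly in the pattern, which is equivalent but rejects non-matching plans earlier. A minimal illustration of the two styles:

```scala
object NilPatternSketch extends App {
  def canPushDownTopN(projectsAndFilters: (Seq[String], Seq[String])): Boolean =
    projectsAndFilters match {
      // New style: Nil matched in the pattern itself, no guard needed.
      case (_, Nil) => true
      // Old style would have been: case (_, filter) if filter.isEmpty => true
      case _ => false
    }

  assert(canPushDownTopN((Seq("a"), Nil)))
  assert(!canPushDownTopN((Seq("a"), Seq("a > 1"))))
}
```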
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
index 9607ca5396449..2d0130985eacc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/CleanupDynamicPruningFilters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.dynamicpruning
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, DynamicPruningSubquery, EqualNullSafe, EqualTo, Expression, ExpressionSet, PredicateHelper}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.planning.NodeWithOnlyDeterministicProjectAndFilter
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
@@ -72,12 +72,15 @@ object CleanupDynamicPruningFilters extends Rule[LogicalPlan] with PredicateHelp
       // No-op for trees that do not contain dynamic pruning.
       _.containsAnyPattern(DYNAMIC_PRUNING_EXPRESSION, DYNAMIC_PRUNING_SUBQUERY)) {
       // pass through anything that is pushed down into PhysicalOperation
-      case p @ PhysicalOperation(_, _, LogicalRelation(_: HadoopFsRelation, _, _, _)) =>
+      case p @ NodeWithOnlyDeterministicProjectAndFilter(
+          LogicalRelation(_: HadoopFsRelation, _, _, _)) =>
         removeUnnecessaryDynamicPruningSubquery(p)
       // pass through anything that is pushed down into PhysicalOperation
-      case p @ PhysicalOperation(_, _, HiveTableRelation(_, _, _, _, _)) =>
+      case p @ NodeWithOnlyDeterministicProjectAndFilter(
+          HiveTableRelation(_, _, _, _, _)) =>
         removeUnnecessaryDynamicPruningSubquery(p)
-      case p @ PhysicalOperation(_, _, _: DataSourceV2ScanRelation) =>
+      case p @ NodeWithOnlyDeterministicProjectAndFilter(
+          _: DataSourceV2ScanRelation) =>
         removeUnnecessaryDynamicPruningSubquery(p)
       // remove any Filters with DynamicPruning that didn't get pushed down to PhysicalOperation.
       case f @ Filter(condition, _) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d1e222794a526..42bf1e31bb04a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -276,7 +276,7 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ScanOperation(projectList, filters, relation: HiveTableRelation) =>
+      case PhysicalOperation(projectList, filters, relation: HiveTableRelation) =>
         // Filter out all predicates that only deal with partition keys, these are given to the
         // hive table scan operator to be used for partition pruning.
         val partitionKeyIds = AttributeSet(relation.partitionCols)
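A closing note on the CleanupDynamicPruningFilters change above: plans whose scan is reachable through only deterministic projects and filters keep their `DynamicPruning` predicates, while elsewhere the rule neutralizes them with `TrueLiteral`. A toy version of that conjunct-level cleanup, where `DynPrune` is a stand-in marker rather than Spark's expression class:

```scala
object CleanupSketch extends App {
  sealed trait E
  case object True extends E
  case class DynPrune(id: Int) extends E
  case class Other(sql: String) extends E

  // Keep dynamic-pruning conjuncts only where they reached a prunable scan;
  // otherwise replace them with a literal true so the filter stays valid.
  def cleanup(conjuncts: Seq[E], reachedPrunableScan: Boolean): Seq[E] =
    if (reachedPrunableScan) conjuncts
    else conjuncts.map {
      case DynPrune(_) => True
      case e => e
    }

  assert(cleanup(Seq(DynPrune(1), Other("a = 1")), reachedPrunableScan = false) ==
    Seq(True, Other("a = 1")))
}
```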