@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{INVOKE, JSON_TO_STRUCT, LIKE_FAMLIY, PYTHON_UDF, REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE, SCALA_UDF}
@@ -117,13 +117,38 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
* do not add a subquery that might have an expensive computation
*/
private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
val ret = plan match {
case PhysicalOperation(_, filters, child) if child.isInstanceOf[LeafNode] =>
Contributor Author commented:

The old behavior was to collect all the filters above a leaf node and check that every filter is a simple expression and that at least one of them is selective.

I've rewritten the code to keep the old behavior but in a more efficient way: simply traverse the plan tree and check the filter predicates along the way, instead of merging expressions and collecting filters, which may duplicate complicated expressions. (A simplified sketch of this traversal follows this file's diff.)

filters.forall(isSimpleExpression) &&
filters.exists(isLikelySelective)
def isSelective(
p: LogicalPlan,
predicateReference: AttributeSet,
hasHitFilter: Boolean,
hasHitSelectiveFilter: Boolean): Boolean = p match {
case Project(projectList, child) =>
if (hasHitFilter) {
// We need to make sure all expressions referenced by filter predicates are simple
// expressions.
val referencedExprs = projectList.filter(predicateReference.contains)
referencedExprs.forall(isSimpleExpression) &&
isSelective(
child,
referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
hasHitFilter,
hasHitSelectiveFilter)
} else {
assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
}
case Filter(condition, child) =>
isSimpleExpression(condition) && isSelective(
child,
predicateReference ++ condition.references,
hasHitFilter = true,
hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
case _: LeafNode => hasHitSelectiveFilter
case _ => false
}
!plan.isStreaming && ret

!plan.isStreaming &&
isSelective(plan, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false)
}

private def isSimpleExpression(e: Expression): Boolean = {
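To make the rewritten check easier to follow, here is a simplified, self-contained Scala sketch of the traversal described in the contributor's comment above. The plan types and names below are invented for illustration; the real rule additionally tracks which project expressions are referenced by the collected predicates and whether a filter has been hit yet.

object SelectiveScanSketch extends App {
  sealed trait MiniPlan
  case class Leaf(name: String) extends MiniPlan
  case class Proj(exprsAreSimple: Boolean, child: MiniPlan) extends MiniPlan
  case class Filt(isSimple: Boolean, isSelective: Boolean, child: MiniPlan) extends MiniPlan

  // Walk down through Project/Filter nodes, remembering whether any filter seen so far
  // looked selective; accept only plans that bottom out in a leaf below such a filter.
  def isSelectiveFilterOverScan(plan: MiniPlan): Boolean = {
    def go(p: MiniPlan, hasHitSelectiveFilter: Boolean): Boolean = p match {
      case Proj(simple, child)      => simple && go(child, hasHitSelectiveFilter)
      case Filt(simple, sel, child) => simple && go(child, hasHitSelectiveFilter || sel)
      case _: Leaf                  => hasHitSelectiveFilter
    }
    go(plan, hasHitSelectiveFilter = false)
  }

  // Accepted: a simple, likely-selective filter sits between the project and the leaf.
  assert(isSelectiveFilterOverScan(
    Proj(exprsAreSimple = true, Filt(isSimple = true, isSelective = true, Leaf("t")))))
  // Rejected: no selective filter anywhere above the leaf.
  assert(!isSelectiveFilterOverScan(Proj(exprsAreSimple = true, Leaf("t"))))
}

Unlike the old PhysicalOperation-based version, nothing is collected or merged; the traversal stops as soon as it reaches an unsupported node.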
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeSet, Expression, Literal, Or, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.CTE
@@ -69,7 +69,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
}
gatherPredicatesAndAttributes(child, cteMap)

case ScanOperation(projects, predicates, ref: CTERelationRef) =>
case PhysicalOperation(projects, predicates, ref: CTERelationRef) =>
val (cteDef, precedence, preds, attrs) = cteMap(ref.cteId)
val attrMapping = ref.output.zip(cteDef.output).map{ case (r, d) => r -> d }.toMap
val newPredicates = if (isTruePredicate(preds)) {
@@ -21,7 +21,7 @@ import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.planning.{NodeWithOnlyDeterministicProjectAndFilter, PhysicalOperation}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._

@@ -82,7 +82,8 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
// Find if the input plans are eligible for star join detection.
// An eligible plan is a base table access with valid statistics.
val foundEligibleJoin = input.forall {
case PhysicalOperation(_, _, t: LeafNode) if t.stats.rowCount.isDefined => true
case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode)
if t.stats.rowCount.isDefined => true
case _ => false
}

@@ -177,7 +178,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
private def isUnique(
column: Attribute,
plan: LogicalPlan): Boolean = plan match {
case PhysicalOperation(_, _, t: LeafNode) =>
case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode) =>
val leafCol = findLeafNodeCol(column, plan)
leafCol match {
case Some(col) if t.outputSet.contains(col) =>
@@ -212,7 +213,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
private def findLeafNodeCol(
column: Attribute,
plan: LogicalPlan): Option[Attribute] = plan match {
case pl @ PhysicalOperation(_, _, _: LeafNode) =>
case pl @ NodeWithOnlyDeterministicProjectAndFilter(_: LeafNode) =>
pl match {
case t: LeafNode if t.outputSet.contains(column) =>
Option(column)
@@ -233,7 +234,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
private def hasStatistics(
column: Attribute,
plan: LogicalPlan): Boolean = plan match {
case PhysicalOperation(_, _, t: LeafNode) =>
case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode) =>
val leafCol = findLeafNodeCol(column, plan)
leafCol match {
case Some(col) if t.outputSet.contains(col) =>
@@ -296,7 +297,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper {
*/
private def getTableAccessCardinality(
input: LogicalPlan): Option[BigInt] = input match {
case PhysicalOperation(_, cond, t: LeafNode) if t.stats.rowCount.isDefined =>
case NodeWithOnlyDeterministicProjectAndFilter(t: LeafNode) if t.stats.rowCount.isDefined =>
if (conf.cboEnabled && input.stats.rowCount.isDefined) {
Option(input.stats.rowCount.get)
} else {
@@ -29,7 +29,14 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.internal.SQLConf

trait OperationHelper extends PredicateHelper {
/**
* A pattern that matches any number of project or filter operations even if they are
* non-deterministic, as long as they satisfy the requirement of CollapseProject and CombineFilters.
* All filter operators are collected and their conditions are broken up and returned
* together with the top project operator. [[Alias Aliases]] are in-lined/substituted if
* necessary.
*/
object PhysicalOperation extends AliasHelper with PredicateHelper {
import org.apache.spark.sql.catalyst.optimizer.CollapseProject.canCollapseExpressions

type ReturnType =
@@ -43,16 +50,6 @@ trait OperationHelper extends PredicateHelper {
Some((fields.getOrElse(child.output), filters, child))
}

/**
* This legacy mode is for PhysicalOperation which has been there for years and we want to be
* extremely safe to not change its behavior. There are two differences when legacy mode is off:
* 1. We postpone the deterministic check to the very end (calling `canCollapseExpressions`),
* so that it's more likely to collect more projects and filters.
* 2. We follow CollapseProject and only collect adjacent projects if they don't produce
* repeated expensive expressions.
*/
protected def legacyMode: Boolean

/**
* Collects all adjacent projects and filters, in-lining/substituting aliases if necessary.
* Here are two examples for alias in-lining/substitution.
@@ -73,31 +70,27 @@ trait OperationHelper extends PredicateHelper {
def empty: IntermediateType = (None, Nil, plan, AttributeMap.empty)

plan match {
case Project(fields, child) if !legacyMode || fields.forall(_.deterministic) =>
case Project(fields, child) =>
val (_, filters, other, aliases) = collectProjectsAndFilters(child, alwaysInline)
if (legacyMode || canCollapseExpressions(fields, aliases, alwaysInline)) {
if (canCollapseExpressions(fields, aliases, alwaysInline)) {
val replaced = fields.map(replaceAliasButKeepName(_, aliases))
(Some(replaced), filters, other, getAliasMap(replaced))
} else {
empty
}

case Filter(condition, child) if !legacyMode || condition.deterministic =>
case Filter(condition, child) =>
val (fields, filters, other, aliases) = collectProjectsAndFilters(child, alwaysInline)
val canIncludeThisFilter = if (legacyMode) {
true
} else {
// When collecting projects and filters, we effectively push down filters through
// projects. We need to meet the following conditions to do so:
// 1) no Project collected so far or the collected Projects are all deterministic
// 2) the collected filters and this filter are all deterministic, or this is the
// first collected filter.
// 3) this filter does not repeat any expensive expressions from the collected
// projects.
fields.forall(_.forall(_.deterministic)) && {
filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
} && canCollapseExpressions(Seq(condition), aliases, alwaysInline)
}
// When collecting projects and filters, we effectively push down filters through
// projects. We need to meet the following conditions to do so:
// 1) no Project collected so far or the collected Projects are all deterministic
// 2) the collected filters and this filter are all deterministic, or this is the
// first collected filter.
// 3) this filter does not repeat any expensive expressions from the collected
// projects.
val canIncludeThisFilter = fields.forall(_.forall(_.deterministic)) && {
filters.isEmpty || (filters.forall(_.deterministic) && condition.deterministic)
} && canCollapseExpressions(Seq(condition), aliases, alwaysInline)
if (canIncludeThisFilter) {
val replaced = replaceAlias(condition, aliases)
(fields, filters ++ splitConjunctivePredicates(replaced), other, aliases)
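As a side note on condition 1) in the comment above: pushing a predicate through a non-deterministic Project changes results, because alias substitution re-evaluates the non-deterministic expression. A minimal illustration in plain Scala, with hypothetical names and a random number generator standing in for a non-deterministic column (this is not the Catalyst code):

object NonDeterministicPushdownSketch extends App {
  import scala.util.Random

  val table = Seq.fill(8)(())                  // eight dummy input rows

  // Faithful order: project the non-deterministic column r, then filter on it.
  val rng1 = new Random(7)
  val projectedThenFiltered = table.map(_ => rng1.nextDouble()).filter(_ > 0.5)
  assert(projectedThenFiltered.forall(_ > 0.5))   // always holds

  // Alias-substituted order: evaluate the predicate on one draw, then project a fresh
  // draw for each surviving row; the kept values are no longer checked against > 0.5.
  val rng2 = new Random(7)
  val filteredThenProjected = table.filter(_ => rng2.nextDouble() > 0.5).map(_ => rng2.nextDouble())
  println(filteredThenProjected)
}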
@@ -112,24 +105,12 @@ trait OperationHelper extends PredicateHelper {
}
}

/**
* A pattern that matches any number of project or filter operations on top of another relational
* operator. All filter operators are collected and their conditions are broken up and returned
* together with the top project operator.
* [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if
* necessary.
*/
object PhysicalOperation extends OperationHelper {
override protected def legacyMode: Boolean = true
}

/**
* A variant of [[PhysicalOperation]]. It matches any number of project or filter
* operations even if they are non-deterministic, as long as they satisfy the
* requirement of CollapseProject and CombineFilters.
*/
object ScanOperation extends OperationHelper {
override protected def legacyMode: Boolean = false
object NodeWithOnlyDeterministicProjectAndFilter {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
case Project(projectList, child) if projectList.forall(_.deterministic) => unapply(child)
case Filter(cond, child) if cond.deterministic => unapply(child)
case _ => Some(plan)
Member commented:

Hmm, if it returns Some for all other cases, doesn't it mean it matches all input plans?

Contributor Author (@cloud-fan) replied on Jul 14, 2022:

Yes, and I explicitly call it out in the classdoc. This is the same in PhysicalOperation, which always returns Some, and the caller side will specify the expected type.

}
}
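To illustrate the reply above, here is a standalone Scala sketch with invented names (it is not the Spark code): the extractor by itself matches every plan, since it only strips deterministic Project/Filter nodes and returns whatever lies underneath, and it is the caller that narrows the match with a nested type pattern such as (t: LeafNode).

object ExtractorSketch extends App {
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class Proj(deterministic: Boolean, child: Plan) extends Plan
  case class Filt(deterministic: Boolean, child: Plan) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan

  object OnlyDeterministicProjectAndFilter {
    // Strips deterministic Project/Filter nodes; otherwise returns the plan itself,
    // so the extractor matches every input by design.
    def unapply(plan: Plan): Option[Plan] = plan match {
      case Proj(true, child) => unapply(child)
      case Filt(true, child) => unapply(child)
      case _                 => Some(plan)
    }
  }

  // The caller decides what must be underneath: here, a leaf node.
  def isBaseTableAccess(plan: Plan): Boolean = plan match {
    case OnlyDeterministicProjectAndFilter(_: Leaf) => true
    case _                                          => false
  }

  assert(isBaseTableAccess(Proj(deterministic = true, Filt(deterministic = true, Leaf("t")))))
  assert(!isBaseTableAccess(Filt(deterministic = true, Join(Leaf("a"), Leaf("b")))))  // bottoms out in a Join
  assert(!isBaseTableAccess(Proj(deterministic = false, Leaf("t"))))                  // non-deterministic Project
}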

@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.DoubleType

class ScanOperationSuite extends SparkFunSuite {
class PhysicalOperationSuite extends SparkFunSuite {
private val relation = TestRelations.testRelation2
private val colA = relation.output(0)
private val colB = relation.output(1)
@@ -34,7 +34,7 @@ class ScanOperationSuite extends SparkFunSuite {
test("Project with a non-deterministic field and a deterministic child Filter") {
val project1 = Project(Seq(colB, aliasR), Filter(EqualTo(colA, Literal(1)), relation))
project1 match {
case ScanOperation(projects, filters, _: LocalRelation) =>
case PhysicalOperation(projects, filters, _: LocalRelation) =>
assert(projects.size === 2)
assert(projects(0) === colB)
assert(projects(1) === aliasR)
@@ -46,7 +46,7 @@ class ScanOperationSuite extends SparkFunSuite {
test("Project with all deterministic fields but a non-deterministic child Filter") {
val project2 = Project(Seq(colA, colB), Filter(EqualTo(aliasR, Literal(1)), relation))
project2 match {
case ScanOperation(projects, filters, _: LocalRelation) =>
case PhysicalOperation(projects, filters, _: LocalRelation) =>
assert(projects.size === 2)
assert(projects(0) === colA)
assert(projects(1) === colB)
@@ -58,7 +58,7 @@ class ScanOperationSuite extends SparkFunSuite {
test("Project which has the same non-deterministic expression with its child Project") {
val project3 = Project(Seq(colA, colR), Project(Seq(colA, aliasR), relation))
project3 match {
case ScanOperation(projects, filters, _: Project) =>
case PhysicalOperation(projects, filters, _: Project) =>
assert(projects.size === 2)
assert(projects(0) === colA)
assert(projects(1) === colR)
@@ -70,7 +70,7 @@ class ScanOperationSuite extends SparkFunSuite {
test("Project which has different non-deterministic expressions with its child Project") {
val project4 = Project(Seq(colA, aliasId), Project(Seq(colA, aliasR), relation))
project4 match {
case ScanOperation(projects, _, _: LocalRelation) =>
case PhysicalOperation(projects, _, _: LocalRelation) =>
assert(projects.size === 2)
assert(projects(0) === colA)
assert(projects(1) === aliasId)
@@ -81,7 +81,7 @@ class ScanOperationSuite extends SparkFunSuite {
test("Filter with non-deterministic Project") {
val filter1 = Filter(EqualTo(colA, Literal(1)), Project(Seq(colA, aliasR), relation))
filter1 match {
case ScanOperation(projects, filters, _: Filter) =>
case PhysicalOperation(projects, filters, _: Filter) =>
assert(projects.size === 2)
assert(filters.isEmpty)
case _ => assert(false)
@@ -92,7 +92,7 @@ class ScanOperationSuite extends SparkFunSuite {
val filter2 = Filter(EqualTo(MonotonicallyIncreasingID(), Literal(1)),
Project(Seq(colA, colB), relation))
filter2 match {
case ScanOperation(projects, filters, _: LocalRelation) =>
case PhysicalOperation(projects, filters, _: LocalRelation) =>
assert(projects.size === 2)
assert(projects(0) === colA)
assert(projects(1) === colB)
@@ -105,7 +105,7 @@ class ScanOperationSuite extends SparkFunSuite {
test("Deterministic filter which has a non-deterministic child Filter") {
val filter3 = Filter(EqualTo(colA, Literal(1)), Filter(EqualTo(aliasR, Literal(1)), relation))
filter3 match {
case ScanOperation(projects, filters, _: Filter) =>
case PhysicalOperation(projects, filters, _: Filter) =>
assert(filters.isEmpty)
case _ => assert(false)
}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
@@ -318,23 +318,23 @@ object DataSourceStrategy
extends Strategy with Logging with CastSupport with PredicateHelper with SQLConfHelper {

def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
pruneFilterProjectRaw(
l,
projects,
filters,
(requestedColumns, allPredicates, _) =>
toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

case ScanOperation(projects, filters,
case PhysicalOperation(projects, filters,
l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
pruneFilterProject(
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil

case ScanOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
pruneFilterProject(
l,
projects,
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
@@ -144,7 +144,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ScanOperation(projects, filters,
case PhysicalOperation(projects, filters,
l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data: