Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ trait CatalystConf {

def sessionLocalTimeZone: String

def constraintPropagationEnabled: Boolean

/** If true, cartesian products between relations will be allowed for all
* join types(inner, (left|right|full) outer).
* If false, cartesian products will require explicit CROSS JOIN syntax.
Expand Down Expand Up @@ -76,5 +78,6 @@ case class SimpleCatalystConf(
crossJoinEnabled: Boolean = false,
cboEnabled: Boolean = false,
warehousePath: String = "/user/hive/warehouse",
sessionLocalTimeZone: String = TimeZone.getDefault().getID)
sessionLocalTimeZone: String = TimeZone.getDefault().getID,
constraintPropagationEnabled: Boolean = true)
extends CatalystConf
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
// Operator push down
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin,
EliminateOuterJoin(conf),
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown(conf),
ColumnPruning,
InferFiltersFromConstraints,
InferFiltersFromConstraints(conf),
// Operator combine
CollapseRepartition,
CollapseProject,
Expand All @@ -105,7 +105,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
SimplifyConditionals,
RemoveDispensableExpressions,
SimplifyBinaryComparison,
PruneFilters,
PruneFilters(conf),
EliminateSorts,
SimplifyCasts,
SimplifyCaseConversionExpressions,
Expand Down Expand Up @@ -606,8 +606,15 @@ object CollapseWindow extends Rule[LogicalPlan] {
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case class InferFiltersFromConstraints(conf: CatalystConf)
extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = if (conf.constraintPropagationEnabled) {
doInferFilters(plan)
} else {
plan
}

private def doInferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
val newFilters = filter.constraints --
(child.constraints ++ splitConjunctivePredicates(condition))
Expand Down Expand Up @@ -696,7 +703,7 @@ object EliminateSorts extends Rule[LogicalPlan] {
* 2) by substituting a dummy empty relation when the filter will always evaluate to `false`.
* 3) by eliminating the always-true conditions given the constraints on the child's output.
*/
object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
case class PruneFilters(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// If the filter condition always evaluate to true, remove the filter.
case Filter(Literal(true, BooleanType), child) => child
Expand All @@ -706,7 +713,7 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty)
// If any deterministic condition is guaranteed to be true given the constraints on the child's
// output, remove the condition
case f @ Filter(fc, p: LogicalPlan) =>
case f @ Filter(fc, p: LogicalPlan) if conf.constraintPropagationEnabled =>
val (prunedPredicates, remainingPredicates) =
splitConjunctivePredicates(fc).partition { cond =>
cond.deterministic && p.constraints.contains(cond)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -101,7 +102,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
*
* This rule should be executed before pushing down the Filter
*/
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
case class EliminateOuterJoin(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {

/**
* Returns whether the expression returns null or false when all inputs are nulls.
Expand All @@ -117,7 +118,11 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}

private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
val conditions = if (conf.constraintPropagationEnabled) {
splitConjunctivePredicates(filter.condition) ++ filter.constraints
} else {
splitConjunctivePredicates(filter.condition)
}
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ import org.apache.spark.sql.catalyst.rules._
class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper {

object Optimize extends RuleExecutor[LogicalPlan] {
val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val batches =
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("Constant Folding", FixedPoint(50),
NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
NullPropagation(conf),
ConstantFolding,
BooleanSimplification,
SimplifyBinaryComparison,
PruneFilters) :: Nil
PruneFilters(conf)) :: Nil
}

val nullableRelation = LocalRelation('a.int.withNullability(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import org.apache.spark.sql.catalyst.rules._
class BooleanSimplificationSuite extends PlanTest with PredicateHelper {

object Optimize extends RuleExecutor[LogicalPlan] {
val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
val batches =
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("Constant Folding", FixedPoint(50),
NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
NullPropagation(conf),
ConstantFolding,
BooleanSimplification,
PruneFilters) :: Nil
PruneFilters(conf)) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
Expand All @@ -31,7 +32,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
Batch("InferAndPushDownFilters", FixedPoint(100),
PushPredicateThroughJoin,
PushDownPredicate,
InferFiltersFromConstraints,
InferFiltersFromConstraints(SimpleCatalystConf(caseSensitiveAnalysis = true)),
CombineFilters) :: Nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
Expand All @@ -31,7 +32,7 @@ class OuterJoinEliminationSuite extends PlanTest {
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Outer Join Elimination", Once,
EliminateOuterJoin,
EliminateOuterJoin(SimpleCatalystConf(caseSensitiveAnalysis = true)),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for outer join elimination as well?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test.

PushPredicateThroughJoin) :: Nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans._
Expand All @@ -33,7 +34,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
PruneFilters,
PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)),
PropagateEmptyRelation) :: Nil
}

Expand All @@ -45,7 +46,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
PruneFilters) :: Nil
PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}

val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
Expand All @@ -33,7 +34,7 @@ class PruneFiltersSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Filter Pushdown and Pruning", Once,
CombineFilters,
PruneFilters,
PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true)),
PushDownPredicate,
PushPredicateThroughJoin) :: Nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
Expand All @@ -34,7 +35,7 @@ class SetOperationSuite extends PlanTest {
CombineUnions,
PushProjectionThroughUnion,
PushDownPredicate,
PruneFilters) :: Nil
PruneFilters(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
.internal()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be an internal flag, right? cc @sameeragarwal @hvanhovell

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about it, because constraint propagation is internal details.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To determine whether a flag is internal or not, we should consider the impact of external users. If users could easily hit this, we might need to expose it as an external flag and document it in the public document.

@viirya viirya Mar 14, 2017

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the fact there are few users reporting hitting this issue, we may need to expose it as an external flag. But looks like it is not very common issue, compared with other external flag.

However, I would think that a large portion of external users may not know constraint propagation. It might not be intuitive to link the problem they hit to constraint propagation and to find this config, even it is external.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constraint propagation is like predicate pushdown, which has already been used in external configurations. We can rename it to make external users easy to understand, e.g., constraints inferences.

.doc("When true, the query optimizer will use constraint propagation in query plans to " +
"perform optimization. Constraint propagation can be computation expensive for long " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long -> large

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Fixed.

"query plans. For such queries, disable this flag to get around this issue. Default " +
"is enabled")
.booleanConf
.createWithDefault(true)

val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
Expand Down Expand Up @@ -795,6 +804,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)

def subexpressionEliminationEnabled: Boolean =
getConf(SUBEXPRESSION_ELIMINATION_ENABLED)

Expand Down