Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ class Analyzer(
failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")

// To resolve duplicate expression IDs for Join and Intersect
case j @ Join(left, right, _, _) if !j.duplicateResolved =>
case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
j.copy(right = dedupRight(left, right))
case i @ Intersect(left, right) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))
Expand Down Expand Up @@ -2087,10 +2087,10 @@ class Analyzer(
*/
object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
case j @ Join(left, right, UsingJoin(joinType, usingCols), _, _)
if left.resolved && right.resolved && j.duplicateResolved =>
commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural =>
case j @ Join(left, right, NaturalJoin(joinType), condition, _) if j.resolvedExceptNatural =>
// find common column names from both sides
val joinNames = left.output.map(_.name).intersect(right.output.map(_.name))
commonNaturalJoinProcessing(left, right, joinType, joinNames, condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ trait CheckAnalysis extends PredicateHelper {
failAnalysis("Null-aware predicate sub-queries cannot be used in nested " +
s"conditions: $condition")

case j @ Join(_, _, _, Some(condition)) if condition.dataType != BooleanType =>
case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType =>
failAnalysis(
s"join condition '${condition.sql}' " +
s"of type ${condition.dataType.simpleString} is not a boolean.")
Expand Down Expand Up @@ -583,7 +583,7 @@ trait CheckAnalysis extends PredicateHelper {
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)

// Join can host correlated expressions.
case j @ Join(left, right, joinType, _) =>
case j @ Join(left, right, joinType, _, _) =>
joinType match {
// Inner join, like Filter, can be anywhere.
case _: InnerLike =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ object UnsupportedOperationChecker {
throwError("dropDuplicates is not supported after aggregation on a " +
"streaming DataFrame/Dataset")

case Join(left, right, joinType, condition) =>
case Join(left, right, joinType, condition, _) =>

joinType match {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ trait PredicateHelper {
}
}

/**
 * Checks whether a whole expression tree is null intolerant.
 *
 * @param expr root of the expression tree to inspect
 * @return true only if `expr` itself is a [[NullIntolerant]] and every one of its children
 *         passes this same check recursively; an expression without children only has to be
 *         a [[NullIntolerant]] itself. Any other expression yields false.
 */
protected def isNullIntolerant(expr: Expression): Boolean = {
  expr match {
    case _: NullIntolerant =>
      // A single child that is not null intolerant disqualifies the whole tree.
      expr.children.forall(child => isNullIntolerant(child))
    case _ =>
      false
  }
}

// Substitute any known alias from a map.
protected def replaceAlias(
condition: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
} else {
val result = plan transformDown {
// Start reordering with a joinable item, which is an InnerLike join with conditions.
case j @ Join(_, _, _: InnerLike, Some(cond)) =>
case j @ Join(_, _, _: InnerLike, Some(cond), _) =>
reorder(j, j.output)
case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond)))
case p @ Project(projectList, Join(_, _, _: InnerLike, Some(cond), _))
if projectList.forall(_.isInstanceOf[Attribute]) =>
reorder(p, p.output)
}
Expand Down Expand Up @@ -76,12 +76,12 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
*/
private def extractInnerJoins(plan: LogicalPlan): (Seq[LogicalPlan], Set[Expression]) = {
plan match {
case Join(left, right, _: InnerLike, Some(cond)) =>
case Join(left, right, _: InnerLike, Some(cond), _) =>
val (leftPlans, leftConditions) = extractInnerJoins(left)
val (rightPlans, rightConditions) = extractInnerJoins(right)
(leftPlans ++ rightPlans, splitConjunctivePredicates(cond).toSet ++
leftConditions ++ rightConditions)
case Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond)))
case Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), _))
if projectList.forall(_.isInstanceOf[Attribute]) =>
extractInnerJoins(j)
case _ =>
Expand All @@ -90,11 +90,11 @@ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
}

private def replaceWithOrderedJoin(plan: LogicalPlan): LogicalPlan = plan match {
case j @ Join(left, right, jt: InnerLike, Some(cond)) =>
case j @ Join(left, right, jt: InnerLike, Some(cond), _) =>
val replacedLeft = replaceWithOrderedJoin(left)
val replacedRight = replaceWithOrderedJoin(right)
OrderedJoin(replacedLeft, replacedRight, jt, Some(cond))
case p @ Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond))) =>
case p @ Project(projectList, j @ Join(_, _, _: InnerLike, Some(cond), _)) =>
p.copy(child = replaceWithOrderedJoin(j))
case _ =>
plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
// not allowed to use the same attributes. We use a blacklist to prevent us from creating a
// situation in which this happens; the rule will only remove an alias if its child
// attribute is not on the black list.
case Join(left, right, joinType, condition) =>
case Join(left, right, joinType, condition, notNullAttrs) =>
val newLeft = removeRedundantAliases(left, blacklist ++ right.outputSet)
val newRight = removeRedundantAliases(right, blacklist ++ newLeft.outputSet)
val mapping = AttributeMap(
Expand All @@ -263,7 +263,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
val newCondition = condition.map(_.transform {
case a: Attribute => mapping.getOrElse(a, a)
})
Join(newLeft, newRight, joinType, newCondition)
Join(newLeft, newRight, joinType, newCondition, notNullAttrs)

case _ =>
// Remove redundant aliases in the subtree(s).
Expand Down Expand Up @@ -354,7 +354,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
// on both sides if it is applied multiple times. Therefore:
// - If one side is already limited, stack another limit on top if the new limit is smaller.
// The redundant limit will be collapsed by the CombineLimits rule.
case LocalLimit(exp, join @ Join(left, right, joinType, _)) =>
case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
val newJoin = joinType match {
case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
Expand Down Expand Up @@ -468,7 +468,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))

// Eliminate unneeded attributes from right side of a Left Existence Join.
case j @ Join(_, right, LeftExistence(_), _) =>
case j @ Join(_, right, LeftExistence(_), _, _) =>
j.copy(right = prunedChild(right, j.references))

// all the columns will be used to compare, so we can't prune them
Expand Down Expand Up @@ -661,27 +661,38 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan]
filter
}

case join @ Join(left, right, joinType, conditionOpt) =>
case join @ Join(left, right, joinType, conditionOpt, notNullAttrs) =>
joinType match {
// For inner join, we can infer additional filters for both sides. LeftSemi is kind of an
// inner join, it just drops the right side in the final output.
case _: InnerLike | LeftSemi =>
val allConstraints = getAllConstraints(left, right, conditionOpt)
val newLeft = inferNewFilter(left, allConstraints)
val newRight = inferNewFilter(right, allConstraints)
join.copy(left = newLeft, right = newRight)
val newLeftPredicates = inferNewPredicate(left, allConstraints)
val newRightPredicates = inferNewPredicate(right, allConstraints)
val newNotNullAttrs = getNotNullAttributes(
newLeftPredicates ++ newRightPredicates, notNullAttrs)
join.copy(
left = addFilterIfNeeded(left, newLeftPredicates),
right = addFilterIfNeeded(right, newRightPredicates),
notNullAttributes = newNotNullAttrs)

// For right outer join, we can only infer additional filters for left side.
case RightOuter =>
val allConstraints = getAllConstraints(left, right, conditionOpt)
val newLeft = inferNewFilter(left, allConstraints)
join.copy(left = newLeft)
val newLeftPredicates = inferNewPredicate(left, allConstraints)
val newNotNullAttrs = getNotNullAttributes(newLeftPredicates, notNullAttrs)
join.copy(
left = addFilterIfNeeded(left, newLeftPredicates),
notNullAttributes = newNotNullAttrs)

// For left join, we can only infer additional filters for right side.
case LeftOuter | LeftAnti =>
val allConstraints = getAllConstraints(left, right, conditionOpt)
val newRight = inferNewFilter(right, allConstraints)
join.copy(right = newRight)
val newRightPredicates = inferNewPredicate(right, allConstraints)
val newNotNullAttrs = getNotNullAttributes(newRightPredicates, notNullAttrs)
join.copy(
right = addFilterIfNeeded(right, newRightPredicates),
notNullAttributes = newNotNullAttrs)

case _ => join
}
Expand All @@ -696,16 +707,32 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan]
baseConstraints.union(inferAdditionalConstraints(baseConstraints))
}

private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan = {
val newPredicates = constraints
private def inferNewPredicate(
plan: LogicalPlan, constraints: Set[Expression]): Set[Expression] = {
constraints
.union(constructIsNotNullConstraints(constraints, plan.output))
.filter { c =>
c.references.nonEmpty && c.references.subsetOf(plan.outputSet) && c.deterministic
} -- plan.constraints
if (newPredicates.isEmpty) {
plan
} else {
}

/**
 * Extends the known set of non-null attribute IDs with those implied by `constraints`.
 *
 * Only constraints of the form `IsNotNull(child)` whose `child` passes `isNullIntolerant`
 * contribute; every attribute such a constraint references has its expression ID collected.
 *
 * @param constraints predicates to scan for `IsNotNull` constraints
 * @param curNotNullAttrs expression IDs that are already recorded as non-null
 * @return the union of the newly collected expression IDs and `curNotNullAttrs`
 */
private def getNotNullAttributes(
    constraints: Set[Expression],
    curNotNullAttrs: Set[ExprId]): Set[ExprId] = {
  // Keep only the IsNotNull constraints that wrap a null-intolerant expression.
  val qualifyingNotNulls = constraints.filter {
    case IsNotNull(child) => isNullIntolerant(child)
    case _ => false
  }
  // Gather the expression ID of every attribute referenced by those constraints.
  val inferredIds = qualifyingNotNulls.flatMap { pred =>
    pred.references.map(attr => attr.exprId)
  }
  inferredIds ++ curNotNullAttrs
}

/**
 * Wraps `plan` in a [[Filter]] over the conjunction of `newPredicates`.
 *
 * @param plan the plan to filter
 * @param newPredicates predicates to AND together as the filter condition
 * @return `plan` unchanged when `newPredicates` is empty, otherwise a [[Filter]] whose
 *         condition combines all predicates with [[And]] and whose child is `plan`
 */
private def addFilterIfNeeded(plan: LogicalPlan, newPredicates: Set[Expression]): LogicalPlan = {
  // An empty predicate set yields None here, so the plan is returned untouched.
  newPredicates.reduceLeftOption(And).map(Filter(_, plan)).getOrElse(plan)
}
}
Expand Down Expand Up @@ -1048,7 +1075,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// push the where condition down into join filter
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition, notNullAttrs)) =>
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
split(splitConjunctivePredicates(filterCondition), left, right)
joinType match {
Expand All @@ -1062,7 +1089,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
commonFilterCondition.partition(canEvaluateWithinJoin)
val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)

val join = Join(newLeft, newRight, joinType, newJoinCond)
val join = Join(newLeft, newRight, joinType, newJoinCond, notNullAttrs)
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
Expand All @@ -1074,7 +1101,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newRight = rightFilterConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = joinCondition
val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond)
val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond, notNullAttrs)

(leftFilterConditions ++ commonFilterCondition).
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
Expand All @@ -1084,7 +1111,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
val newRight = right
val newJoinCond = joinCondition
val newJoin = Join(newLeft, newRight, joinType, newJoinCond)
val newJoin = Join(newLeft, newRight, joinType, newJoinCond, notNullAttrs)

(rightFilterConditions ++ commonFilterCondition).
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
Expand All @@ -1094,7 +1121,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}

// push down the join filter into sub query scanning if applicable
case j @ Join(left, right, joinType, joinCondition) =>
case j @ Join(left, right, joinType, joinCondition, notNullAttrs) =>
val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

Expand All @@ -1107,23 +1134,23 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = commonJoinCondition.reduceLeftOption(And)

Join(newLeft, newRight, joinType, newJoinCond)
Join(newLeft, newRight, joinType, newJoinCond, notNullAttrs)
case RightOuter =>
// push down the left side only join filter for left side sub query
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
val newRight = right
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, RightOuter, newJoinCond)
Join(newLeft, newRight, RightOuter, newJoinCond, notNullAttrs)
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
// push down the right side only join filter for right sub query
val newLeft = left
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, joinType, newJoinCond)
Join(newLeft, newRight, joinType, newJoinCond, notNullAttrs)
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
Expand Down Expand Up @@ -1179,7 +1206,7 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
if (SQLConf.get.crossJoinEnabled) {
plan
} else plan transform {
case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, _)
case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, _, _)
if isCartesianProduct(j) =>
throw new AnalysisException(
s"""Detected cartesian product for ${j.joinType.sql} join between logical plans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
// Joins on empty LocalRelations generated from streaming sources are not eliminated
// as stateful streaming joins need to perform state management operations beyond
// just processing the input data.
case p @ Join(_, _, joinType, _)
case p @ Join(_, _, joinType, _, _)
if !p.children.exists(_.isStreaming) =>
val isLeftEmpty = isEmptyLocalRelation(p.left)
val isRightEmpty = isEmptyLocalRelation(p.right)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
// propagating the foldable expressions.
// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes
// of outer join.
case j @ Join(left, right, joinType, _) if foldableMap.nonEmpty =>
case j @ Join(left, right, joinType, _, _) if foldableMap.nonEmpty =>
val newJoin = j.transformExpressions(replaceFoldable)
val missDerivedAttrsSet: AttributeSet = AttributeSet(joinType match {
case _: InnerLike | LeftExistence(_) => Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
val newJoinType = buildNewJoinType(f, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
// the produced join then becomes unresolved and breaks structural integrity. We should
// de-duplicate conflicting attributes. We don't use transformation here because we only
// care about the topmost join converted from a correlated predicate subquery.
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond, _) =>
val duplicates = right.outputSet.intersect(left.outputSet)
if (duplicates.nonEmpty) {
val aliasMap = AttributeMap(duplicates.map { dup =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
case join @ Join(left, right, joinType, condition) =>
case join @ Join(left, right, joinType, condition, _) =>
logDebug(s"Considering join on: $condition")
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
Expand Down Expand Up @@ -165,11 +165,11 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
*/
def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
: (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match {
case Join(left, right, joinType: InnerLike, cond) =>
case Join(left, right, joinType: InnerLike, cond, _) =>
val (plans, conditions) = flattenJoin(left, joinType)
(plans ++ Seq((right, joinType)), conditions ++
cond.toSeq.flatMap(splitConjunctivePredicates))
case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) =>
case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition, _)) =>
val (plans, conditions) = flattenJoin(j)
(plans, conditions ++ splitConjunctivePredicates(filterCondition))

Expand All @@ -178,9 +178,9 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {

def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
= plan match {
case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _)) =>
case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _, _)) =>
Some(flattenJoin(f))
case j @ Join(_, _, joinType, _) =>
case j @ Join(_, _, joinType, _, _) =>
Some(flattenJoin(j))
case _ => None
}
Expand Down
Loading