Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ class Analyzer(
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
s"between $left and $right")

right.collect {
val conflictPlans = right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Expand Down Expand Up @@ -1089,27 +1089,31 @@ class Analyzer(
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
}
// Only handle first case, others will be fixed on the next pass.
.headOption match {
case None =>
/*
* No result implies that there is a logical plan node that produces new references
* that this rule cannot handle. When that is the case, there must be another rule
* that resolves these conflicts. Otherwise, the analysis will fail.
*/
right
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute =>
dedupAttr(a, attributeRewrites)
case s: SubqueryExpression =>
s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
}

/*
* Note that it's possible `conflictPlans` can be empty which implies that there
* is a logical plan node that produces new references that this rule cannot handle.
* When that is the case, there must be another rule that resolves these conflicts.
* Otherwise, the analysis will fail.
*/
if (conflictPlans.isEmpty) {
right
} else {
val attributeRewrites = AttributeMap(conflictPlans.flatMap {
case (oldRelation, newRelation) => oldRelation.output.zip(newRelation.output)})
val conflictPlanMap = conflictPlans.toMap
// transformDown so that we can replace all the old Relations in one turn due to
// the reason that `conflictPlans` are also collected in pre-order.
right transformDown {
case r => conflictPlanMap.getOrElse(r, r)
} transformUp {
case other => other transformExpressions {
case a: Attribute =>
dedupAttr(a, attributeRewrites)
case s: SubqueryExpression =>
s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
}
}
}
}

Expand Down