Closed
Changes from 4 commits
29 commits
82978d7
Set barrier to prevent re-analysis of analyzed plan.
viirya Apr 26, 2017
24905e3
Use a logical node to set analysis barrier.
viirya Apr 27, 2017
e15b001
Add test for analysis barrier.
viirya Apr 30, 2017
a076d83
Let AnalysisBarrier as LeafNode.
viirya May 3, 2017
b29ded3
Remove resolveOperators path.
viirya May 5, 2017
8c8fe1e
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 5, 2017
a855182
Solving merging issue.
viirya May 5, 2017
4ff9610
Do not change exposed logicalPlan.
viirya May 5, 2017
d0a94f4
Fix test.
viirya May 6, 2017
02e11f9
Address comments.
viirya May 9, 2017
17f1a02
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 9, 2017
4629959
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 10, 2017
c313e35
Correctly set isStreaming for barrier.
viirya May 10, 2017
7e9dfac
Address comments.
viirya May 11, 2017
fba3690
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 17, 2017
f63ea0b
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 17, 2017
b9d03cd
Fix test.
viirya May 17, 2017
6a7204c
Address comments.
viirya May 19, 2017
3437ae0
Wrap AnalysisBarrier on df.logicalPlan.
viirya May 22, 2017
555fa8e
Fix test.
viirya May 23, 2017
505aba6
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 23, 2017
f3e4208
fix test.
viirya May 24, 2017
c0bee01
Avoid overriding find in AnalysisBarrier.
viirya May 24, 2017
1c1cc9d
Fix test.
viirya May 24, 2017
eb0598e
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 24, 2017
cba784b
fix test.
viirya May 24, 2017
b478e55
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 25, 2017
8314cc3
Create a new field in Dataset for the plan with barrier.
viirya May 25, 2017
6add9ec
Address comments.
viirya May 26, 2017
@@ -165,7 +165,8 @@ class Analyzer(
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
- CleanupAliases)
+ CleanupAliases,
+ CleanupBarriers)

Member

How about moving this rule to Batch("Finish Analysis", ...?

Member Author

We clean up the barriers at the end of analysis because we don't want them to show up in the analyzed plan. If we move this rule to the "Finish Analysis" batch, they will show up.

)

/**
@@ -2435,6 +2436,13 @@ object CleanupAliases extends Rule[LogicalPlan] {
}
}

+ /** Remove the barrier nodes of analysis */
+ object CleanupBarriers extends Rule[LogicalPlan] {

Member

How about EliminateBarriers?

Member Author

Sure.

+ override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

Contributor

transformDown should be better here, as we are adding new sub-trees during transformation

Member Author

Ok.

+ case AnalysisBarrier(child) => child
+ }
+ }
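
The traversal-order point raised in the thread above can be illustrated with a toy tree. This is a minimal sketch, not Spark code: `Plan`, `Relation`, `Project`, `Barrier`, and the two transform helpers are hypothetical stand-ins, and the assumption is that a barrier is a leaf (it reports no children), as in this PR. Under that assumption, a top-down pass keeps descending into the sub-tree that replaces a barrier, while a bottom-up pass never sees a barrier nested inside another one.

```scala
object BarrierSketch {
  // Toy plan nodes; these are NOT Spark's classes.
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(cs: Seq[Plan]): Plan

    // Pre-order: apply the rule to this node, then recurse into the result's children.
    def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
      val after = if (rule.isDefinedAt(this)) rule(this) else this
      after.withChildren(after.children.map(_.transformDown(rule)))
    }

    // Post-order: recurse into the children first, then apply the rule.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val after = withChildren(children.map(_.transformUp(rule)))
      if (rule.isDefinedAt(after)) rule(after) else after
    }
  }

  final case class Relation(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(cs: Seq[Plan]): Plan = this
  }

  final case class Project(cols: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = copy(child = cs.head)
  }

  // A leaf on purpose: the wrapped plan is hidden from traversals.
  final case class Barrier(wrapped: Plan) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(cs: Seq[Plan]): Plan = this
  }

  // Same shape as the rule in the diff: replace a barrier with what it wraps.
  val eliminate: PartialFunction[Plan, Plan] = { case Barrier(p) => p }

  val nested: Plan =
    Project(Seq("a"), Barrier(Project(Seq("a", "b"), Barrier(Relation("t")))))

  // Both barriers are removed.
  val down: Plan = nested.transformDown(eliminate)
  // Only the outer barrier is removed; the inner one survives.
  val up: Plan = nested.transformUp(eliminate)
}
```

In this toy model, `down` contains no barriers, whereas `up` still holds `Barrier(Relation("t"))` under the inner projection.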

/**
* Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
* figure out how many windows a time column can map to, we over-estimate the number of windows and
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._
+ import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -906,3 +907,9 @@ case class Deduplicate(

override def output: Seq[Attribute] = child.output
}

+ /** A logical plan for setting a barrier of analysis */
+ case class AnalysisBarrier(child: LogicalPlan) extends LeafNode {
+ override def output: Seq[Attribute] = child.output
+ override def analyzed: Boolean = true
+ }
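
Why a leaf node works as a barrier can be shown with a small model. The sketch below is illustrative only: `Plan`, `Relation`, `Filter`, `Barrier`, and `size` are hypothetical names, not Spark APIs. It assumes a generic tree walk that only follows `children`; because the barrier reports no children, the walk stops there, while `output` is still delegated to the wrapped plan so that parent operators can bind to its columns.

```scala
object LeafBarrierSketch {
  // Toy plan nodes; these are NOT Spark's classes.
  sealed trait Plan {
    def children: Seq[Plan]
    def output: Seq[String]
    // Number of nodes a generic tree walk over `children` would visit.
    def size: Int = 1 + children.map(_.size).sum
  }

  final case class Relation(output: Seq[String]) extends Plan {
    def children: Seq[Plan] = Nil
  }

  final case class Filter(cond: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def output: Seq[String] = child.output
  }

  final case class Barrier(child: Plan) extends Plan {
    def children: Seq[Plan] = Nil          // leaf: traversal stops here
    def output: Seq[String] = child.output // schema is still visible to parents
  }

  val analyzed: Plan = Filter("a > 1", Filter("b < 2", Relation(Seq("a", "b"))))
  val wrapped: Plan = Barrier(analyzed)

  val visitedWithoutBarrier: Int = analyzed.size // 3
  val visitedWithBarrier: Int = wrapped.size     // 1
}
```

In this model the already-analyzed sub-tree is skipped entirely, which is the cost the barrier is meant to avoid, and `wrapped.output` still equals `analyzed.output`.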
@@ -441,4 +441,17 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {

checkAnalysis(SubqueryAlias("tbl", testRelation).as("tbl2"), testRelation)
}

+ test("analysis barrier") {
+ // [[AnalysisBarrier]] will be removed after analysis
+ checkAnalysis(
+ Project(Seq(UnresolvedAttribute("tbl.a")),
+ AnalysisBarrier(SubqueryAlias("tbl", testRelation))),
+ Project(testRelation.output, SubqueryAlias("tbl", testRelation)))
+
+ // Make sure we won't resolve the plans wrapped in an [[AnalysisBarrier]]
+ val barrier = AnalysisBarrier(Project(Seq(UnresolvedAttribute("tbl.b")),
+ SubqueryAlias("tbl", testRelation)))
+ assertAnalysisError(barrier, Seq("cannot resolve '`tbl.b`'"))

Contributor

where is this exception thrown?

Member Author

In CheckAnalysis. We wrap an unresolved plan in a barrier, so the analyzer won't traverse it and it remains unresolved.

+ }
}
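
The two behaviours exercised by this test can be mimicked in a toy model. This is a sketch under stated assumptions, not the Spark analyzer: `analyze`, `eliminateBarriers`, and `unresolved` are hypothetical helpers standing in for the resolution rules, the cleanup rule, and the post-analysis check. Unlike the real test, the toy relation here does contain column `b`, to make it clear that the column stays unresolved only because the barrier hides it.

```scala
object BarrierResolutionSketch {
  // Toy plan nodes; these are NOT Spark's classes.
  sealed trait Plan { def output: Seq[String] }

  final case class Relation(output: Seq[String]) extends Plan

  // `resolved` records whether the toy analyzer has bound the column names.
  final case class Project(cols: Seq[String], child: Plan, resolved: Boolean = false)
      extends Plan {
    def output: Seq[String] = cols
  }

  final case class Barrier(child: Plan) extends Plan {
    def output: Seq[String] = child.output
  }

  // Toy analyzer: resolves projections bottom-up but never looks inside a Barrier.
  def analyze(plan: Plan): Plan = plan match {
    case Project(cols, child, _) =>
      val c = analyze(child)
      Project(cols, c, resolved = cols.forall(col => c.output.contains(col)))
    case other => other // Relation and Barrier are returned untouched
  }

  // Toy cleanup: strip every barrier once analysis is done.
  def eliminateBarriers(plan: Plan): Plan = plan match {
    case Barrier(child) => eliminateBarriers(child)
    case p: Project     => p.copy(child = eliminateBarriers(p.child))
    case r: Relation    => r
  }

  // Toy post-analysis check: columns of projections that were never resolved.
  def unresolved(plan: Plan): Seq[String] = plan match {
    case Project(cols, child, resolved) =>
      (if (resolved) Nil else cols) ++ unresolved(child)
    case Barrier(child) => unresolved(child)
    case _: Relation    => Nil
  }

  val table: Plan = Relation(Seq("a", "b"))

  // Case 1: a projection above a barrier resolves against the barrier's output.
  val above: Plan = eliminateBarriers(analyze(Project(Seq("a"), Barrier(table))))

  // Case 2: a projection inside a barrier is never visited, so it stays unresolved.
  val inside: Plan = eliminateBarriers(analyze(Barrier(Project(Seq("b"), table))))
}
```

Here `unresolved(above)` is empty, while `unresolved(inside)` reports `b`, which is the toy analogue of the analysis error asserted in the test.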
28 changes: 16 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -356,7 +356,10 @@ class Dataset[T] private[sql](
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
- def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
+ def toDF(): DataFrame = {
+ val plan = AnalysisBarrier(logicalPlan)
+ new Dataset[Row](sparkSession, plan, RowEncoder(schema))
+ }

/**
* :: Experimental ::
@@ -702,7 +705,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def join(right: Dataset[_]): DataFrame = withPlan {
- Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
+ Join(AnalysisBarrier(logicalPlan), right.logicalPlan, joinType = Inner, None)

Member Author

For self-join de-duplication, we only set a barrier on the left side.

Member Author

I am wondering whether we should check for duplication between the right and left sides and then decide whether to use a barrier on the right side.

}
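
The open question in the thread above could be sketched as follows. This is a toy model, not Spark code: `Attr`, `Relation`, `Barrier`, `Join`, `overlaps`, and `joinPlans` are all hypothetical names. The premise that an overlapping right side has to stay unwrapped so the analyzer can still rewrite its conflicting attributes is an assumption, drawn from the comment that only the left side gets a barrier for self-join de-duplication.

```scala
object SelfJoinSketch {
  // An output attribute identified by an expression id (toy stand-in).
  final case class Attr(name: String, exprId: Long)

  // Toy plan nodes; these are NOT Spark's classes.
  sealed trait Plan { def output: Seq[Attr] }

  final case class Relation(output: Seq[Attr]) extends Plan

  final case class Barrier(child: Plan) extends Plan {
    def output: Seq[Attr] = child.output
  }

  final case class Join(left: Plan, right: Plan) extends Plan {
    def output: Seq[Attr] = left.output ++ right.output
  }

  // True when both sides expose at least one attribute with the same id.
  def overlaps(l: Plan, r: Plan): Boolean = {
    val leftIds = l.output.map(_.exprId).toSet
    r.output.exists(a => leftIds.contains(a.exprId))
  }

  // Hypothetical policy: always barrier the left side; barrier the right side
  // only when it shares no attributes with the left side.
  def joinPlans(l: Plan, r: Plan): Join = {
    val rightSide = if (overlaps(l, r)) r else Barrier(r)
    Join(Barrier(l), rightSide)
  }

  val t: Plan = Relation(Seq(Attr("a", 1L)))
  val u: Plan = Relation(Seq(Attr("a", 2L)))

  val selfJoin: Join = joinPlans(t, t)  // right side left open for de-duplication
  val plainJoin: Join = joinPlans(t, u) // both sides wrapped
}
```

Whether the extra check pays for itself is exactly what the comment leaves undecided; the sketch only shows what such a policy would look like.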

/**
@@ -785,8 +788,8 @@ class Dataset[T] private[sql](

withPlan {
Join(
- joined.left,
- joined.right,
+ AnalysisBarrier(joined.left),
+ AnalysisBarrier(joined.right),
UsingJoin(JoinType(joinType), usingColumns),
None)
}
@@ -841,17 +844,18 @@ class Dataset[T] private[sql](
// Trigger analysis so in the case of self-join, the analyzer will clone the plan.
// After the cloning, left and right side will have distinct expression ids.
val plan = withPlan(
- Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
- .queryExecution.analyzed.asInstanceOf[Join]
+ Join(AnalysisBarrier(logicalPlan), right.logicalPlan, JoinType(joinType),
+ Some(joinExprs.expr)))
+ .queryExecution.analyzed.asInstanceOf[Join]

// If auto self join alias is disabled, return the plan.
if (!sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
return withPlan(plan)
}

// If left/right have no output set intersection, return the plan.
- val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
- val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
+ val lanalyzed = withPlan(AnalysisBarrier(this.logicalPlan)).queryExecution.analyzed
+ val ranalyzed = withPlan(AnalysisBarrier(right.logicalPlan)).queryExecution.analyzed
if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
return withPlan(plan)
}
@@ -883,7 +887,7 @@ class Dataset[T] private[sql](
* @since 2.1.0
*/
def crossJoin(right: Dataset[_]): DataFrame = withPlan {
- Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
+ Join(AnalysisBarrier(logicalPlan), right.logicalPlan, joinType = Cross, None)
}

/**
@@ -1134,7 +1138,7 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame = withPlan {
- Project(cols.map(_.named), logicalPlan)
+ Project(cols.map(_.named), AnalysisBarrier(logicalPlan))

Contributor

does this work if we turn off eager analysis?

Member Author

Do we still have eager analysis? I remember it was removed earlier.

Member

After PR #11443, we always do eager analysis.

}

/**
@@ -1812,7 +1816,7 @@ class Dataset[T] private[sql](

withPlan {
Generate(generator, join = true, outer = false,
- qualifier = None, generatorOutput = Nil, logicalPlan)
+ qualifier = None, generatorOutput = Nil, AnalysisBarrier(logicalPlan))
}
}

@@ -1853,7 +1857,7 @@ class Dataset[T] private[sql](

withPlan {
Generate(generator, join = true, outer = false,
- qualifier = None, generatorOutput = Nil, logicalPlan)
+ qualifier = None, generatorOutput = Nil, AnalysisBarrier(logicalPlan))
}
}
