Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
82978d7
Set barrier to prevent re-analysis of analyzed plan.
viirya Apr 26, 2017
24905e3
Use a logical node to set analysis barrier.
viirya Apr 27, 2017
e15b001
Add test for analysis barrier.
viirya Apr 30, 2017
a076d83
Let AnalysisBarrier as LeafNode.
viirya May 3, 2017
b29ded3
Remove resolveOperators path.
viirya May 5, 2017
8c8fe1e
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 5, 2017
a855182
Solving merging issue.
viirya May 5, 2017
4ff9610
Do not change exposed logicalPlan.
viirya May 5, 2017
d0a94f4
Fix test.
viirya May 6, 2017
02e11f9
Address comments.
viirya May 9, 2017
17f1a02
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 9, 2017
4629959
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 10, 2017
c313e35
Correctly set isStreaming for barrier.
viirya May 10, 2017
7e9dfac
Address comments.
viirya May 11, 2017
fba3690
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 17, 2017
f63ea0b
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 17, 2017
b9d03cd
Fix test.
viirya May 17, 2017
6a7204c
Address comments.
viirya May 19, 2017
3437ae0
Wrap AnalysisBarrier on df.logicalPlan.
viirya May 22, 2017
555fa8e
Fix test.
viirya May 23, 2017
505aba6
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 23, 2017
f3e4208
fix test.
viirya May 24, 2017
c0bee01
Avoid overriding find in AnalysisBarrier.
viirya May 24, 2017
1c1cc9d
Fix test.
viirya May 24, 2017
eb0598e
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 24, 2017
cba784b
fix test.
viirya May 24, 2017
b478e55
Merge remote-tracking branch 'upstream/master' into SPARK-20392
viirya May 25, 2017
8314cc3
Create a new field in Dataset for the plan with barrier.
viirya May 25, 2017
6add9ec
Address comments.
viirya May 26, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,34 @@ object CurrentOrigin {
}
}

/**
 * Holds the optional tree node that acts as a barrier; `None` (the default) means no barrier
 * is set.
 */
case class Barrier(node: Option[TreeNode[_]] = None)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just create a logical plan node and override the transformUp/transformDown functions?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original thought was that, if we used a barrier node, we would need to modify the many places where we create a new logical plan so that each one wraps it with the barrier node.

I will revamp it with a barrier node.


/**
 * Provides a barrier for TreeNodes to prevent transformation from specified nodes.
 *
 * The barrier is stored in a `ThreadLocal`, so each thread sees its own value; the initial
 * value is an empty `Barrier()`, for which `hitBarrier` always returns false.
 */
object CurrentBarrier {
  private val value = new ThreadLocal[Barrier]() {
    override def initialValue: Barrier = Barrier()
  }

  /** Returns the barrier currently installed for the calling thread. */
  def get: Barrier = value.get()

  /** Installs `b` as the barrier for the calling thread. */
  def set(b: Barrier): Unit = value.set(b)

  /** Replaces the calling thread's barrier with an empty one. */
  def reset(): Unit = value.set(Barrier())

  /**
   * Returns true when a barrier node is set and it `fastEquals` the given node.
   *
   * @param currentNode the node being visited
   */
  def hitBarrier(currentNode: TreeNode[_]): Boolean =
    value.get().node.exists(_.fastEquals(currentNode))

  /**
   * Evaluates `f` with `b` installed as the barrier and restores the previously installed
   * barrier afterwards, even when `f` throws.
   *
   * @param b the barrier to install while `f` runs
   * @param f the computation to evaluate
   * @return the result of `f`
   */
  def withBarrier[A](b: Barrier)(f: => A): A = {
    val previous = get
    set(b)
    try f finally set(previous)
  }
}

// scalastyle:off
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
// scalastyle:on
Expand Down Expand Up @@ -115,15 +143,19 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
*/
def foreach(f: BaseType => Unit): Unit = {
  f(this)
  // Visit the children exactly once, and only when this node is not the current barrier.
  if (!CurrentBarrier.hitBarrier(this)) {
    children.foreach(_.foreach(f))
  }
}

/**
 * Runs the given function recursively on [[children]] then on this node.
 * The children are skipped when this node is the current barrier.
 * @param f the function to be applied to each node in the tree.
 */
def foreachUp(f: BaseType => Unit): Unit = {
  // Visit the children exactly once, and only when this node is not the current barrier.
  if (!CurrentBarrier.hitBarrier(this)) {
    children.foreachUp(f)
  }
  f(this)
}

Expand Down Expand Up @@ -267,11 +299,19 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
rule.applyOrElse(this, identity[BaseType])
}

// Check if unchanged and then possibly return old copy to avoid gc churn.
if (this fastEquals afterRule) {
mapChildren(_.transformDown(rule))
if (CurrentBarrier.hitBarrier(this)) {
if (this fastEquals afterRule) {
this
} else {
afterRule
}
} else {
afterRule.mapChildren(_.transformDown(rule))
// Check if unchanged and then possibly return old copy to avoid gc churn.
if (this fastEquals afterRule) {
mapChildren(_.transformDown(rule))
} else {
afterRule.mapChildren(_.transformDown(rule))
}
}
}

Expand All @@ -283,14 +323,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
* @param rule the function used to transform this node's children
*/
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  if (CurrentBarrier.hitBarrier(this)) {
    // This node is the current barrier: apply the rule here without touching the children.
    CurrentOrigin.withOrigin(origin) {
      rule.applyOrElse(this, identity[BaseType])
    }
  } else {
    val afterRuleOnChildren = mapChildren(_.transformUp(rule))
    // When the children are unchanged, apply the rule to the existing node rather than the copy.
    if (this fastEquals afterRuleOnChildren) {
      CurrentOrigin.withOrigin(origin) {
        rule.applyOrElse(this, identity[BaseType])
      }
    } else {
      CurrentOrigin.withOrigin(origin) {
        rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
      }
    }
  }
}
Expand Down
29 changes: 20 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.trees.{Barrier, CurrentBarrier}
import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command._
Expand Down Expand Up @@ -203,7 +204,7 @@ class Dataset[T] private[sql](
* custom objects, e.g. collect. Here we resolve and bind the encoder so that we can call its
* `fromRow` method later.
*/
private val boundEnc =
private lazy val boundEnc =

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't make boundEnc a lazy val because we need an early exception when the encoder can't be resolved.

exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)

// Implicit class tag, taken from this Dataset's expression encoder (`exprEnc.clsTag`).
private implicit def classTag = exprEnc.clsTag
Expand Down Expand Up @@ -356,7 +357,11 @@ class Dataset[T] private[sql](
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = {
  // Construct the DataFrame while this Dataset's logical plan is installed as the barrier.
  CurrentBarrier.withBarrier(Barrier(Some(logicalPlan))) {
    new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
  }
}

/**
* :: Experimental ::
Expand Down Expand Up @@ -2828,21 +2833,27 @@ class Dataset[T] private[sql](

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
  // `logicalPlan` is by-name, so it is evaluated inside the barrier scope; the barrier is this
  // Dataset's own logical plan.
  CurrentBarrier.withBarrier(Barrier(Some(this.logicalPlan))) {
    Dataset.ofRows(sparkSession, logicalPlan)
  }
}

/** A convenient function to wrap a logical plan and produce a Dataset. */
@inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
  // `logicalPlan` is by-name, so it is evaluated inside the barrier scope; the barrier is this
  // Dataset's own logical plan.
  CurrentBarrier.withBarrier(Barrier(Some(this.logicalPlan))) {
    Dataset(sparkSession, logicalPlan)
  }
}

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
@inline private def withSetOperator[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
  // `logicalPlan` is by-name, so it is evaluated inside the barrier scope; the barrier is this
  // Dataset's own logical plan.
  CurrentBarrier.withBarrier(Barrier(Some(this.logicalPlan))) {
    if (classTag.runtimeClass.isAssignableFrom(classOf[Row])) {
      // Set operators widen types (change the schema), so we cannot reuse the row encoder.
      Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
    } else {
      Dataset(sparkSession, logicalPlan)
    }
  }
}
}