Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,33 @@ private[spark] object RDDOperationScope {
/**
* Execute the given body such that all RDDs created in this body will have the same scope.
*
* If nesting is allowed, this concatenates the previous scope with the new one in a way that
* signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
* this method executed in the body will have no effect.
* If nesting is allowed, any subsequent calls to this method in the given body will instantiate
* child scopes that are nested within our scope. Otherwise, these calls will take no effect.
*
* Additionally, the caller of this method may optionally ignore the configurations and scopes
* set by the higher level caller. In this case, this method will ignore the parent caller's
* intention to disallow nesting, and the new scope instantiated will not have a parent. This
* is useful for scoping physical operations in Spark SQL, for instance.
*
* Note: Return statements are NOT allowed in body.
*/
private[spark] def withScope[T](
sc: SparkContext,
name: String,
allowNesting: Boolean)(body: => T): T = {
allowNesting: Boolean,
ignoreParent: Boolean = false)(body: => T): T = {
// Save the old scope to restore it later
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try {
// Set the scope only if the higher level caller allows us to do so
if (sc.getLocalProperty(noOverrideKey) == null) {
if (ignoreParent) {
// Ignore all parent settings and scopes and start afresh with our own root scope
sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
} else if (sc.getLocalProperty(noOverrideKey) == null) {
// Otherwise, set the scope only if the higher level caller allows us to do so
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
}
// Optionally disallow the child body to override our scope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan(

private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
if (enableAccumulators) {
readPartitions.setValue(0)
readBatches.setValue(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ case class Aggregate(
}
}

override def execute(): RDD[Row] = attachTree(this, "execute") {
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ case class Exchange(
serializer
}

override def execute(): RDD[Row] = attachTree(this , "execute") {
protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon

/** Physical plan node for scanning data from an RDD. */
private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
override def execute(): RDD[Row] = rdd
protected override def doExecute(): RDD[Row] = rdd
}

/** Logical plan node for scanning data from a local collection. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)

override def execute(): RDD[Row] = attachTree(this, "execute") {
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
// TODO Move out projection objects creation and transfer to
// workers via closure. However we can't assume the Projection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ case class Generate(

val boundGenerator = BindReferences.bindReference(generator, child.output)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
if (join) {
child.execute().mapPartitions { iter =>
val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class GeneratedAggregate(

override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val aggregatesToCompute = aggregateExpressions.flatMap { a =>
a.collect { case agg: AggregateExpression => agg}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e

private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

override def execute(): RDD[Row] = rdd
protected override def doExecute(): RDD[Row] = rdd


override def executeCollect(): Array[Row] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -81,12 +81,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/**
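* Returns the result of this query as an RDD[Row] by delegating to doExecute() inside an
* RDD operation scope named after this node. Concrete implementations of SparkPlan should
* override doExecute() rather than this method.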
*/
def execute(): RDD[Row]
final def execute(): RDD[Row] = {
  // Run the node-specific logic inside an RDD operation scope named after this plan node.
  // Nesting is disallowed and the caller's scope settings are ignored, so withScope
  // installs a fresh root scope (one with no parent) for the RDDs created in doExecute().
  RDDOperationScope.withScope(
      sparkContext, nodeName, allowNesting = false, ignoreParent = true) {
    doExecute()
  }
}

/**
* Runs this query returning the result as an array.
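* Produces the result of this query as an RDD[Row]. Overridden by concrete implementations
* of SparkPlan; callers should invoke execute() instead of calling this method directly.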
*/
protected def doExecute(): RDD[Row]

/**
* Runs this query returning the result as an array.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify in the Scaladoc here the purpose of the two methods?

execute(): Returns the result of this query as an RDD[Row] by delegating to doExecute after adding query plan information to created RDDs for visualization. Concrete implementations of SparkPlan should override doExecute instead.

doExecute(): Overridden by concrete implementations of SparkPlan. Produces the result of the query as an RDD[Row].

def executeCollect(): Array[Row] = {
execute().mapPartitions { iter =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ case class Window(
}
}

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
new Iterator[Row] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends

@transient lazy val buildProjection = newMutableProjection(projectList, child.output)

override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
val resuableProjection = buildProjection()
iter.map(resuableProjection)
}
Expand All @@ -54,7 +54,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {

@transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)

override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
iter.filter(conditionEvaluator)
}

Expand Down Expand Up @@ -83,7 +83,7 @@ case class Sample(
override def output: Seq[Attribute] = child.output

// TODO: How to pick seed?
override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
if (withReplacement) {
child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed)
} else {
Expand All @@ -99,7 +99,7 @@ case class Sample(
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output: Seq[Attribute] = children.head.output
override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
}

/**
Expand All @@ -124,7 +124,7 @@ case class Limit(limit: Int, child: SparkPlan)

override def executeCollect(): Array[Row] = child.executeTake(limit)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter =>
iter.take(limit).map(row => (false, row.copy()))
Expand Down Expand Up @@ -166,7 +166,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)

// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)

override def outputOrdering: Seq[SortOrder] = sortOrder
}
Expand All @@ -186,7 +186,7 @@ case class Sort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

override def execute(): RDD[Row] = attachTree(this, "sort") {
protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
iterator.map(_.copy()).toArray.sorted(ordering).iterator
Expand Down Expand Up @@ -214,7 +214,7 @@ case class ExternalSort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

override def execute(): RDD[Row] = attachTree(this, "sort") {
protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
Expand Down Expand Up @@ -244,7 +244,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
override def requiredChildDistribution: Seq[Distribution] =
if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
val hashSet = new scala.collection.mutable.HashSet[Row]()

Expand All @@ -270,7 +270,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
}
Expand All @@ -285,7 +285,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
}
}
Expand All @@ -299,7 +299,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = children.head.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}
Expand All @@ -314,5 +314,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children: Seq[SparkPlan] = child :: Nil

def execute(): RDD[Row] = child.execute()
protected override def doExecute(): RDD[Row] = child.execute()
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan

override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val converted = sideEffectResult.map(r =>
CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row])
sqlContext.sparkContext.parallelize(converted, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ package object debug {
}
}

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
new Iterator[Row] {
def hasNext: Boolean = iter.hasNext
Expand Down Expand Up @@ -193,7 +193,7 @@ package object debug {

def children: List[SparkPlan] = child :: Nil

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().map { row =>
try typeCheck(row, child.schema) catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class BroadcastHashJoin(
sparkContext.broadcast(hashed)
}

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val broadcastRelation = Await.result(broadcastFuture, timeout)

streamedPlan.execute().mapPartitions { streamedIter =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class BroadcastLeftSemiJoinHash(

override def output: Seq[Attribute] = left.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ case class BroadcastNestedLoopJoin(
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ case class HashOuterJoin(
hashTable
}

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val joinedRow = new JoinedRow()
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// TODO this probably can be replaced by external sort (sort merged join?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case class LeftSemiJoinBNL(
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class LeftSemiJoinHash(

override def output: Seq[Attribute] = left.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ case class ShuffledHashJoin(
override def requiredChildDistribution: Seq[ClusteredDistribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
hashJoin(streamIter, hashed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ case class SortMergeJoin(
private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
keys.map(SortOrder(_, Ascending))

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:

def children: Seq[SparkPlan] = child :: Nil

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val childResults = child.execute().map(_.copy())

val parent = childResults.mapPartitions { iter =>
Expand Down
Loading