1717
1818package org .apache .spark .sql .execution .adaptive
1919
20+ import java .util .Properties
21+
2022import scala .concurrent .{ExecutionContext , Future }
2123import scala .concurrent .duration .Duration
2224
23- import org .apache .spark .MapOutputStatistics
24- import org .apache .spark .broadcast
25+ import org .apache .spark .{broadcast , MapOutputStatistics , SparkContext }
2526import org .apache .spark .rdd .RDD
2627import org .apache .spark .sql .catalyst .InternalRow
2728import org .apache .spark .sql .catalyst .expressions ._
@@ -48,20 +49,30 @@ abstract class QueryStage extends UnaryExecNode {
4849
4950 override def outputOrdering : Seq [SortOrder ] = child.outputOrdering
5051
52+ def withLocalProperties [T ](sc : SparkContext , properties : Properties )(body : => T ): T = {
53+ val oldProperties = sc.getLocalProperties
54+ try {
55+ sc.setLocalProperties(properties)
56+ body
57+ } finally {
58+ sc.setLocalProperties(oldProperties)
59+ }
60+ }
61+
5162 /**
5263 * Execute childStages and wait until all stages are completed. Use a thread pool to avoid
5364 * blocking on one child stage.
5465 */
5566 def executeChildStages (): Unit = {
56- val executionId = sqlContext.sparkContext.getLocalProperty( SQLExecution . EXECUTION_ID_KEY )
67+ val localProperties = sqlContext.sparkContext.getLocalProperties
5768
5869 // Handle broadcast stages
5970 val broadcastQueryStages : Seq [BroadcastQueryStage ] = child.collect {
6071 case bqs : BroadcastQueryStageInput => bqs.childStage
6172 }
6273 val broadcastFutures = broadcastQueryStages.map { queryStage =>
6374 Future {
64- SQLExecution .withExecutionId (sqlContext.sparkSession, executionId ) {
75+ withLocalProperties (sqlContext.sparkContext, localProperties ) {
6576 queryStage.prepareBroadcast()
6677 }
6778 }(QueryStage .executionContext)
@@ -73,7 +84,7 @@ abstract class QueryStage extends UnaryExecNode {
7384 }
7485 val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
7586 Future {
76- SQLExecution .withExecutionId (sqlContext.sparkSession, executionId ) {
87+ withLocalProperties (sqlContext.sparkContext, localProperties ) {
7788 queryStage.execute()
7889 }
7990 }(QueryStage .executionContext)
0 commit comments