Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}

/** Physical plan for Project. */
Expand Down Expand Up @@ -749,9 +749,11 @@ case class SubqueryExec(name: String, child: SparkPlan)
private lazy val relationFuture: Future[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val localProps = Utils.cloneProperties(sparkContext.getLocalProperties)
Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
sparkContext.setLocalProperties(localProps)
SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
val beforeCollect = System.nanoTime()
// Note that we use .executeCollect() because we don't want to convert data to Scala types
Expand Down