@@ -33,6 +33,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
+import org.apache.spark.status.config.TASK_SKEW_DETECT_ENABLED
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
 
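The new import pulls TASK_SKEW_DETECT_ENABLED from org.apache.spark.status.config, whose definition is not part of this diff. A minimal sketch of what that entry could look like, assuming it follows Spark's usual ConfigBuilder pattern (the key string, doc text, and default below are assumptions, not taken from this change):

import org.apache.spark.internal.config.ConfigBuilder

private[spark] object config {
  // Hypothetical definition; only the constant name TASK_SKEW_DETECT_ENABLED
  // is visible in the diff. The key name and default value are assumptions.
  val TASK_SKEW_DETECT_ENABLED = ConfigBuilder("spark.status.taskSkewDetect.enabled")
    .doc("When true, log job/stage identifiers at task set submission " +
      "so that skewed tasks can be correlated with their stage.")
    .booleanConf
    .createWithDefault(false)
}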
@@ -148,6 +149,8 @@ private[spark] class TaskSchedulerImpl(
 
   private[scheduler] var barrierCoordinator: RpcEndpoint = null
 
+  private lazy val skewDetectEnabled = conf.get(TASK_SKEW_DETECT_ENABLED)
+
   private def maybeInitBarrierCoordinator(): Unit = {
     if (barrierCoordinator == null) {
       barrierCoordinator = new BarrierCoordinator(barrierSyncTimeout, sc.listenerBus,
@@ -214,6 +217,16 @@ private[spark] class TaskSchedulerImpl(
       }
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 
+      if (skewDetectEnabled) {
+        val jobId = taskSet.priority
+        val stageId = taskSet.stageId
+        val stageAttemptId = taskSet.stageAttemptId
+        val executionId = Option(taskSet.properties)
+          .map(_.getProperty("spark.sql.execution.id"))
+        logInfo(s"On tasks submitting stageId: $stageId, " +
+          s"stageAttemptId: $stageAttemptId, executionId: $executionId, jobId: $jobId")
+      }
+
       if (!isLocal && !hasReceivedTask) {
         starvationTimer.scheduleAtFixedRate(new TimerTask() {
           override def run() {
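With the flag enabled, every TaskSet submission logs one line correlating the stage with its job and, for SQL workloads, the execution id (taskSet.priority carries the job id in Spark's FIFO scheduling). A hypothetical way to turn it on, repeating the assumed key name from the sketch above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("skew-detect-demo")
  .setMaster("local[*]")
  .set("spark.status.taskSkewDetect.enabled", "true") // assumed key name

// executionId is an Option[String]: it prints as None when taskSet.properties
// is null, and as Some(null) when the properties exist but carry no
// spark.sql.execution.id entry. An illustrative log line might read:
// "On tasks submitting stageId: 3, stageAttemptId: 0, executionId: Some(1), jobId: 2"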