From c0e6f03cf17415f274319d1a1b03a3eaf438fc96 Mon Sep 17 00:00:00 2001
From: Izek Greenfield
Date: Tue, 5 May 2020 22:19:15 +0300
Subject: [PATCH 1/8] Add MDC support in executor

---
 .../org/apache/spark/executor/Executor.scala  |  16 ++-
 .../org/apache/spark/util/ThreadUtils.scala   | 114 +++++++++++++++++-
 docs/configuration.md                         |   6 +
 3 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2bfa1cea4b26f..d240226d89165 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -34,6 +34,7 @@ import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.slf4j.MDC
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -107,7 +108,7 @@ private[spark] class Executor(
       .setNameFormat("Executor task launch worker-%d")
       .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
       .build()
-    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+    ThreadUtils.newCachedThreadPool(threadFactory)
   }
   private val executorSource = new ExecutorSource(threadPool, executorId)
   // Pool used for threads that supervise task killing / cancellation
@@ -395,6 +396,8 @@ private[spark] class Executor(
     }
 
     override def run(): Unit = {
+      setMDCForTask(taskDescription)
+
       threadId = Thread.currentThread.getId
       Thread.currentThread.setName(threadName)
       val threadMXBean = ManagementFactory.getThreadMXBean
@@ -693,6 +696,17 @@ private[spark] class Executor(
     }
   }
 
+  private def setMDCForTask(taskDescription: TaskDescription): Unit = {
+    val properties = taskDescription.properties
+
+    MDC.put("taskName", taskDescription.name)
+
+    properties.asScala.filter(_._1.startsWith("mdc.")).foreach { item =>
+      val key = item._1.substring(4)
+      MDC.put(key, item._2)
+    }
+  }
+
   /**
    * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
    * sending a Thread.interrupt(), and monitoring the task until it finishes.
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 78206c51c1028..3774fe74f8462 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.util
 
+import java.util
 import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture}
 import java.util.concurrent.locks.ReentrantLock
@@ -32,6 +33,99 @@ import org.apache.spark.SparkException
 
 private[spark] object ThreadUtils {
 
+  object MDCAwareThreadPoolExecutor {
+    def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = {
+      // The values need to be kept in sync with `Executors.newCachedThreadPool`
+      new MDCAwareThreadPoolExecutor(
+        0,
+        Integer.MAX_VALUE,
+        60L,
+        TimeUnit.SECONDS,
+        new SynchronousQueue[Runnable],
+        threadFactory)
+    }
+
+    def newFixedThreadPool(nThreads: Int, threadFactory: ThreadFactory): ThreadPoolExecutor = {
+      // The values need to be kept in sync with `Executors.newFixedThreadPool`
+      new MDCAwareThreadPoolExecutor(
+        nThreads,
+        nThreads,
+        0L,
+        TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue[Runnable],
+        threadFactory)
+    }
+
+    /**
+     * This method differs from `java.util.concurrent.Executors#newSingleThreadExecutor` in
+     * two ways:
+     * 1. It uses `org.apache.spark.util.ThreadUtils.MDCAwareThreadPoolExecutor`
+     * as the underlying `java.util.concurrent.ExecutorService`
+     * 2. It does not use the
+     * `java.util.concurrent.Executors#FinalizableDelegatedExecutorService` from the JDK
+     */
+    def newSingleThreadExecutor(threadFactory: ThreadFactory): ExecutorService = {
+      // The values need to be kept in sync with `Executors.newSingleThreadExecutor`
+      Executors.unconfigurableExecutorService(
+        new MDCAwareThreadPoolExecutor(
+          1,
+          1,
+          0L,
+          TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue[Runnable],
+          threadFactory)
+      )
+    }
+
+  }
+
+  class MDCAwareRunnable(proxy: Runnable) extends Runnable {
+    val callerThreadMDC: util.Map[String, String] = getMDCMap
+
+    @inline
+    private def getMDCMap: util.Map[String, String] = {
+      org.slf4j.MDC.getCopyOfContextMap match {
+        case null => new util.HashMap[String, String]()
+        case m => m
+      }
+    }
+
+    override def run(): Unit = {
+      val threadMDC = getMDCMap
+      org.slf4j.MDC.setContextMap(callerThreadMDC)
+      try {
+        proxy.run()
+      } finally {
+        org.slf4j.MDC.setContextMap(threadMDC)
+      }
+    }
+  }
+
+  class MDCAwareScheduledThreadPoolExecutor(
+      corePoolSize: Int,
+      threadFactory: ThreadFactory)
+    extends ScheduledThreadPoolExecutor(corePoolSize, threadFactory) {
+
+    override def execute(runnable: Runnable): Unit = {
+      super.execute(new MDCAwareRunnable(runnable))
+    }
+  }
+
+  class MDCAwareThreadPoolExecutor(
+      corePoolSize: Int,
+      maximumPoolSize: Int,
+      keepAliveTime: Long,
+      unit: TimeUnit,
+      workQueue: BlockingQueue[Runnable],
+      threadFactory: ThreadFactory)
+    extends ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+      keepAliveTime, unit, workQueue, threadFactory) {
+
+    override def execute(runnable: Runnable): Unit = {
+      super.execute(new MDCAwareRunnable(runnable))
+    }
+  }
+
   private val sameThreadExecutionContext =
     ExecutionContext.fromExecutorService(sameThreadExecutorService())
 
@@ -130,7 +224,15 @@ private[spark] object ThreadUtils {
    */
   def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+    MDCAwareThreadPoolExecutor.newCachedThreadPool(threadFactory)
+  }
+
+  /**
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = {
+    MDCAwareThreadPoolExecutor.newCachedThreadPool(threadFactory)
   }
 
   /**
@@ -140,7 +242,7 @@ private[spark] object ThreadUtils {
   def newDaemonCachedThreadPool(
       prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    val threadPool = new ThreadPoolExecutor(
+    val threadPool = new MDCAwareThreadPoolExecutor(
       maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
       maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
       keepAliveSeconds,
@@ -157,7 +259,7 @@ private[spark] object ThreadUtils {
    */
   def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+    MDCAwareThreadPoolExecutor.newFixedThreadPool(nThreads, threadFactory)
   }
 
   /**
@@ -165,7 +267,7 @@ private[spark] object ThreadUtils {
    */
   def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
     val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
-    Executors.newSingleThreadExecutor(threadFactory)
+    MDCAwareThreadPoolExecutor.newSingleThreadExecutor(threadFactory)
   }
 
   /**
@@ -173,7 +275,7 @@ private[spark] object ThreadUtils {
    */
   def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
     val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
-    val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
+    val executor = new MDCAwareScheduledThreadPoolExecutor(1, threadFactory)
     // By default, a cancelled task is not automatically removed from the work queue until its delay
     // elapses. We have to enable it manually.
     executor.setRemoveOnCancelPolicy(true)
@@ -189,7 +291,7 @@ private[spark] object ThreadUtils {
       .setDaemon(true)
       .setNameFormat(s"$threadNamePrefix-%d")
       .build()
-    val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory)
+    val executor = new MDCAwareScheduledThreadPoolExecutor(numThreads, threadFactory)
     // By default, a cancelled task is not automatically removed from the work queue until its delay
     // elapses. We have to enable it manually.
     executor.setRemoveOnCancelPolicy(true)
diff --git a/docs/configuration.md b/docs/configuration.md
index fce04b940594b..b615854eb730c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2955,6 +2955,18 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can config
 `log4j.properties` file in the `conf` directory. One way to start is to copy the existing
 `log4j.properties.template` located there.
 
+By default, Spark adds one record to the MDC: `taskName`, which shows something
+like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your pattern layout in
+order to print it in the logs.
+Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user-specific data into MDC.
+The key in MDC will be the string after the `mdc.` prefix.
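+
+As an illustration, assuming the `console` appender defined in the bundled
+`log4j.properties.template` (a sketch; adapt the pattern to your own appender),
+the following line prints the task name on every executor log message:
+
+    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %X{taskName} %c{1}: %m%n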
 
 # Overriding configuration directory
 
 To specify a different configuration directory other than the default "SPARK_HOME/conf",

From b43f86138a26fe9b0c0e635bceabcec23de6df98 Mon Sep 17 00:00:00 2001
From: Izek Greenfield
Date: Mon, 11 May 2020 09:12:59 +0300
Subject: [PATCH 2/8] use only from driver code

---
 core/src/main/scala/org/apache/spark/util/ThreadUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 3774fe74f8462..c0c711e20bb29 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -259,7 +259,7 @@ private[spark] object ThreadUtils {
    */
   def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    MDCAwareThreadPoolExecutor.newFixedThreadPool(nThreads, threadFactory)
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
   /**

From 0324746186c713e7c3b223a2bbc73ecf5af69093 Mon Sep 17 00:00:00 2001
From: Izek Greenfield
Date: Mon, 18 May 2020 19:35:50 +0300
Subject: [PATCH 3/8] revert ThreadUtils change

---
 .../org/apache/spark/util/ThreadUtils.scala   | 112 +-----------------
 1 file changed, 5 insertions(+), 107 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index c0c711e20bb29..78206c51c1028 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.util
 
-import java.util
 import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture}
 import java.util.concurrent.locks.ReentrantLock
@@ -33,99 +32,6 @@ import org.apache.spark.SparkException
 
 private[spark] object ThreadUtils {
 
-  object MDCAwareThreadPoolExecutor {
-    def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = {
-      // The values need to be kept in sync with `Executors.newCachedThreadPool`
-      new MDCAwareThreadPoolExecutor(
-        0,
-        Integer.MAX_VALUE,
-        60L,
-        TimeUnit.SECONDS,
-        new SynchronousQueue[Runnable],
-        threadFactory)
-    }
-
-    def newFixedThreadPool(nThreads: Int, threadFactory: ThreadFactory): ThreadPoolExecutor = {
-      // The values need to be kept in sync with `Executors.newFixedThreadPool`
-      new MDCAwareThreadPoolExecutor(
-        nThreads,
-        nThreads,
-        0L,
-        TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue[Runnable],
-        threadFactory)
-    }
-
-    /**
-     * This method differs from `java.util.concurrent.Executors#newSingleThreadExecutor` in
-     * two ways:
-     * 1. It uses `org.apache.spark.util.ThreadUtils.MDCAwareThreadPoolExecutor`
-     * as the underlying `java.util.concurrent.ExecutorService`
-     * 2. It does not use the
-     * `java.util.concurrent.Executors#FinalizableDelegatedExecutorService` from the JDK
-     */
-    def newSingleThreadExecutor(threadFactory: ThreadFactory): ExecutorService = {
-      // The values need to be kept in sync with `Executors.newSingleThreadExecutor`
-      Executors.unconfigurableExecutorService(
-        new MDCAwareThreadPoolExecutor(
-          1,
-          1,
-          0L,
-          TimeUnit.MILLISECONDS,
-          new LinkedBlockingQueue[Runnable],
-          threadFactory)
-      )
-    }
-
-  }
-
-  class MDCAwareRunnable(proxy: Runnable) extends Runnable {
-    val callerThreadMDC: util.Map[String, String] = getMDCMap
-
-    @inline
-    private def getMDCMap: util.Map[String, String] = {
-      org.slf4j.MDC.getCopyOfContextMap match {
-        case null => new util.HashMap[String, String]()
-        case m => m
-      }
-    }
-
-    override def run(): Unit = {
-      val threadMDC = getMDCMap
-      org.slf4j.MDC.setContextMap(callerThreadMDC)
-      try {
-        proxy.run()
-      } finally {
-        org.slf4j.MDC.setContextMap(threadMDC)
-      }
-    }
-  }
-
-  class MDCAwareScheduledThreadPoolExecutor(
-      corePoolSize: Int,
-      threadFactory: ThreadFactory)
-    extends ScheduledThreadPoolExecutor(corePoolSize, threadFactory) {
-
-    override def execute(runnable: Runnable): Unit = {
-      super.execute(new MDCAwareRunnable(runnable))
-    }
-  }
-
-  class MDCAwareThreadPoolExecutor(
-      corePoolSize: Int,
-      maximumPoolSize: Int,
-      keepAliveTime: Long,
-      unit: TimeUnit,
-      workQueue: BlockingQueue[Runnable],
-      threadFactory: ThreadFactory)
-    extends ThreadPoolExecutor(corePoolSize, maximumPoolSize,
-      keepAliveTime, unit, workQueue, threadFactory) {
-
-    override def execute(runnable: Runnable): Unit = {
-      super.execute(new MDCAwareRunnable(runnable))
-    }
-  }
-
   private val sameThreadExecutionContext =
     ExecutionContext.fromExecutorService(sameThreadExecutorService())
 
@@ -224,15 +130,7 @@ private[spark] object ThreadUtils {
    */
   def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
     val threadFactory = namedThreadFactory(prefix)
-    MDCAwareThreadPoolExecutor.newCachedThreadPool(threadFactory)
-  }
-
-  /**
-   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
- */ - def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = { - MDCAwareThreadPoolExecutor.newCachedThreadPool(threadFactory) + Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] } /** @@ -242,7 +140,7 @@ private[spark] object ThreadUtils { def newDaemonCachedThreadPool( prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = { val threadFactory = namedThreadFactory(prefix) - val threadPool = new MDCAwareThreadPoolExecutor( + val threadPool = new ThreadPoolExecutor( maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used keepAliveSeconds, @@ -267,7 +165,7 @@ private[spark] object ThreadUtils { */ def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = { val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build() - MDCAwareThreadPoolExecutor.newSingleThreadExecutor(threadFactory) + Executors.newSingleThreadExecutor(threadFactory) } /** @@ -275,7 +173,7 @@ private[spark] object ThreadUtils { */ def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = { val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build() - val executor = new MDCAwareScheduledThreadPoolExecutor(1, threadFactory) + val executor = new ScheduledThreadPoolExecutor(1, threadFactory) // By default, a cancelled task is not automatically removed from the work queue until its delay // elapses. We have to enable it manually. executor.setRemoveOnCancelPolicy(true) @@ -291,7 +189,7 @@ private[spark] object ThreadUtils { .setDaemon(true) .setNameFormat(s"$threadNamePrefix-%d") .build() - val executor = new MDCAwareScheduledThreadPoolExecutor(numThreads, threadFactory) + val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory) // By default, a cancelled task is not automatically removed from the work queue until its delay // elapses. We have to enable it manually. executor.setRemoveOnCancelPolicy(true) From 0c23e26902944685846cbad886694dca91f9f806 Mon Sep 17 00:00:00 2001 From: Izek Greenfield Date: Mon, 18 May 2020 19:38:00 +0300 Subject: [PATCH 4/8] Set MDC also in TaskReaper --- .../org/apache/spark/executor/Executor.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d240226d89165..8f0b7cc5de566 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -321,7 +321,11 @@ private[spark] class Executor( val taskId = taskDescription.taskId val threadName = s"Executor task launch worker for task $taskId" - private val taskName = taskDescription.name + val taskName = taskDescription.name + val mdcProperties = taskDescription.properties.asScala.filter(_._1.startsWith("mdc.")).map { item => + val key = item._1.substring(4) + (key, item._2) + }.toMap /** If specified, this task has been killed and this option contains the reason. 
*/ @volatile private var reasonIfKilled: Option[String] = None @@ -396,7 +400,8 @@ private[spark] class Executor( } override def run(): Unit = { - setMDCForTask(taskDescription) + + setMDCForTask(taskDescription.name, mdcProperties) threadId = Thread.currentThread.getId Thread.currentThread.setName(threadName) @@ -696,14 +701,11 @@ private[spark] class Executor( } } - private def setMDCForTask(taskDescription: TaskDescription): Unit = { - val properties = taskDescription.properties - - MDC.put("taskName", taskDescription.name) + private def setMDCForTask(taskName: String, mdc: scala.collection.Map[String, String]): Unit = { + MDC.put("taskName", taskName) - properties.asScala.filter(_._1.startsWith("mdc.")).foreach { item => - val key = item._1.substring(4) - MDC.put(key, item._2) + mdc.foreach { case (key, value) => + MDC.put(key, value) } } @@ -747,6 +749,9 @@ private[spark] class Executor( private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP) override def run(): Unit = { + + setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties) + val startTimeNs = System.nanoTime() def elapsedTimeNs = System.nanoTime() - startTimeNs def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs From 04078b4afee156de1be1ec9482416a59678a446b Mon Sep 17 00:00:00 2001 From: Izek Greenfield Date: Mon, 18 May 2020 19:52:25 +0300 Subject: [PATCH 5/8] Add missing method --- .../main/scala/org/apache/spark/util/ThreadUtils.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 78206c51c1028..e0a54b01ecd2a 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -133,6 +133,14 @@ private[spark] object ThreadUtils { Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] } + /** + * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a + * unique, sequentially assigned integer. + */ + def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = { + Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] + } + /** * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer. 
From 4deda151cccfbe8118c6bd31ce972ade801bff1b Mon Sep 17 00:00:00 2001
From: Izek Greenfield
Date: Mon, 18 May 2020 20:21:00 +0300
Subject: [PATCH 6/8] Fix scalastyle

---
 .../main/scala/org/apache/spark/executor/Executor.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 8f0b7cc5de566..a832c9ebc3b9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,10 +322,11 @@ private[spark] class Executor(
   val taskId = taskDescription.taskId
   val threadName = s"Executor task launch worker for task $taskId"
   val taskName = taskDescription.name
-  val mdcProperties = taskDescription.properties.asScala.filter(_._1.startsWith("mdc.")).map { item =>
-    val key = item._1.substring(4)
-    (key, item._2)
-  }.toMap
+  val mdcProperties = taskDescription.properties.asScala
+    .filter(_._1.startsWith("mdc.")).map { item =>
+      val key = item._1.substring(4)
+      (key, item._2)
+    }.toMap
 
   /** If specified, this task has been killed and this option contains the reason. */
   @volatile private var reasonIfKilled: Option[String] = None

From 0685aba05d364918c7241a7b66f441f353852a3f Mon Sep 17 00:00:00 2001
From: Izek Greenfield
Date: Tue, 19 May 2020 09:04:17 +0300
Subject: [PATCH 7/8] Fix instructions

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index b615854eb730c..420942f7b7bbb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2955,7 +2955,7 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can config
 `log4j.properties` file in the `conf` directory. One way to start is to copy the existing
 `log4j.properties.template` located there.
 
-By default, Spark adds one record to the MDC: `taskName`, which shows something
+By default, Spark adds one record to the MDC (Mapped Diagnostic Context): `taskName`, which shows something
 like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your pattern layout in
 order to print it in the logs.
 Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user-specific data into MDC.
From d5c1aa97bd69a25de5de03ec79284f39dace3198 Mon Sep 17 00:00:00 2001 From: Izek Greenfield Date: Tue, 19 May 2020 09:04:32 +0300 Subject: [PATCH 8/8] style changes --- .../main/scala/org/apache/spark/executor/Executor.scala | 8 ++++---- .../main/scala/org/apache/spark/util/ThreadUtils.scala | 8 -------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a832c9ebc3b9d..45cec726c4ca7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -108,7 +108,7 @@ private[spark] class Executor( .setNameFormat("Executor task launch worker-%d") .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused")) .build() - ThreadUtils.newCachedThreadPool(threadFactory) + Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] } private val executorSource = new ExecutorSource(threadPool, executorId) // Pool used for threads that supervise task killing / cancellation @@ -326,7 +326,7 @@ private[spark] class Executor( .filter(_._1.startsWith("mdc.")).map { item => val key = item._1.substring(4) (key, item._2) - }.toMap + }.toSeq /** If specified, this task has been killed and this option contains the reason. */ @volatile private var reasonIfKilled: Option[String] = None @@ -402,7 +402,7 @@ private[spark] class Executor( override def run(): Unit = { - setMDCForTask(taskDescription.name, mdcProperties) + setMDCForTask(taskName, mdcProperties) threadId = Thread.currentThread.getId Thread.currentThread.setName(threadName) @@ -702,7 +702,7 @@ private[spark] class Executor( } } - private def setMDCForTask(taskName: String, mdc: scala.collection.Map[String, String]): Unit = { + private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = { MDC.put("taskName", taskName) mdc.foreach { case (key, value) => diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index e0a54b01ecd2a..78206c51c1028 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -133,14 +133,6 @@ private[spark] object ThreadUtils { Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] } - /** - * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a - * unique, sequentially assigned integer. - */ - def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = { - Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] - } - /** * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.