16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -34,6 +34,7 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.slf4j.MDC

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -107,7 +108,7 @@ private[spark] class Executor(
.setNameFormat("Executor task launch worker-%d")
.setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
.build()
-Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+ThreadUtils.newCachedThreadPool(threadFactory)
}
private val executorSource = new ExecutorSource(threadPool, executorId)
// Pool used for threads that supervise task killing / cancellation
@@ -395,6 +396,8 @@ private[spark] class Executor(
}

override def run(): Unit = {
setMDCForTask(taskDescription)

threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
@@ -693,6 +696,17 @@ private[spark] class Executor(
}
}

private def setMDCForTask(taskDescription: TaskDescription): Unit = {
val properties = taskDescription.properties

MDC.put("taskName", taskDescription.name)

properties.asScala.filter(_._1.startsWith("mdc.")).foreach { item =>
val key = item._1.substring(4)
MDC.put(key, item._2)
}
}

/**
* Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
* sending a Thread.interrupt(), and monitoring the task until it finishes.
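(An illustrative aside, not part of the diff: the sketch below shows what the setMDCForTask method in the hunk above does at this commit - copy "mdc."-prefixed task properties into the SLF4J MDC with the prefix stripped. The property names are made up, and note the review comment further down that a later commit, 54e702c, changed the key to keep the `mdc.` prefix.)

    import org.slf4j.MDC

    object SetMdcForTaskSketch {
      def main(args: Array[String]): Unit = {
        // Stand-ins for TaskDescription.name and TaskDescription.properties.
        val taskName = "task 1.0 in stage 0.0"
        val properties = Map(
          "mdc.appId" -> "my-app",           // becomes MDC key "appId"
          "spark.job.description" -> "etl")  // ignored: no "mdc." prefix
        MDC.put("taskName", taskName)
        properties.filter(_._1.startsWith("mdc.")).foreach { case (k, v) =>
          MDC.put(k.substring(4), v)
        }
        // Prints "my-app" with a real MDC adapter (e.g. log4j's); the no-op
        // SLF4J binding would print null.
        println(MDC.get("appId"))
      }
    }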
112 changes: 107 additions & 5 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -17,6 +17,7 @@

package org.apache.spark.util

import java.util
import java.util.concurrent._
import java.util.concurrent.{Future => JFuture}
import java.util.concurrent.locks.ReentrantLock
@@ -32,6 +33,99 @@ import org.apache.spark.SparkException

private[spark] object ThreadUtils {

object MDCAwareThreadPoolExecutor {
def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = {
// These values need to be kept in sync with `Executors.newCachedThreadPool`
new MDCAwareThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue[Runnable],
threadFactory)
}

def newFixedThreadPool(nThreads: Int, threadFactory: ThreadFactory): ThreadPoolExecutor = {
// These values need to be kept in sync with `Executors.newFixedThreadPool`
new MDCAwareThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable],
threadFactory)
}

/**
* This method differs from `java.util.concurrent.Executors#newSingleThreadExecutor` in
* two ways:
* 1. It uses `org.apache.spark.util.ThreadUtils.MDCAwareThreadPoolExecutor`
* as the underlying `java.util.concurrent.ExecutorService`.
* 2. It does not use the JDK's
* `java.util.concurrent.Executors#FinalizableDelegatedExecutorService`.
*/
def newSingleThreadExecutor(threadFactory: ThreadFactory): ExecutorService = {
// These values need to be kept in sync with `Executors.newSingleThreadExecutor`
Executors.unconfigurableExecutorService(
new MDCAwareThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable],
threadFactory)
)
}

}

class MDCAwareRunnable(proxy: Runnable) extends Runnable {
val callerThreadMDC: util.Map[String, String] = getMDCMap

@inline

Member: Why do we need @inline here, BTW? This spot doesn't seem performance-sensitive.

Author: If you submit lots of tasks, it will be.

private def getMDCMap: util.Map[String, String] = {
org.slf4j.MDC.getCopyOfContextMap match {
case null => new util.HashMap[String, String]()

Member: Can we do MDC.clear instead of creating a new object?

Author: It's more expensive to clear than to create a new one.

Member: Can you elaborate on why it's more expensive?

Author: From java.util.HashMap:

    /**
     * Removes all of the mappings from this map.
     * The map will be empty after this call returns.
     */
    public void clear() {
        Node<K,V>[] tab;
        modCount++;
        if ((tab = table) != null && size > 0) {
            size = 0;
            for (int i = 0; i < tab.length; ++i)
                tab[i] = null;
        }
    }

So it iterates over each node and nulls it out, which is O(n), while creating a new map is O(1).

case m => m
}
}

override def run(): Unit = {
val threadMDC = getMDCMap

Member: Is there any reason to keep the current thread's MDC? I think when you use MDCAwareRunnable, you always pass in some MDC or get an empty map. Do we use the current thread's MDC for logging?

Author: In case someone later uses the MDC elsewhere on the same thread, you don't want to throw theirs away.

org.slf4j.MDC.setContextMap(callerThreadMDC)
try {
proxy.run()
} finally {
org.slf4j.MDC.setContextMap(threadMDC)
}
}
}

class MDCAwareScheduledThreadPoolExecutor(
corePoolSize: Int,
threadFactory: ThreadFactory)
extends ScheduledThreadPoolExecutor(corePoolSize, threadFactory) {

override def execute(runnable: Runnable) {
super.execute(new MDCAwareRunnable(runnable))
}
}

class MDCAwareThreadPoolExecutor(
corePoolSize: Int,
maximumPoolSize: Int,
keepAliveTime: Long,
unit: TimeUnit,
workQueue: BlockingQueue[Runnable],
threadFactory: ThreadFactory)
extends ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, threadFactory) {

override def execute(runnable: Runnable) {
super.execute(new MDCAwareRunnable(runnable))
}
}

private val sameThreadExecutionContext =
ExecutionContext.fromExecutorService(sameThreadExecutorService())

Expand Down Expand Up @@ -130,7 +224,15 @@ private[spark] object ThreadUtils {
*/
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {

Contributor: Can you point out where we call these ThreadUtils methods to create thread pools on the executor side?

If your use case is on the driver side, can we document the usage? E.g. users can set MDC properties via MDC.put and see them in the driver logs produced by DAGScheduler etc.

Author: Without my code it will not work, since the MDC will not be passed to the thread pool.

Author: In my use case I set the MDC in a job server so the driver will use it, but without the MDCAware classes it will not work. So I don't understand what you want me to document.

Contributor: Does your job server use these ThreadUtils methods? If it does, then please move these MDCAware* classes to your job server codebase.

The executor logs can print the taskName via MDC without all these changes in ThreadUtils. The use case you documented in https://github.com/apache/spark/pull/26624/files#diff-76e731333fb756df3bff5ddb3b731c46R2958 does not need these changes.

Author: My code just sets the MDC, and the only way to pass the MDC between thread pools is the code I added to ThreadUtils. The executor creates thread pools, so for all of the executor's logs to have the MDC populated you need that change.
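(An illustrative aside, not part of the review: the minimal sketch below shows the propagation problem being argued about here. SLF4J's MDC is thread-local, so a Runnable handed to a plain pool sees whatever MDC its worker thread happens to have; capturing the caller's context and installing it around run() - which is what MDCAwareRunnable does in this diff - fixes that. All names are made up, and it assumes an SLF4J binding with a real MDC adapter on the classpath.)

    import java.util.concurrent.Executors
    import org.slf4j.MDC

    object MdcPropagationSketch {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(1)
        MDC.put("appId", "demo-app") // set on the caller (e.g. job-server) thread

        // Without copying: the worker thread has its own, empty MDC.
        pool.execute(() => println(s"plain:   ${MDC.get("appId")}")) // prints null

        // With copying: capture the caller's MDC and install it around run().
        val captured = MDC.getCopyOfContextMap // may be null if nothing was set
        pool.execute { () =>
          val previous = MDC.getCopyOfContextMap
          if (captured != null) MDC.setContextMap(captured) else MDC.clear()
          try println(s"wrapped: ${MDC.get("appId")}") // prints demo-app
          finally if (previous != null) MDC.setContextMap(previous) else MDC.clear()
        }
        pool.shutdown()
      }
    }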

Contributor: What is the propagation you are talking about? A common Spark application looks like

    val session = ...
    session.sql(...)
    // other jobs
    session.stop()

If you are using a job server and your job server submits Spark jobs in a thread pool, please propagate the MDC properties in that thread pool. But that thread pool is not part of Spark.

If you are referring to other thread pools inside Spark, please point them out.

AFAIK, the thread pool is part of Spark - unless we have different views of what "Spark" is. MDC properties must pass through to the thread pool that is created by Spark - hence it is part of Spark. Am I missing anything here?

Contributor: Please note that `private[spark] object ThreadUtils` means it's private and is supposed to be used only within the Spark codebase. AFAIK the usages of ThreadUtils inside Spark don't need to propagate the MDC properties.

If you use these private APIs in your Spark application and have new requirements, please fix it in your Spark application. You can simply put these MDCAware* classes in your application.

Author: OK, I will move the ThreadUtils change to another pull request; I will need to set the MDC in TaskReaper also.

Author: @cloud-fan pushed changes. Please review.

val threadFactory = namedThreadFactory(prefix)
-Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+MDCAwareThreadPoolExecutor.newCachedThreadPool(threadFactory)
}

/**
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newCachedThreadPool(threadFactory: ThreadFactory): ThreadPoolExecutor = {
MDCAwareThreadPoolExecutor.newCachedThreadPool(threadFactory)
}

/**
@@ -140,7 +242,7 @@
def newDaemonCachedThreadPool(
prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
-val threadPool = new ThreadPoolExecutor(
+val threadPool = new MDCAwareThreadPoolExecutor(
maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
keepAliveSeconds,
@@ -165,15 +267,15 @@
*/
def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
-Executors.newSingleThreadExecutor(threadFactory)
+MDCAwareThreadPoolExecutor.newSingleThreadExecutor(threadFactory)
}

/**
* Wrapper over ScheduledThreadPoolExecutor.
*/
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
-val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
+val executor = new MDCAwareScheduledThreadPoolExecutor(1, threadFactory)
// By default, a cancelled task is not automatically removed from the work queue until its delay
// elapses. We have to enable it manually.
executor.setRemoveOnCancelPolicy(true)
@@ -189,7 +291,7 @@
.setDaemon(true)
.setNameFormat(s"$threadNamePrefix-%d")
.build()
-val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory)
+val executor = new MDCAwareScheduledThreadPoolExecutor(numThreads, threadFactory)
// By default, a cancelled task is not automatically removed from the work queue until its delay
// elapses. We have to enable it manually.
executor.setRemoveOnCancelPolicy(true)
6 changes: 6 additions & 0 deletions docs/configuration.md
@@ -2955,6 +2955,12 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can config
`log4j.properties` file in the `conf` directory. One way to start is to copy the existing
`log4j.properties.template` located there.

By default, Spark adds 1 record to the MDC: `taskName`, which shows something
like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your patternLayout in
order to print it in the logs.
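(For illustration - a sketch adapted from the defaults in `conf/log4j.properties.template`, not text from this PR - a log4j 1.x console appender whose ConversionPattern includes `%X{taskName}` would prefix every log line with the task name:)

    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %X{taskName} %c{1}: %m%n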
Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user-specific data into MDC.
The key in MDC will be the string after the `mdc.` prefix.

Member: patternLayout -> PatternLayout.

Could you give an example of how to use this in the document? For example, show how to specify your application names/identifiers.

@gatorsmile were you able to figure this out? My MDC values are not propagating in my logs after following this same procedure.

@alefischer13 Just a guess, but it looks like this was changed in 54e702c so that the MDC key still includes the `mdc.` prefix.
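(A short usage sketch of the setLocalProperty call described above; `appName` is an arbitrary example key, not one Spark defines:)

    // On the driver, before submitting the jobs whose task logs you want tagged:
    spark.sparkContext.setLocalProperty("mdc.appName", "nightly-etl")
    // Executor-side log4j can then render it with %X{appName} in its layout.
    // (At this commit the "mdc." prefix is stripped; per the comment above,
    // a later commit, 54e702c, kept the prefix, i.e. %X{mdc.appName}.)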

# Overriding configuration directory

To specify a different configuration directory other than the default "SPARK_HOME/conf",