22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -34,6 +34,7 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.slf4j.MDC

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -320,7 +321,12 @@ private[spark] class Executor(

val taskId = taskDescription.taskId
val threadName = s"Executor task launch worker for task $taskId"
- private val taskName = taskDescription.name
+ val taskName = taskDescription.name
val mdcProperties = taskDescription.properties.asScala
.filter(_._1.startsWith("mdc.")).map { item =>
val key = item._1.substring(4)
(key, item._2)
}.toSeq

/** If specified, this task has been killed and this option contains the reason. */
@volatile private var reasonIfKilled: Option[String] = None
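For illustration, a self-contained sketch of what the `mdcProperties` filter/map above produces; the property names and values here are made up, not part of the patch:

```scala
// Standalone illustration of the mdcProperties transformation above.
val props = Map(
  "mdc.appId" -> "job-42",                 // hypothetical mdc.-prefixed property
  "spark.job.description" -> "nightly etl" // ignored: no "mdc." prefix
)
val mdcProps = props
  .filter(_._1.startsWith("mdc.")).map { item =>
    val key = item._1.substring(4)         // drop the "mdc." prefix
    (key, item._2)
  }.toSeq
// mdcProps == Seq(("appId", "job-42")) -- only mdc.-prefixed keys survive
```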
@@ -395,6 +401,9 @@ private[spark] class Executor(
}

override def run(): Unit = {

setMDCForTask(taskName, mdcProperties)

threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
@@ -693,6 +702,14 @@ private[spark] class Executor(
}
}

private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
MDC.put("taskName", taskName)

mdc.foreach { case (key, value) =>
MDC.put(key, value)
}
}

/**
* Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
* sending a Thread.interrupt(), and monitoring the task until it finishes.
@@ -733,6 +750,9 @@ private[spark] class Executor(
private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)

override def run(): Unit = {

setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)

val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
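For context on the mechanism the `setMDCForTask` helper relies on, here is a minimal standalone sketch of SLF4J's MDC together with a `%X{taskName}` pattern; the logger name and layout are illustrative, not part of the patch:

```scala
import org.slf4j.{Logger, LoggerFactory, MDC}

object MdcSketch {
  // Assumes a log4j/logback layout such as "%d %X{taskName} %m%n" is configured;
  // %X{key} expands to the MDC value stored under that key, or nothing if unset.
  private val log: Logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    MDC.put("taskName", "task 1.0 in stage 0.0") // what the patch stores per task
    log.info("starting work")                    // %X{taskName} is filled in here
    MDC.remove("taskName")                       // clean up before the thread is reused
  }
}
```

MDC values are thread-local, which is why the patch calls `setMDCForTask` in both `TaskRunner.run()` and `TaskReaper.run()`: the two run on different threads.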
6 changes: 6 additions & 0 deletions docs/configuration.md
@@ -2955,6 +2955,12 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a
`log4j.properties` file in the `conf` directory. One way to start is to copy the existing
`log4j.properties.template` located there.

By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): `taskName`, which shows something
like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your patternLayout in
order to print it in the logs.
Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user specific data into MDC.
The key in MDC will be the string after the `mdc.` prefix.

Review comments on this paragraph:

Reviewer comment: patternLayout -> PatternLayout. Could you give an example to show how to use it in this document? For example, show how to specify your application names/identifiers.

Reply: @gatorsmile were you able to figure this out? My MDC values are not propagating in my logs after following this same procedure.

Reply: @alefischer13 Just a guess, but it looks like this was changed in 54e702c so that the MDC key still includes the mdc. prefix.
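As the review above asks for a usage example, here is a minimal sketch; the key `appId`, its value, and the application name are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

object MdcUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mdc-demo").getOrCreate()

    // Any local property whose key starts with "mdc." is forwarded to executors
    // and placed into the MDC; with this patch the key is stored without the prefix.
    spark.sparkContext.setLocalProperty("mdc.appId", "my-nightly-job")

    // Tasks launched after the call above carry the property.
    spark.range(100).count()

    spark.stop()
  }
}
```

With a `PatternLayout` conversion pattern containing `%X{taskName} %X{appId}`, executor log lines then show both values. Note the reply in the thread above: a later change (54e702c) appears to keep the prefixed key, in which case the pattern would need `%X{mdc.appId}` instead.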

# Overriding configuration directory

To specify a different configuration directory other than the default "SPARK_HOME/conf",