Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,15 @@ private[spark] class Executor(
val taskId = taskDescription.taskId
val threadName = s"Executor task launch worker for task $taskId"
val taskName = taskDescription.name
val mdcProperties = taskDescription.properties.asScala
.filter(_._1.startsWith("mdc.")).map { item =>
val mdcProperties = (taskDescription.properties.asScala ++
Seq((Executor.TASK_MDC_KEY, taskName)))
.filter(_._1.startsWith(Executor.MDC_KEY)).map { item =>
val key = item._1.substring(4)
if (key == Executor.TASK_MDC_KEY && item._2 != taskName) {
logWarning(s"Override mdc.taskName is not allowed, ignore ${item._2}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we simply set the task mdc key at the end? then it will not be overwritten.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we've already add the task mdc key at the new line 326. We can remove the warning if it's ok to override silently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the original setMDCForTask method looks OK as long as we set task mdc key at the end. It also helps avoid duplicated code.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we do not let override the task name in MDC?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then our document is wrong. We must make sure taskName always represent the value as we documented.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should let the user override it, and write to the log that is overridden.
(windows way is not let you do things, Linux way: with great power comes greater responsibility)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit to let users override the task name? It's just confusing to me. Let's not support a non-existing use case.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the UI you can set the description I think many users can benefit from setting the taskName to be the same as that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then, they can use custom MDC properties instead?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}
(key, item._2)
}.toSeq
}.toMap

/** If specified, this task has been killed and this option contains the reason. */
@volatile private var reasonIfKilled: Option[String] = None
Expand Down Expand Up @@ -401,9 +405,22 @@ private[spark] class Executor(
}

override def run(): Unit = {
val oldMdcProperties = mdcProperties.keys.map(k => (k, MDC.get(k)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simply clear all the mdc properties here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can normally, and I'd prefer to clear it inside finally.

try {
mdcProperties.foreach { case (k, v) => MDC.put(k, v) }
runInternal()
} finally {
oldMdcProperties.foreach { case (k, v) =>
if (v == null) {
MDC.remove(k)
} else {
MDC.put(k, v)
}
}
}
}

setMDCForTask(taskName, mdcProperties)

private def runInternal(): Unit = {
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
Expand Down Expand Up @@ -702,14 +719,6 @@ private[spark] class Executor(
}
}

private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
MDC.put("taskName", taskName)

mdc.foreach { case (key, value) =>
MDC.put(key, value)
}
}

/**
* Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
* sending a Thread.interrupt(), and monitoring the task until it finishes.
Expand Down Expand Up @@ -750,9 +759,23 @@ private[spark] class Executor(
private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)

override def run(): Unit = {
val mdcProperties = taskRunner.mdcProperties
val oldMdcProperties = mdcProperties.keys.map(k => (k, MDC.get(k)))
try {
mdcProperties.foreach { case (k, v) => MDC.put(k, v) }
runInternal()
} finally {
oldMdcProperties.foreach { case (k, v) =>
if (v == null) {
MDC.remove(k)
} else {
MDC.put(k, v)
}
}
}
}

setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)

private def runInternal(): Unit = {
val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
Expand Down Expand Up @@ -969,4 +992,7 @@ private[spark] object Executor {
// task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
// used instead.
val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]

val MDC_KEY = "mdc."
val TASK_MDC_KEY = s"${MDC_KEY}taskName"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you change this key you need also to update the docs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, the change doesn't introduce any difference to the end-user, no?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I remember before it was just taskName without mdc.taskName

Copy link
Member Author

@Ngone51 Ngone51 Jun 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used internally in order to handle taskName consistently with custom MDC properties. It has no effect on users.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}