Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ function toggleThreadStackTrace(threadId, forceAdd) {
if (stackTrace.length == 0) {
var stackTraceText = $('#' + threadId + "_td_stacktrace").html()
var threadCell = $("#thread_" + threadId + "_tr")
threadCell.after("<tr id=\"" + threadId +"_stacktrace\" class=\"accordion-body\"><td colspan=\"3\"><pre>" +
threadCell.after("<tr id=\"" + threadId +"_stacktrace\" class=\"accordion-body\"><td colspan=\"4\"><pre>" +
stackTraceText + "</pre></td></tr>")
} else {
if (!forceAdd) {
Expand Down Expand Up @@ -73,6 +73,7 @@ function onMouseOverAndOut(threadId) {
$("#" + threadId + "_td_id").toggleClass("threaddump-td-mouseover");
$("#" + threadId + "_td_name").toggleClass("threaddump-td-mouseover");
$("#" + threadId + "_td_state").toggleClass("threaddump-td-mouseover");
$("#" + threadId + "_td_locking").toggleClass("threaddump-td-mouseover");
}

function onSearchStringChange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,24 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
}
}.map { thread =>
val threadId = thread.threadId
val blockedBy = thread.blockedByThreadId match {
case Some(blockedByThreadId) =>
<div>
Blocked by <a href={s"#${thread.blockedByThreadId}_td_id"}>
Thread {thread.blockedByThreadId} {thread.blockedByLock}</a>
</div>
case None => Text("")
}
val heldLocks = thread.holdingLocks.mkString(", ")

<tr id={s"thread_${threadId}_tr"} class="accordion-heading"
onclick={s"toggleThreadStackTrace($threadId, false)"}
onmouseover={s"onMouseOverAndOut($threadId)"}
onmouseout={s"onMouseOverAndOut($threadId)"}>
<td id={s"${threadId}_td_id"}>{threadId}</td>
<td id={s"${threadId}_td_name"}>{thread.threadName}</td>
<td id={s"${threadId}_td_state"}>{thread.threadState}</td>
<td id={s"${threadId}_td_locking"}>{blockedBy}{heldLocks}</td>
<td id={s"${threadId}_td_stacktrace"} class="hidden">{thread.stackTrace}</td>
</tr>
}
Expand Down Expand Up @@ -86,6 +97,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
<th onClick="collapseAllThreadStackTrace(false)">Thread ID</th>
<th onClick="collapseAllThreadStackTrace(false)">Thread Name</th>
<th onClick="collapseAllThreadStackTrace(false)">Thread State</th>
<th onClick="collapseAllThreadStackTrace(false)">Thread Locks</th>
</thead>
<tbody>{dumpRows}</tbody>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ private[spark] case class ThreadStackTrace(
threadId: Long,
threadName: String,
threadState: Thread.State,
stackTrace: String)
stackTrace: String,
blockedByThreadId: Option[Long],
blockedByLock: String,
holdingLocks: Seq[String])

34 changes: 30 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.util

import java.io._
import java.lang.management.ManagementFactory
import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo}
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.Channels
Expand Down Expand Up @@ -2007,15 +2007,41 @@ private[spark] object Utils extends Logging {
}
}

private implicit class Lock(lock: LockInfo) {
def lockString: String = {
lock match {
case monitor: MonitorInfo =>
s"Monitor(${lock.getClassName}@${lock.getIdentityHashCode}})"
case _ =>
s"Lock(${lock.getClassName}@${lock.getIdentityHashCode}})"
}
}
}

/** Return a thread dump of all threads' stacktraces. Used to capture dumps for the web UI */
def getThreadDump(): Array[ThreadStackTrace] = {
// We need to filter out null values here because dumpAllThreads() may return null array
// elements for threads that are dead / don't exist.
val threadInfos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).filter(_ != null)
threadInfos.sortBy(_.getThreadId).map { case threadInfo =>
val stackTrace = threadInfo.getStackTrace.map(_.toString).mkString("\n")
ThreadStackTrace(threadInfo.getThreadId, threadInfo.getThreadName,
threadInfo.getThreadState, stackTrace)
val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame -> m).toMap
val stackTrace = threadInfo.getStackTrace.map { frame =>
monitors.get(frame) match {
case Some(monitor) =>
monitor.getLockedStackFrame.toString + s" => holding ${monitor.lockString}"
case None =>
frame.toString
}
}.mkString("\n")

// use a set to dedup re-entrant locks that are held at multiple places
val heldLocks = (threadInfo.getLockedSynchronizers.map(_.lockString)
++ threadInfo.getLockedMonitors.map(_.lockString)
).toSet

ThreadStackTrace(threadInfo.getThreadId, threadInfo.getThreadName, threadInfo.getThreadState,
stackTrace, if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""), heldLocks.toSeq)
}
}

Expand Down