Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,44 @@ case class StackTrace(elems: Seq[String]) {
* Note: code was initially copied from Apache Spark(v3.5.1).
*/
case class ThreadStackTrace(
val threadId: Long,
val threadName: String,
val threadState: Thread.State,
val stackTrace: StackTrace,
val blockedByThreadId: Option[Long],
val blockedByLock: String,
val holdingLocks: Seq[String])
threadId: Long,
threadName: String,
threadState: Thread.State,
stackTrace: StackTrace,
blockedByThreadId: Option[Long],
blockedByLock: String,
holdingLocks: Seq[String],
synchronizers: Seq[String],
monitors: Seq[String],
lockName: Option[String],
lockOwnerName: Option[String],
suspended: Boolean,
inNative: Boolean) {

/**
 * Returns a string representation of this thread stack trace, laid out after
 * `java.lang.management.ThreadInfo#toString` (JDK 8).
 *
 * The first line is `"<threadName>" Id=<threadId> <threadState>`, optionally followed by
 * ` on <lockName>`, ` owned by "<lockOwnerName>"`, ` Id=<blockedByThreadId>`, ` (suspended)`
 * and ` (in native)`. Each element of `stackTrace.elems` is then emitted prefixed with
 * `\tat ` (no separator is added between elements), followed by the locked synchronizers,
 * if any, and a trailing newline.
 *
 * TODO(SPARK-44896): Consider also adding information such as os_prio, cpu, elapsed, tid, nid,
 * etc., as reported by the jstack tool.
 */
override def toString: String = {
  val sb = new StringBuilder(s""""$threadName" Id=$threadId $threadState""")
  lockName.foreach(lock => sb.append(s" on $lock"))
  // Leading space keeps "owned by" from fusing with the preceding lock name or thread state.
  lockOwnerName.foreach(owner => sb.append(s""" owned by "$owner""""))
  // The id of the owning thread must be appended to the builder; merely building the string
  // inside foreach would silently discard it.
  blockedByThreadId.foreach(id => sb.append(s" Id=$id"))
  if (suspended) sb.append(" (suspended)")
  if (inNative) sb.append(" (in native)")
  sb.append('\n')

  // Elements are concatenated as-is, so any line breaks must already be part of each element.
  stackTrace.elems.foreach(elem => sb.append("\tat ").append(elem))

  if (synchronizers.nonEmpty) {
    sb.append(s"\n\tNumber of locked synchronizers = ${synchronizers.length}\n")
    synchronizers.foreach(sync => sb.append(s"\t- $sync\n"))
  }
  sb.append('\n')
  sb.toString
}
}
50 changes: 30 additions & 20 deletions common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -766,29 +766,39 @@ object Utils extends Logging {
* Note: code was initially copied from Apache Spark(v3.5.1).
*/
private def threadInfoToThreadStackTrace(threadInfo: ThreadInfo): ThreadStackTrace = {
// Builds a ThreadStackTrace from the given ThreadInfo.
// NOTE(review): this span appears to interleave two versions of the body taken from a diff
// hunk -- `monitors` and `stackTrace` are each defined twice, and ThreadStackTrace(...) below
// receives both a named and a positional argument list. Confirm which lines are current
// before changing any code here.

// Variant 1: index the locked monitors by the stack frame in which they were locked.
val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame -> m).toMap
// Variant 1: a frame that holds a monitor is rendered as "<frame> => holding <lock>",
// every other frame as its plain toString.
val stackTrace = StackTrace(threadInfo.getStackTrace.map { frame =>
monitors.get(frame) match {
case Some(monitor) =>
monitor.getLockedStackFrame.toString + s" => holding ${monitor.lockString}"
case None =>
frame.toString
}
// Variant 2: read the thread state once; it is matched on for the top frame below.
val threadState = threadInfo.getThreadState
// Variant 2: index the monitors' string forms by locked stack depth. toMap keeps a single
// entry per key, so monitors that share a depth collapse into one entry.
val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackDepth -> m.toString).toMap
// Variant 2: each frame becomes "<frame>\n", followed by an optional "- blocked on" /
// "- waiting on" line and an optional "- locked" line.
val stackTrace = StackTrace(threadInfo.getStackTrace.zipWithIndex.map { case (frame, idx) =>
// Only the top frame (idx == 0) is annotated, and only when getLockInfo is non-null.
val locked =
if (idx == 0 && threadInfo.getLockInfo != null) {
threadState match {
case Thread.State.BLOCKED =>
s"\t- blocked on ${threadInfo.getLockInfo}\n"
case Thread.State.WAITING | Thread.State.TIMED_WAITING =>
s"\t- waiting on ${threadInfo.getLockInfo}\n"
case _ => ""
}
} else ""
// Empty when no monitor is recorded for this stack depth.
val locking = monitors.get(idx).map(mi => s"\t- locked $mi\n").getOrElse("")
s"${frame.toString}\n$locked$locking"
})

// use a set to dedup re-entrant locks that are held at multiple places
val heldLocks =
(threadInfo.getLockedSynchronizers ++ threadInfo.getLockedMonitors).map(_.lockString).toSet

// Variant 2: string forms of the locked synchronizers and of the monitors indexed above.
val synchronizers = threadInfo.getLockedSynchronizers.map(_.toString)
val monitorStrs = monitors.values.toSeq
ThreadStackTrace(
// Named argument list (seven fields, ending with holdingLocks).
threadId = threadInfo.getThreadId,
threadName = threadInfo.getThreadName,
threadState = threadInfo.getThreadState,
stackTrace = stackTrace,
// A negative lock owner id is mapped to None.
blockedByThreadId =
if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
// A null LockInfo is mapped to the empty string.
blockedByLock = Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
holdingLocks = heldLocks.toSeq)
// Positional argument list (thirteen values).
// NOTE(review): presumably these line up with the thirteen-field ThreadStackTrace
// constructor, in which case the seventh value (synchronizers ++ monitorStrs) is
// holdingLocks -- verify against the case class declaration.
threadInfo.getThreadId,
threadInfo.getThreadName,
threadState,
stackTrace,
if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
synchronizers ++ monitorStrs,
synchronizers,
monitorStrs,
// Option(...) turns a null lock name / lock owner name into None.
Option(threadInfo.getLockName),
Option(threadInfo.getLockOwnerName),
threadInfo.isSuspended,
threadInfo.isInNative)
}

private def readProcessStdout(process: Process): String = {
Expand Down
Loading
Loading