diff --git a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index dd5e7414060..c8b6491b27e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -424,10 +424,50 @@ case class StackTrace(elems: Seq[String]) {
  * Note: code was initially copied from Apache Spark(v3.5.1).
  */
 case class ThreadStackTrace(
-    val threadId: Long,
-    val threadName: String,
-    val threadState: Thread.State,
-    val stackTrace: StackTrace,
-    val blockedByThreadId: Option[Long],
-    val blockedByLock: String,
-    val holdingLocks: Seq[String])
+    threadId: Long,
+    threadName: String,
+    threadState: Thread.State,
+    stackTrace: StackTrace,
+    blockedByThreadId: Option[Long],
+    blockedByLock: String,
+    holdingLocks: Seq[String],
+    synchronizers: Seq[String],
+    monitors: Seq[String],
+    lockName: Option[String],
+    lockOwnerName: Option[String],
+    suspended: Boolean,
+    inNative: Boolean) {
+
+  /**
+   * Returns a string representation of this thread stack trace, modeled on the format of
+   * java.lang.management.ThreadInfo#toString in JDK 8.
+   *
+   * The header line carries the thread name, id and state, followed (when defined) by
+   * `lockName`, `lockOwnerName` and `blockedByThreadId`, then the suspended/native markers.
+   *
+   * TODO(SPARK-44896): Also consider adding information such as os_prio, cpu, elapsed, tid,
+   * nid, etc., from the jstack tool.
+   */
+  override def toString: String = {
+    val sb = new StringBuilder(
+      s""""$threadName" Id=$threadId $threadState""")
+    lockName.foreach(lock => sb.append(s" on $lock"))
+    lockOwnerName.foreach {
+      owner => sb.append(s""" owned by "$owner"""")
+    }
+    blockedByThreadId.foreach(id => sb.append(s" Id=$id"))
+    if (suspended) sb.append(" (suspended)")
+    if (inNative) sb.append(" (in native)")
+    sb.append('\n')
+
+    // No separator is inserted between frames: each element is expected to end with '\n'.
+    sb.append(stackTrace.elems.map(e => s"\tat $e").mkString)
+
+    if (synchronizers.nonEmpty) {
+      sb.append(s"\n\tNumber of locked synchronizers = ${synchronizers.length}\n")
+      synchronizers.foreach(sync => sb.append(s"\t- $sync\n"))
+    }
+    sb.append('\n')
+    sb.toString
+  }
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index dc5b6ea346f..7c8f168466d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -766,29 +766,43 @@ object Utils extends Logging {
    * Note: code was initially copied from Apache Spark(v3.5.1).
    */
   private def threadInfoToThreadStackTrace(threadInfo: ThreadInfo): ThreadStackTrace = {
-    val monitors = threadInfo.getLockedMonitors.map(m => m.getLockedStackFrame -> m).toMap
-    val stackTrace = StackTrace(threadInfo.getStackTrace.map { frame =>
-      monitors.get(frame) match {
-        case Some(monitor) =>
-          monitor.getLockedStackFrame.toString + s" => holding ${monitor.lockString}"
-        case None =>
-          frame.toString
-      }
+    val threadState = threadInfo.getThreadState
+    // Group by stack depth: several monitors can be locked in the same frame (e.g. nested
+    // synchronized blocks), and a depth -> monitor map would keep only one of them.
+    val monitors = threadInfo.getLockedMonitors.toSeq.groupBy(_.getLockedStackDepth)
+    val stackTrace = StackTrace(threadInfo.getStackTrace.zipWithIndex.map { case (frame, idx) =>
+      // The lock this thread is blocked/waiting on is reported under the top frame only.
+      val locked =
+        if (idx == 0 && threadInfo.getLockInfo != null) {
+          threadState match {
+            case Thread.State.BLOCKED =>
+              s"\t- blocked on ${threadInfo.getLockInfo}\n"
+            case Thread.State.WAITING | Thread.State.TIMED_WAITING =>
+              s"\t- waiting on ${threadInfo.getLockInfo}\n"
+            case _ => ""
+          }
+        } else ""
+      val locking = monitors.getOrElse(idx, Nil).map(mi => s"\t- locked $mi\n").mkString
+      s"${frame.toString}\n$locked$locking"
     })
 
-    // use a set to dedup re-entrant locks that are held at multiple places
-    val heldLocks =
-      (threadInfo.getLockedSynchronizers ++ threadInfo.getLockedMonitors).map(_.lockString).toSet
-
+    val synchronizers = threadInfo.getLockedSynchronizers.map(_.toString)
+    // Taken from the full monitor array so that order and same-depth monitors are preserved.
+    val monitorStrs = threadInfo.getLockedMonitors.map(_.toString).toSeq
     ThreadStackTrace(
-      threadId = threadInfo.getThreadId,
-      threadName = threadInfo.getThreadName,
-      threadState = threadInfo.getThreadState,
-      stackTrace = stackTrace,
-      blockedByThreadId =
-        if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
-      blockedByLock = Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
-      holdingLocks = heldLocks.toSeq)
+      threadInfo.getThreadId,
+      threadInfo.getThreadName,
+      threadState,
+      stackTrace,
+      if (threadInfo.getLockOwnerId < 0) None else Some(threadInfo.getLockOwnerId),
+      Option(threadInfo.getLockInfo).map(_.lockString).getOrElse(""),
+      synchronizers ++ monitorStrs,
+      synchronizers,
+      monitorStrs,
+      Option(threadInfo.getLockName),
+      Option(threadInfo.getLockOwnerName),
+      threadInfo.isSuspended,
+      threadInfo.isInNative)
   }
 
   private def readProcessStdout(process: Process): String = {
diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java
index dbcdd2241e4..ff133a23c71 100644
--- a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java
+++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ThreadStack.java
@@ -41,7 +41,15 @@
   ThreadStack.JSON_PROPERTY_STACK_TRACE,
   ThreadStack.JSON_PROPERTY_BLOCKED_BY_THREAD_ID,
   ThreadStack.JSON_PROPERTY_BLOCKED_BY_LOCK,
-  ThreadStack.JSON_PROPERTY_HOLDING_LOCKS
+  ThreadStack.JSON_PROPERTY_HOLDING_LOCKS,
+  ThreadStack.JSON_PROPERTY_SYNCHRONIZERS,
+  ThreadStack.JSON_PROPERTY_MONITORS,
+  ThreadStack.JSON_PROPERTY_LOCK_NAME,
+  ThreadStack.JSON_PROPERTY_LOCK_OWNER_NAME,
+  ThreadStack.JSON_PROPERTY_SUSPENDED,
+  ThreadStack.JSON_PROPERTY_IN_NATIVE,
+  ThreadStack.JSON_PROPERTY_IS_DAEMON,
+  ThreadStack.JSON_PROPERTY_PRIORITY
 })
 @javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
 public class ThreadStack {
@@ -66,6 +74,30 @@ public class ThreadStack {
   public static final String JSON_PROPERTY_HOLDING_LOCKS = "holdingLocks";
   private List holdingLocks = new ArrayList<>();
 
+  public static final String JSON_PROPERTY_SYNCHRONIZERS = "synchronizers";
+  private List synchronizers = new ArrayList<>();
+
+  public static final String JSON_PROPERTY_MONITORS = "monitors";
+  private List monitors = new ArrayList<>();
+
+  public static final String JSON_PROPERTY_LOCK_NAME = "lockName";
+  private String lockName;
+
+  public static final String JSON_PROPERTY_LOCK_OWNER_NAME = "lockOwnerName";
+  private String lockOwnerName;
+
+  public static final String JSON_PROPERTY_SUSPENDED = "suspended";
+  private Boolean suspended;
+
+  public static final String JSON_PROPERTY_IN_NATIVE = "inNative";
+  private Boolean inNative;
+
+  public static final String JSON_PROPERTY_IS_DAEMON = "isDaemon";
+  private Boolean isDaemon;
+
+  public static final String JSON_PROPERTY_PRIORITY = "priority";
+  private Integer priority;
+
   public ThreadStack() {
   }
 
@@ -260,6 +292,222 @@ public void setHoldingLocks(List holdingLocks) {
     this.holdingLocks = holdingLocks;
   }
 
+  public ThreadStack synchronizers(List synchronizers) {
+
+    this.synchronizers = synchronizers;
+    return this;
+  }
+
+  public ThreadStack addSynchronizersItem(String synchronizersItem) {
+    if (this.synchronizers == null) {
+      this.synchronizers = new ArrayList<>();
+    }
+    this.synchronizers.add(synchronizersItem);
+    return this;
+  }
+
+  /**
+   * The ownable synchronizers locked by the thread.
+   * @return synchronizers
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_SYNCHRONIZERS)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public List getSynchronizers() {
+    return synchronizers;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_SYNCHRONIZERS)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setSynchronizers(List synchronizers) {
+    this.synchronizers = synchronizers;
+  }
+
+  public ThreadStack monitors(List monitors) {
+
+    this.monitors = monitors;
+    return this;
+  }
+
+  public ThreadStack addMonitorsItem(String monitorsItem) {
+    if (this.monitors == null) {
+      this.monitors = new ArrayList<>();
+    }
+    this.monitors.add(monitorsItem);
+    return this;
+  }
+
+  /**
+   * The object monitors locked by the thread.
+   * @return monitors
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_MONITORS)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public List getMonitors() {
+    return monitors;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_MONITORS)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setMonitors(List monitors) {
+    this.monitors = monitors;
+  }
+
+  public ThreadStack lockName(String lockName) {
+
+    this.lockName = lockName;
+    return this;
+  }
+
+  /**
+   * The string representation of the object on which the thread is blocked if any.
+   * @return lockName
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_LOCK_NAME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public String getLockName() {
+    return lockName;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_LOCK_NAME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setLockName(String lockName) {
+    this.lockName = lockName;
+  }
+
+  public ThreadStack lockOwnerName(String lockOwnerName) {
+
+    this.lockOwnerName = lockOwnerName;
+    return this;
+  }
+
+  /**
+   * The name of the thread that owns the object this thread is blocked on.
+   * @return lockOwnerName
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_LOCK_OWNER_NAME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public String getLockOwnerName() {
+    return lockOwnerName;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_LOCK_OWNER_NAME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setLockOwnerName(String lockOwnerName) {
+    this.lockOwnerName = lockOwnerName;
+  }
+
+  public ThreadStack suspended(Boolean suspended) {
+
+    this.suspended = suspended;
+    return this;
+  }
+
+  /**
+   * Whether the thread is suspended.
+   * @return suspended
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_SUSPENDED)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Boolean getSuspended() {
+    return suspended;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_SUSPENDED)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setSuspended(Boolean suspended) {
+    this.suspended = suspended;
+  }
+
+  public ThreadStack inNative(Boolean inNative) {
+
+    this.inNative = inNative;
+    return this;
+  }
+
+  /**
+   * Whether the thread is executing native code.
+   * @return inNative
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_IN_NATIVE)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Boolean getInNative() {
+    return inNative;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_IN_NATIVE)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setInNative(Boolean inNative) {
+    this.inNative = inNative;
+  }
+
+  public ThreadStack isDaemon(Boolean isDaemon) {
+
+    this.isDaemon = isDaemon;
+    return this;
+  }
+
+  /**
+   * Whether the thread is a daemon thread.
+   * @return isDaemon
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_IS_DAEMON)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Boolean getIsDaemon() {
+    return isDaemon;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_IS_DAEMON)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setIsDaemon(Boolean isDaemon) {
+    this.isDaemon = isDaemon;
+  }
+
+  public ThreadStack priority(Integer priority) {
+
+    this.priority = priority;
+    return this;
+  }
+
+  /**
+   * The priority of the thread.
+   * @return priority
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_PRIORITY)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Integer getPriority() {
+    return priority;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_PRIORITY)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setPriority(Integer priority) {
+    this.priority = priority;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -275,12 +523,20 @@ public boolean equals(Object o) {
         Objects.equals(this.stackTrace, threadStack.stackTrace) &&
         Objects.equals(this.blockedByThreadId, threadStack.blockedByThreadId) &&
         Objects.equals(this.blockedByLock, threadStack.blockedByLock) &&
-        Objects.equals(this.holdingLocks, threadStack.holdingLocks);
+        Objects.equals(this.holdingLocks, threadStack.holdingLocks) &&
+        Objects.equals(this.synchronizers, threadStack.synchronizers) &&
+        Objects.equals(this.monitors, threadStack.monitors) &&
+        Objects.equals(this.lockName, threadStack.lockName) &&
+        Objects.equals(this.lockOwnerName, threadStack.lockOwnerName) &&
+        Objects.equals(this.suspended, threadStack.suspended) &&
+        Objects.equals(this.inNative, threadStack.inNative) &&
+        Objects.equals(this.isDaemon, threadStack.isDaemon) &&
+        Objects.equals(this.priority, threadStack.priority);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(threadId, threadName, threadState, stackTrace, blockedByThreadId, blockedByLock, holdingLocks);
+    return Objects.hash(threadId, threadName, threadState, stackTrace, blockedByThreadId, blockedByLock, holdingLocks, synchronizers, monitors, lockName, lockOwnerName, suspended, inNative, isDaemon, priority);
   }
 
   @Override
@@ -294,6 +550,14 @@ public String toString() {
     sb.append(" blockedByThreadId: ").append(toIndentedString(blockedByThreadId)).append("\n");
     sb.append(" blockedByLock: ").append(toIndentedString(blockedByLock)).append("\n");
     sb.append(" holdingLocks: ").append(toIndentedString(holdingLocks)).append("\n");
+    sb.append(" synchronizers: ").append(toIndentedString(synchronizers)).append("\n");
+    sb.append(" monitors: ").append(toIndentedString(monitors)).append("\n");
+    sb.append(" lockName: ").append(toIndentedString(lockName)).append("\n");
+    sb.append(" lockOwnerName: ").append(toIndentedString(lockOwnerName)).append("\n");
+    sb.append(" suspended: ").append(toIndentedString(suspended)).append("\n");
+    sb.append(" inNative: ").append(toIndentedString(inNative)).append("\n");
+    sb.append(" isDaemon: ").append(toIndentedString(isDaemon)).append("\n");
+    sb.append(" priority: ").append(toIndentedString(priority)).append("\n");
     sb.append("}");
     return sb.toString();
   }
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index a4b2802ce69..108e185a3af 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -561,6 +561,28 @@ components:
           description: The locks that the current thread is holding.
           items:
             type: string
+        synchronizers:
+          type: array
+          description: The ownable synchronizers locked by the thread.
+          items:
+            type: string
+        monitors:
+          type: array
+          description: The object monitors locked by the thread.
+          items:
+            type: string
+        lockName:
+          type: string
+          description: The string representation of the object on which the thread is blocked if any.
+        lockOwnerName:
+          type: string
+          description: The name of the thread that owns the object this thread is blocked on.
+        suspended:
+          type: boolean
+          description: Whether the thread is suspended.
+        inNative:
+          type: boolean
+          description: Whether the thread is executing native code.
       required:
         - threadId
         - threadName
diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
index f2c82ab0a3d..52f927db0a0 100644
--- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
@@ -298,6 +298,28 @@ components:
           description: The locks that the current thread is holding.
           items:
             type: string
+        synchronizers:
+          type: array
+          description: The ownable synchronizers locked by the thread.
+          items:
+            type: string
+        monitors:
+          type: array
+          description: The object monitors locked by the thread.
+          items:
+            type: string
+        lockName:
+          type: string
+          description: The string representation of the object on which the thread is blocked if any.
+        lockOwnerName:
+          type: string
+          description: The name of the thread that owns the object this thread is blocked on.
+        suspended:
+          type: boolean
+          description: Whether the thread is suspended.
+        inNative:
+          type: boolean
+          description: Whether the thread is executing native code.
       required:
         - threadId
         - threadName
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
index 3b91ebd0f6f..908a2667cd6 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResource.scala
@@ -57,6 +57,12 @@ class ApiV1BaseResource extends ApiRequestContext {
             threadStack.blockedByThreadId.getOrElse(null).asInstanceOf[java.lang.Long])
           .blockedByLock(threadStack.blockedByLock)
           .holdingLocks(threadStack.holdingLocks.asJava)
+          .synchronizers(threadStack.synchronizers.asJava)
+          .monitors(threadStack.monitors.asJava)
+          .lockName(threadStack.lockName.orNull)
+          .lockOwnerName(threadStack.lockOwnerName.orNull)
+          .suspended(threadStack.suspended)
+          .inNative(threadStack.inNative)
       }.asJava)
   }
 
diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
index 274c65c1382..876f3dbecfe 100644
--- a/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
+++ b/service/src/test/scala/org/apache/celeborn/server/common/http/api/v1/ApiV1BaseResourceSuite.scala
@@ -46,5 +46,6 @@ abstract class ApiV1BaseResourceSuite extends HttpTestHelper {
     val threadStacks = response.readEntity(classOf[ThreadStackResponse]).getThreadStacks.asScala
     assert(threadStacks.nonEmpty)
     assert(threadStacks.exists(_.getBlockedByThreadId == null))
+    assert(threadStacks.exists(_.getLockName != null))
   }
 }