Skip to content

Commit 24e84b6

Browse files
author
Davies Liu
committed
Merge branch 'master' of github.com:apache/spark into binary
2 parents 5ceaa8a + b671ce0 commit 24e84b6

File tree

34 files changed

+1331
-289
lines changed

34 files changed

+1331
-289
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ import scala.language.implicitConversions
2121

2222
import java.io._
2323
import java.net.URI
24-
import java.util.Arrays
24+
import java.util.{Arrays, Properties, UUID}
2525
import java.util.concurrent.atomic.AtomicInteger
26-
import java.util.{Properties, UUID}
2726
import java.util.UUID.randomUUID
2827
import scala.collection.{Map, Set}
2928
import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
4140
import org.apache.spark.annotation.{DeveloperApi, Experimental}
4241
import org.apache.spark.broadcast.Broadcast
4342
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
43+
import org.apache.spark.executor.TriggerThreadDump
4444
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
4545
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
4646
import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
5151
import org.apache.spark.storage._
5252
import org.apache.spark.ui.SparkUI
5353
import org.apache.spark.ui.jobs.JobProgressListener
54-
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
54+
import org.apache.spark.util._
5555

5656
/**
5757
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -361,6 +361,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
361361
override protected def childValue(parent: Properties): Properties = new Properties(parent)
362362
}
363363

364+
/**
365+
* Called by the web UI to obtain executor thread dumps. This method may be expensive.
366+
* Logs an error and returns None if we failed to obtain a thread dump, which could occur due
367+
* to an executor being dead or unresponsive or due to network issues while sending the thread
368+
* dump message back to the driver.
369+
*/
370+
private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
371+
try {
372+
if (executorId == SparkContext.DRIVER_IDENTIFIER) {
373+
Some(Utils.getThreadDump())
374+
} else {
375+
val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
376+
val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
377+
Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
378+
AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
379+
}
380+
} catch {
381+
case e: Exception =>
382+
logError(s"Exception getting thread dump from executor $executorId", e)
383+
None
384+
}
385+
}
386+
364387
private[spark] def getLocalProperties: Properties = localProperties.get()
365388

366389
private[spark] def setLocalProperties(props: Properties) {

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
131131
// Create a new ActorSystem using driver's Spark properties to run the backend.
132132
val driverConf = new SparkConf().setAll(props)
133133
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
134-
"sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf))
134+
SparkEnv.executorActorSystemName,
135+
hostname, port, driverConf, new SecurityManager(driverConf))
135136
// set it
136137
val sparkHostPort = hostname + ":" + boundPort
137138
actorSystem.actorOf(

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
2626
import scala.collection.mutable.{ArrayBuffer, HashMap}
2727
import scala.util.control.NonFatal
2828

29-
import akka.actor.ActorSystem
29+
import akka.actor.{Props, ActorSystem}
3030

3131
import org.apache.spark._
3232
import org.apache.spark.deploy.SparkHadoopUtil
@@ -92,6 +92,10 @@ private[spark] class Executor(
9292
}
9393
}
9494

95+
// Create an actor for receiving RPCs from the driver
96+
private val executorActor = env.actorSystem.actorOf(
97+
Props(new ExecutorActor(executorId)), "ExecutorActor")
98+
9599
// Create our ClassLoader
96100
// do this after SparkEnv creation so can access the SecurityManager
97101
private val urlClassLoader = createClassLoader()
@@ -131,6 +135,7 @@ private[spark] class Executor(
131135

132136
def stop() {
133137
env.metricsSystem.report()
138+
env.actorSystem.stop(executorActor)
134139
isStopped = true
135140
threadPool.shutdown()
136141
if (!isLocal) {
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.executor
19+
20+
import akka.actor.Actor
21+
import org.apache.spark.Logging
22+
23+
import org.apache.spark.util.{Utils, ActorLogReceive}
24+
25+
/**
26+
* Driver -> Executor message to trigger a thread dump.
27+
*/
28+
private[spark] case object TriggerThreadDump
29+
30+
/**
31+
* Actor that runs inside of executors to enable driver -> executor RPC.
32+
*/
33+
private[spark]
34+
class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging {
35+
36+
override def receiveWithLogging = {
37+
case TriggerThreadDump =>
38+
sender ! Utils.getThreadDump()
39+
}
40+
41+
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ class BlockManagerMaster(
8888
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
8989
}
9090

91+
def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
92+
askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId))
93+
}
94+
9195
/**
9296
* Remove a block from the slaves that have it. This can only be used to remove
9397
* blocks that the driver knows about.

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
8686
case GetPeers(blockManagerId) =>
8787
sender ! getPeers(blockManagerId)
8888

89+
case GetActorSystemHostPortForExecutor(executorId) =>
90+
sender ! getActorSystemHostPortForExecutor(executorId)
91+
8992
case GetMemoryStatus =>
9093
sender ! memoryStatus
9194

@@ -412,6 +415,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
412415
Seq.empty
413416
}
414417
}
418+
419+
/**
420+
* Returns the hostname and port of an executor's actor system, based on the Akka address of its
421+
* BlockManagerSlaveActor.
422+
*/
423+
private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
424+
for (
425+
blockManagerId <- blockManagerIdByExecutor.get(executorId);
426+
info <- blockManagerInfo.get(blockManagerId);
427+
host <- info.slaveActor.path.address.host;
428+
port <- info.slaveActor.path.address.port
429+
) yield {
430+
(host, port)
431+
}
432+
}
415433
}
416434

417435
@DeveloperApi

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages {
9292

9393
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
9494

95+
case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster
96+
9597
case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
9698

9799
case object StopBlockManagerMaster extends ToBlockManagerMaster
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ui.exec
19+
20+
import javax.servlet.http.HttpServletRequest
21+
22+
import scala.util.Try
23+
import scala.xml.{Text, Node}
24+
25+
import org.apache.spark.ui.{UIUtils, WebUIPage}
26+
27+
private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") {
28+
29+
private val sc = parent.sc
30+
31+
def render(request: HttpServletRequest): Seq[Node] = {
32+
val executorId = Option(request.getParameter("executorId")).getOrElse {
33+
return Text(s"Missing executorId parameter")
34+
}
35+
val time = System.currentTimeMillis()
36+
val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)
37+
38+
val content = maybeThreadDump.map { threadDump =>
39+
val dumpRows = threadDump.map { thread =>
40+
<div class="accordion-group">
41+
<div class="accordion-heading" onclick="$(this).next().toggleClass('hidden')">
42+
<a class="accordion-toggle">
43+
Thread {thread.threadId}: {thread.threadName} ({thread.threadState})
44+
</a>
45+
</div>
46+
<div class="accordion-body hidden">
47+
<div class="accordion-inner">
48+
<pre>{thread.stackTrace}</pre>
49+
</div>
50+
</div>
51+
</div>
52+
}
53+
54+
<div class="row-fluid">
55+
<p>Updated at {UIUtils.formatDate(time)}</p>
56+
{
57+
// scalastyle:off
58+
<p><a class="expandbutton"
59+
onClick="$('.accordion-body').removeClass('hidden'); $('.expandbutton').toggleClass('hidden')">
60+
Expand All
61+
</a></p>
62+
<p><a class="expandbutton hidden"
63+
onClick="$('.accordion-body').addClass('hidden'); $('.expandbutton').toggleClass('hidden')">
64+
Collapse All
65+
</a></p>
66+
// scalastyle:on
67+
}
68+
<div class="accordion">{dumpRows}</div>
69+
</div>
70+
}.getOrElse(Text("Error fetching thread dump"))
71+
UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent)
72+
}
73+
}

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ private case class ExecutorSummaryInfo(
4141
totalShuffleWrite: Long,
4242
maxMemory: Long)
4343

44-
private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
44+
private[ui] class ExecutorsPage(
45+
parent: ExecutorsTab,
46+
threadDumpEnabled: Boolean)
47+
extends WebUIPage("") {
4548
private val listener = parent.listener
4649

4750
def render(request: HttpServletRequest): Seq[Node] = {
@@ -75,6 +78,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
7578
Shuffle Write
7679
</span>
7780
</th>
81+
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
7882
</thead>
7983
<tbody>
8084
{execInfoSorted.map(execRow)}
@@ -133,6 +137,15 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
133137
<td sorttable_customkey={info.totalShuffleWrite.toString}>
134138
{Utils.bytesToString(info.totalShuffleWrite)}
135139
</td>
140+
{
141+
if (threadDumpEnabled) {
142+
<td>
143+
<a href={s"threadDump/?executorId=${info.id}"}>Thread Dump</a>
144+
</td>
145+
} else {
146+
Seq.empty
147+
}
148+
}
136149
</tr>
137150
}
138151

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
2727

2828
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
2929
val listener = parent.executorsListener
30+
val sc = parent.sc
31+
val threadDumpEnabled =
32+
sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)
3033

31-
attachPage(new ExecutorsPage(this))
34+
attachPage(new ExecutorsPage(this, threadDumpEnabled))
35+
if (threadDumpEnabled) {
36+
attachPage(new ExecutorThreadDumpPage(this))
37+
}
3238
}
3339

3440
/**

0 commit comments

Comments
 (0)