
Commit 9ae80bf

andrewor14 authored and pwendell committed
[SPARK-1276] Add a HistoryServer to render persisted UI
The new feature of event logging, introduced in #42, allows the user to persist the details of his/her Spark application to storage, and later replay these events to reconstruct an after-the-fact SparkUI. Currently, however, a persisted UI can only be rendered through the standalone Master. This greatly limits the use case of this new feature, as many people also run Spark on Yarn / Mesos.

This PR introduces a new entity called the HistoryServer, which, given a log directory, keeps track of all completed applications independently of a Spark Master. Unlike the Master, the HistoryServer need not be running while the application is still running. It is relatively lightweight in that it only maintains static information about applications and performs no scheduling.

To quickly test it out, generate event logs with ```spark.eventLog.enabled=true``` and run ```sbin/start-history-server.sh <log-dir-path>```. Your HistoryServer awaits on port 18080.

Comments and feedback are most welcome.

---

A few other changes introduced in this PR include refactoring the WebUI interface, which was beginning to accumulate a lot of duplicate code now that we have added more functionality to it. Two new SparkListenerEvents have been introduced (SparkListenerApplicationStart/End) to keep track of the application name and start/finish times. This PR also clarifies the semantics of the ReplayListenerBus introduced in #42.

A potential TODO in the future (not part of this PR) is to render live applications in addition to just completed applications. This is useful when applications fail, a condition that our current HistoryServer does not handle unless the user manually signals application completion (by creating the APPLICATION_COMPLETION file). Handling live applications becomes significantly more challenging, however, because the same SparkUI must then be rendered multiple times. To avoid re-reading the entire log each time, which is inefficient, we must resume reading the log from where we previously left off, but this becomes fairly complicated because we must deal with the arbitrary behavior of each input stream.

Author: Andrew Or <[email protected]>

Closes #204 from andrewor14/master and squashes the following commits:

7b7234c [Andrew Or] Finished -> Completed
b158d98 [Andrew Or] Address Patrick's comments
69d1b41 [Andrew Or] Do not block on posting SparkListenerApplicationEnd
19d5dd0 [Andrew Or] Merge github.com:apache/spark
f7f5bf0 [Andrew Or] Make history server's web UI port a Spark configuration
2dfb494 [Andrew Or] Decouple checking for application completion from replaying
d02dbaa [Andrew Or] Expose Spark version and include it in event logs
2282300 [Andrew Or] Add documentation for the HistoryServer
567474a [Andrew Or] Merge github.com:apache/spark
6edf052 [Andrew Or] Merge github.com:apache/spark
19e1fb4 [Andrew Or] Address Thomas' comments
248cb3d [Andrew Or] Limit number of live applications + add configurability
a3598de [Andrew Or] Do not close file system with ReplayBus + fix bind address
bc46fc8 [Andrew Or] Merge github.com:apache/spark
e2f4ff9 [Andrew Or] Merge github.com:apache/spark
050419e [Andrew Or] Merge github.com:apache/spark
81b568b [Andrew Or] Fix strange error messages...
0670743 [Andrew Or] Decouple page rendering from loading files from disk
1b2f391 [Andrew Or] Minor changes
a9eae7e [Andrew Or] Merge branch 'master' of github.com:apache/spark
d5154da [Andrew Or] Styling and comments
5dbfbb4 [Andrew Or] Merge branch 'master' of github.com:apache/spark
60bc6d5 [Andrew Or] First complete implementation of HistoryServer (only for finished apps)
7584418 [Andrew Or] Report application start/end times to HistoryServer
8aac163 [Andrew Or] Add basic application table
c086bd5 [Andrew Or] Add HistoryServer and scripts ++ Refactor WebUI interface

(cherry picked from commit 79820fe)
Signed-off-by: Patrick Wendell <[email protected]>
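To make the quick-test recipe above concrete, here is a minimal, hypothetical driver program that turns on event logging so the resulting log can later be replayed by the HistoryServer. Only ```spark.eventLog.enabled``` is confirmed by this commit; the ```spark.eventLog.dir``` setting and the ```/tmp/spark-events``` path are assumptions carried over from the event-logging feature in #42.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object EventLogDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("EventLogDemo")
      .setMaster("local")
      .set("spark.eventLog.enabled", "true")          // confirmed by this PR
      .set("spark.eventLog.dir", "/tmp/spark-events") // assumed log directory
    val sc = new SparkContext(conf)

    sc.parallelize(1 to 1000).count() // generate some events worth replaying

    // stop() posts SparkListenerApplicationEnd, marking the application
    // completed so the HistoryServer will pick it up from the log directory.
    sc.stop()
  }
}
```

Point ```sbin/start-history-server.sh``` at the same directory and browse to port 18080 to see the replayed UI.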
1 parent a74fbbb · commit 9ae80bf

38 files changed: +1075 additions, -201 deletions

bin/spark-class

Lines changed: 6 additions & 2 deletions

@@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}

 SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"

-# Add java opts and memory settings for master, worker, executors, and repl.
+# Add java opts and memory settings for master, worker, history server, executors, and repl.
 case "$1" in
-  # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+  # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
   'org.apache.spark.deploy.master.Master')
     OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
     OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
@@ -58,6 +58,10 @@ case "$1" in
     OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
     OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
     ;;
+  'org.apache.spark.deploy.history.HistoryServer')
+    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
+    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
+    ;;

   # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   'org.apache.spark.executor.CoarseGrainedExecutorBackend')

bin/spark-class2.cmd

Lines changed: 5 additions & 2 deletions

@@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m

 set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true

-rem Add java opts and memory settings for master, worker, executors, and repl.
-rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+rem Add java opts and memory settings for master, worker, history server, executors, and repl.
+rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
 if "%1"=="org.apache.spark.deploy.master.Master" (
   set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
   if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
 ) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
   set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
   if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
+  set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
+  if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%

 rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
 ) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 22 additions & 4 deletions

@@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
       val logger = new EventLoggingListener(appName, conf)
+      logger.start()
       listenerBus.addListener(logger)
       Some(logger)
     } else None
   }

-  // Information needed to replay logged events, if any
-  private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
-    eventLogger.map { logger => Some(logger.info) }.getOrElse(None)
-
   // At this point, all relevant SparkListeners have been registered, so begin releasing events
   listenerBus.start()

@@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
   cleaner.foreach(_.start())

   postEnvironmentUpdate()
+  postApplicationStart()

   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration: Configuration = {
@@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
     listenerBus.addListener(listener)
   }

+  /** The version of Spark on which this application is running. */
+  def version = SparkContext.SPARK_VERSION
+
   /**
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
@@ -930,6 +931,7 @@ class SparkContext(config: SparkConf) extends Logging {

   /** Shut down the SparkContext. */
   def stop() {
+    postApplicationEnd()
     ui.stop()
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
@@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

+  /** Post the application start event */
+  private def postApplicationStart() {
+    listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+  }
+
+  /**
+   * Post the application end event to all listeners immediately, rather than adding it
+   * to the event queue for it to be asynchronously processed eventually. Otherwise, a race
+   * condition exists in which the listeners may stop before this event has been propagated.
+   */
+  private def postApplicationEnd() {
+    listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+  }
+
   /** Post the environment update event once the task scheduler is ready */
   private def postEnvironmentUpdate() {
     if (taskScheduler != null) {
@@ -1200,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging {
  */
 object SparkContext extends Logging {

+  private[spark] val SPARK_VERSION = "1.0.0"
+
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
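The two new events are ordinary SparkListenerEvents, so any registered listener can observe application lifecycle boundaries. Below is a minimal sketch of a consumer, assuming the SparkListener trait gained matching onApplicationStart/onApplicationEnd callbacks (the callback names follow the existing SparkListener convention; they are not shown in this diff) and that the case classes expose the fields posted above (appName, time, sparkUser).

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

class AppLifecycleListener extends SparkListener {
  // Fields mirror the event posted by postApplicationStart():
  // (appName, startTime, sparkUser).
  override def onApplicationStart(start: SparkListenerApplicationStart) {
    println("App %s started at %d by %s".format(start.appName, start.time, start.sparkUser))
  }

  // SparkListenerApplicationEnd carries only the end timestamp.
  override def onApplicationEnd(end: SparkListenerApplicationEnd) {
    println("App ended at %d".format(end.time))
  }
}
```

Register the listener on the SparkContext before running jobs. Note the design point in the diff: postApplicationEnd() posts synchronously rather than enqueueing, so a listener like this still receives the end event even though the listener bus is about to stop.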

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 1 addition & 3 deletions

@@ -17,16 +17,14 @@

 package org.apache.spark.deploy

-import org.apache.spark.scheduler.EventLoggingInfo
-
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     val sparkHome: Option[String],
     var appUiUrl: String,
-    val eventLogInfo: Option[EventLoggingInfo] = None)
+    val eventLogDir: Option[String] = None)
   extends Serializable {

   val user = System.getProperty("user.name", "<unknown>")

core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.spark.ui.{SparkUI, WebUI}
+
+private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {
+
+  /** Attach a SparkUI to this container. Only valid after bind(). */
+  def attachUI(ui: SparkUI) {
+    assert(serverInfo.isDefined,
+      "%s must be bound to a server before attaching SparkUIs".format(name))
+    val rootHandler = serverInfo.get.rootHandler
+    for (handler <- ui.handlers) {
+      rootHandler.addHandler(handler)
+      if (!handler.isStarted) {
+        handler.start()
+      }
+    }
+  }
+
+  /** Detach a SparkUI from this container. Only valid after bind(). */
+  def detachUI(ui: SparkUI) {
+    assert(serverInfo.isDefined,
+      "%s must be bound to a server before detaching SparkUIs".format(name))
+    val rootHandler = serverInfo.get.rootHandler
+    for (handler <- ui.handlers) {
+      if (handler.isStarted) {
+        handler.stop()
+      }
+      rootHandler.removeHandler(handler)
+    }
+  }
+
+}
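The attach/detach logic above hinges on a Jetty behavior: a handler added to an already-started server is not started automatically, hence the explicit start()/stop() calls around addHandler/removeHandler. Below is a standalone sketch of the same mechanism in plain Jetty, hypothetical demo code that is not part of this patch; it assumes Jetty's server and servlet modules are on the classpath.

```scala
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.ContextHandlerCollection
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

object DynamicHandlerDemo {
  def main(args: Array[String]) {
    val rootHandler = new ContextHandlerCollection
    val server = new Server(18080) // the HistoryServer's default port
    server.setHandler(rootHandler)
    server.start() // the equivalent of bind(): the root handler now exists

    // A context handler standing in for one replayed application's UI.
    val ctx = new ServletContextHandler()
    ctx.setContextPath("/history/app-0001") // illustrative path
    ctx.addServlet(new ServletHolder(new HttpServlet {
      override def doGet(req: HttpServletRequest, res: HttpServletResponse) {
        res.getWriter.println("replayed SparkUI would render here")
      }
    }), "/*")

    // Handlers added after server.start() are NOT started for us,
    // which is exactly why attachUI() calls handler.start() by hand.
    rootHandler.addHandler(ctx)
    if (!ctx.isStarted) {
      ctx.start()
    }
    server.join()
  }
}
```

This is also why both methods assert that serverInfo is defined: before bind(), there is no root handler to attach to, and silently queuing handlers would hide ordering bugs.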
