Skip to content

Commit 60bc6d5

Browse files
committed
First complete implementation of HistoryServer (only for finished apps)
This involves a change in Spark's event log format. All event logs are now prefixed with EVENT_LOG_. If compression is used, the logger creates a special empty file prefixed with COMPRESSION_CODEC_ that indicates which codec is used. After the application finishes, the logger logs a special empty file named APPLICATION_COMPLETE. The ReplayListenerBus is now responsible for parsing all of the above file formats. In this commit, we establish a one-to-one mapping between ReplayListenerBus and event logging applications. The semantics of the ReplayListenerBus is further clarified (e.g. replay is not allowed before starting, and can only be called once). This commit also adds a control mechanism for the frequency at which HistoryServer accesses the disk to check for log updates. This enforces a minimum interval of N seconds between two checks, where N is arbitrarily chosen to be 5.
1 parent 7584418 commit 60bc6d5

File tree

12 files changed

+237
-151
lines changed

12 files changed

+237
-151
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,12 @@ class SparkContext(
164164
private[spark] val eventLogger: Option[EventLoggingListener] = {
165165
if (conf.getBoolean("spark.eventLog.enabled", false)) {
166166
val logger = new EventLoggingListener(appName, conf)
167+
logger.start()
167168
listenerBus.addListener(logger)
168169
Some(logger)
169170
} else None
170171
}
171172

172-
// Information needed to replay logged events, if any
173-
private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
174-
eventLogger.map { logger => Some(logger.info) }.getOrElse(None)
175-
176173
// At this point, all relevant SparkListeners have been registered, so begin releasing events
177174
listenerBus.start()
178175

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,14 @@
1717

1818
package org.apache.spark.deploy
1919

20-
import org.apache.spark.scheduler.EventLoggingInfo
21-
2220
private[spark] class ApplicationDescription(
2321
val name: String,
2422
val maxCores: Option[Int],
2523
val memoryPerSlave: Int,
2624
val command: Command,
2725
val sparkHome: Option[String],
2826
var appUiUrl: String,
29-
val eventLogInfo: Option[EventLoggingInfo] = None)
27+
val eventLogDir: Option[String] = None)
3028
extends Serializable {
3129

3230
val user = System.getProperty("user.name", "<unknown>")

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 83 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
5353
private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
5454
private val securityManager = new SecurityManager(conf)
5555

56+
// A timestamp of when the disk was last accessed to check for log updates
57+
private var lastLogCheck = -1L
58+
5659
private val handlers = Seq[ServletContextHandler](
5760
createStaticHandler(HistoryServer.STATIC_RESOURCE_DIR, "/static"),
5861
createServletHandler("/",
@@ -74,81 +77,106 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
7477
checkForLogs()
7578
}
7679

77-
/** Parse app ID from the given log path. */
78-
def getAppId(logPath: String): String = logPath.split("/").last
79-
80-
/** Return the address of this server. */
81-
def getAddress = "http://" + host + ":" + boundPort
82-
8380
/**
84-
* Check for any updated event logs.
81+
* Check for any updates to event logs in the base directory.
82+
*
83+
* If a new finished application is found, the server renders the associated SparkUI
84+
* from the application's event logs, attaches this UI to itself, and stores metadata
85+
* information for this application.
8586
*
86-
* If a new application is found, render the associated SparkUI and remember it.
87-
* If an existing application is updated, re-render the associated SparkUI.
88-
* If an existing application is removed, remove the associated SparkUI.
87+
* If the logs for an existing finished application are no longer found, remove all
88+
* associated information and detach the SparkUI.
8989
*/
9090
def checkForLogs() {
91-
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
92-
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
93-
94-
// Render any missing or outdated SparkUI
95-
logDirs.foreach { dir =>
96-
val path = dir.getPath.toString
97-
val appId = getAppId(path)
98-
val lastUpdated = {
99-
val logFiles = fileSystem.listStatus(dir.getPath)
100-
if (logFiles != null) logFiles.map(_.getModificationTime).max else dir.getModificationTime
101-
}
102-
if (!appIdToInfo.contains(appId) || appIdToInfo(appId).lastUpdated < lastUpdated) {
103-
maybeRenderUI(appId, path, lastUpdated)
91+
if (logCheckReady) {
92+
lastLogCheck = System.currentTimeMillis
93+
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
94+
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
95+
96+
// Render SparkUI for any new completed applications
97+
logDirs.foreach { dir =>
98+
val path = dir.getPath.toString
99+
val appId = getAppId(path)
100+
val lastUpdated = getModificationTime(dir)
101+
if (!appIdToInfo.contains(appId)) {
102+
maybeRenderUI(appId, path, lastUpdated)
103+
}
104104
}
105-
}
106105

107-
// Remove any outdated SparkUIs
108-
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
109-
appIdToInfo.foreach { case (appId, info) =>
110-
if (!appIds.contains(appId)) {
111-
detachUI(info.ui)
112-
appIdToInfo.remove(appId)
106+
// Remove any outdated SparkUIs
107+
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
108+
appIdToInfo.foreach { case (appId, info) =>
109+
if (!appIds.contains(appId)) {
110+
detachUI(info.ui)
111+
appIdToInfo.remove(appId)
112+
}
113113
}
114114
}
115115
}
116116

117-
/** Attempt to render a new SparkUI from event logs residing in the given log directory. */
117+
/**
118+
* Render a new SparkUI from the event logs if the associated application is finished.
119+
*
120+
* HistoryServer looks for a special file that indicates application completion in the given
121+
* directory. If this file exists, the associated application is regarded to be complete, in
122+
* which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
123+
*/
118124
private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
119-
val replayBus = new ReplayListenerBus(conf)
120-
val appListener = new ApplicationListener
121-
replayBus.addListener(appListener)
122-
val ui = new SparkUI(conf, replayBus, appId, "/history/%s".format(appId))
123-
124-
// Do not call ui.bind() to avoid creating a new server for each application
125-
ui.start()
126-
val success = replayBus.replay(logPath)
127-
if (success) {
128-
attachUI(ui)
129-
if (!appListener.started) {
130-
logWarning("Application has event logs but has not started: %s".format(appId))
125+
val replayBus = new ReplayListenerBus(logPath)
126+
replayBus.start()
127+
128+
// If the application completion file is found
129+
if (replayBus.isApplicationComplete) {
130+
val ui = new SparkUI(replayBus, appId, "/history/%s".format(appId))
131+
val appListener = new ApplicationListener
132+
replayBus.addListener(appListener)
133+
134+
// Do not call ui.bind() to avoid creating a new server for each application
135+
ui.start()
136+
val success = replayBus.replay()
137+
if (success) {
138+
attachUI(ui)
139+
val appName = if (appListener.applicationStarted) appListener.appName else appId
140+
ui.setAppName("%s (history)".format(appName))
141+
val startTime = appListener.startTime
142+
val endTime = appListener.endTime
143+
val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
144+
appIdToInfo(appId) = info
131145
}
132-
val appName = appListener.appName
133-
val startTime = appListener.startTime
134-
val endTime = appListener.endTime
135-
val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
136-
137-
// If the UI already exists, terminate it and replace it
138-
appIdToInfo.remove(appId).foreach { info => detachUI(info.ui) }
139-
appIdToInfo(appId) = info
140-
141-
// Use mnemonic original app name rather than app ID
142-
val originalAppName = "%s (history)".format(appName)
143-
ui.setAppName(originalAppName)
146+
} else {
147+
logWarning("Skipping incomplete application: %s".format(logPath))
148+
}
149+
replayBus.stop()
150+
}
151+
152+
/** Parse app ID from the given log path. */
153+
def getAppId(logPath: String): String = logPath.split("/").last
154+
155+
/** Return the address of this server. */
156+
def getAddress = "http://" + host + ":" + boundPort
157+
158+
/** Return when this directory is last modified. */
159+
private def getModificationTime(dir: FileStatus): Long = {
160+
val logFiles = fileSystem.listStatus(dir.getPath)
161+
if (logFiles != null) {
162+
logFiles.map(_.getModificationTime).max
163+
} else {
164+
dir.getModificationTime
144165
}
145166
}
146167

168+
/** Return whether the last log check has happened sufficiently long ago. */
169+
private def logCheckReady: Boolean = {
170+
System.currentTimeMillis - lastLogCheck > HistoryServer.UPDATE_INTERVAL_SECONDS * 1000
171+
}
147172
}
148173

149174
object HistoryServer {
150175
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
151176

177+
// Minimum interval between each check for logs, which requires a disk access
178+
val UPDATE_INTERVAL_SECONDS = 5
179+
152180
def main(argStrings: Array[String]) {
153181
val conf = new SparkConf
154182
val args = new HistoryServerArguments(argStrings, conf)

core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,18 @@ private[spark] class IndexPage(parent: HistoryServer) {
3030
private val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
3131

3232
def render(request: HttpServletRequest): Seq[Node] = {
33-
// Check if logs have been updated
3433
parent.checkForLogs()
3534

3635
// Populate app table, with most recently modified app first
3736
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
3837
val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
39-
4038
val content =
4139
<div class="row-fluid">
4240
<div class="span12">
4341
<ul class="unstyled">
4442
<li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
45-
<h4>Applications</h4> {appTable}
43+
<br></br>
44+
<h4>Finished Applications</h4> {appTable}
4645
</ul>
4746
</div>
4847
</div>
@@ -67,7 +66,6 @@ private[spark] class IndexPage(parent: HistoryServer) {
6766
val duration = if (difference > 0) DeployWebUI.formatDuration(difference) else "---"
6867
val logDirectory = parent.getAppId(info.logPath)
6968
val lastUpdated = dateFmt.format(new Date(info.lastUpdated))
70-
7169
<tr>
7270
<td><a href={uiAddress}>{appName}</a></td>
7371
<td>{startTime}</td>

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ private[spark] class Master(
149149

150150
override def postStop() {
151151
webUi.stop()
152-
appIdToUI.values.foreach(_.stop())
153152
masterMetricsSystem.stop()
154153
applicationMetricsSystem.stop()
155154
persistenceEngine.close()
@@ -622,10 +621,7 @@ private[spark] class Master(
622621
if (completedApps.size >= RETAINED_APPLICATIONS) {
623622
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
624623
completedApps.take(toRemove).foreach( a => {
625-
appIdToUI.remove(a.id).foreach { ui =>
626-
ui.stop()
627-
webUi.detachUI(ui)
628-
}
624+
appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
629625
applicationMetricsSystem.removeSource(a.appSource)
630626
})
631627
completedApps.trimStart(toRemove)
@@ -663,28 +659,14 @@ private[spark] class Master(
663659
*/
664660
def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = {
665661
val appName = app.desc.name
666-
val eventLogInfo = app.desc.eventLogInfo.getOrElse { return None }
667-
val eventLogDir = eventLogInfo.logDir
668-
val eventCompressionCodec = eventLogInfo.compressionCodec
669-
val appConf = new SparkConf
670-
eventCompressionCodec.foreach { codec =>
671-
appConf.set("spark.eventLog.compress", "true")
672-
appConf.set("spark.io.compression.codec", codec)
673-
}
674-
val replayBus = new ReplayListenerBus(appConf)
675-
val ui = new SparkUI(
676-
appConf,
677-
replayBus,
678-
"%s (finished)".format(appName),
679-
"/history/%s".format(app.id))
662+
val eventLogDir = app.desc.eventLogDir.getOrElse { return None }
663+
val replayBus = new ReplayListenerBus(eventLogDir)
664+
val ui = new SparkUI(replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id))
680665

681666
// Do not call ui.bind() to avoid creating a new server for each application
682667
ui.start()
683-
val success = replayBus.replay(eventLogDir)
684-
if (!success) {
685-
ui.stop()
686-
None
687-
} else Some(ui)
668+
val success = replayBus.replay()
669+
if (success) Some(ui) else None
688670
}
689671

690672
/** Generate a new app ID given a app's submission date */

core/src/main/scala/org/apache/spark/scheduler/ApplicationListener.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ private[spark] class ApplicationListener extends SparkListener {
2929
var startTime = -1L
3030
var endTime = -1L
3131

32-
def started = startTime != -1
32+
def applicationStarted = startTime != -1
3333

34-
def finished = endTime != -1
34+
def applicationFinished = endTime != -1
3535

36-
def duration: Long = {
36+
def applicationDuration: Long = {
3737
val difference = endTime - startTime
38-
if (started && finished && difference > 0) difference else -1L
38+
if (applicationStarted && applicationFinished && difference > 0) difference else -1L
3939
}
4040

4141
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import org.apache.spark.util.{JsonProtocol, FileLogger}
3636
private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
3737
extends SparkListener with Logging {
3838

39+
import EventLoggingListener._
40+
3941
private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
4042
private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
4143
private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
@@ -46,16 +48,19 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
4648
private val logger =
4749
new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
4850

49-
// Information needed to replay the events logged by this listener later
50-
val info = {
51-
val compressionCodec = if (shouldCompress) {
52-
Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC))
53-
} else None
54-
EventLoggingInfo(logDir, compressionCodec)
51+
/**
52+
* Begin logging events. If compression is used, log a file that indicates which compression
53+
* library is used.
54+
*/
55+
def start() {
56+
logInfo("Logging events to %s".format(logDir))
57+
if (shouldCompress) {
58+
val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
59+
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
60+
}
61+
logger.newFile(LOG_PREFIX + logger.fileIndex)
5562
}
5663

57-
logInfo("Logging events to %s".format(logDir))
58-
5964
/** Log the event as JSON */
6065
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
6166
val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
@@ -95,8 +100,36 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
95100
override def onApplicationEnd(event: SparkListenerApplicationEnd) =
96101
logEvent(event, flushLogger = true)
97102

98-
def stop() = logger.stop()
103+
/**
104+
* Stop logging events. In addition, create an empty special file to indicate application
105+
* completion.
106+
*/
107+
def stop() = {
108+
logger.newFile(APPLICATION_COMPLETE)
109+
logger.stop()
110+
}
99111
}
100112

101-
// If compression is not enabled, compressionCodec is None
102-
private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String])
113+
private[spark] object EventLoggingListener {
114+
val LOG_PREFIX = "EVENT_LOG_"
115+
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
116+
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
117+
118+
def isEventLogFile(fileName: String): Boolean = {
119+
fileName.contains(LOG_PREFIX)
120+
}
121+
122+
def isCompressionCodecFile(fileName: String): Boolean = {
123+
fileName.contains(COMPRESSION_CODEC_PREFIX)
124+
}
125+
126+
def isApplicationCompleteFile(fileName: String): Boolean = {
127+
fileName == APPLICATION_COMPLETE
128+
}
129+
130+
def parseCompressionCodec(fileName: String): String = {
131+
if (isCompressionCodecFile(fileName)) {
132+
fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")
133+
} else ""
134+
}
135+
}

0 commit comments

Comments
 (0)