Commit 4564519

Marcelo Vanzin authored and Andrew Or committed
[SPARK-2261] Make event logger use a single file.
Currently the event logger uses a directory and several files to describe an app's event log, all but one of which are empty. This is not very HDFS-friendly, since creating lots of nodes in HDFS (especially when they don't contain any data) is frowned upon due to the node metadata being kept in the NameNode's memory.

Instead, add a header section to the event log file that contains metadata needed to read the events. This metadata includes things like the Spark version (for future code that may need it for backwards compatibility) and the compression codec used for the event data.

With the new approach, aside from reducing the load on the NameNode, far fewer remote calls are needed when reading the log directory.

Author: Marcelo Vanzin <[email protected]>

Closes #1222 from vanzin/hist-server-single-log and squashes the following commits:

cc8f5de [Marcelo Vanzin] Store header in plain text.
c7e6123 [Marcelo Vanzin] Update comment.
59c561c [Marcelo Vanzin] Review feedback.
216c5a3 [Marcelo Vanzin] Review comments.
dce28e9 [Marcelo Vanzin] Fix log overwrite test.
f91c13e [Marcelo Vanzin] Handle "spark.eventLog.overwrite", and add unit test.
346f0b4 [Marcelo Vanzin] Review feedback.
ed0023e [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
3f4500f [Marcelo Vanzin] Unit test for SPARK-3697.
45c7a1f [Marcelo Vanzin] Version of SPARK-3697 for this branch.
b3ee30b [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
a6d5c50 [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
16fd491 [Marcelo Vanzin] Use unique log directory for each codec.
0ef3f70 [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
d93c44a [Marcelo Vanzin] Add a newline to make the header more readable.
9e928ba [Marcelo Vanzin] Add types.
bd6ba8c [Marcelo Vanzin] Review feedback.
a624a89 [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
04364dc [Marcelo Vanzin] Merge branch 'master' into hist-server-single-log
bb7c2d3 [Marcelo Vanzin] Fix scalastyle warning.
16661a3 [Marcelo Vanzin] Simplify some internal code.
cc6bce4 [Marcelo Vanzin] Some review feedback.
a722184 [Marcelo Vanzin] Do not encode metadata in log file name.
3700586 [Marcelo Vanzin] Restore log flushing.
f677930 [Marcelo Vanzin] Fix botched rebase.
ae571fa [Marcelo Vanzin] Fix end-to-end event logger test.
9db0efd [Marcelo Vanzin] Show prettier name in UI.
8f42274 [Marcelo Vanzin] Make history server parse old-style log directories.
6251dd7 [Marcelo Vanzin] Make event logger use a single file.
1 parent c28083f commit 4564519
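
To make the single-file format described above concrete, here is a minimal sketch of the read side: a plain-text header carrying metadata such as the Spark version and compression codec, followed by the (possibly compressed) event stream. The key=value layout, the HEADER_END_MARKER sentinel, and the EventLogHeaderSketch/readHeader names are illustrative assumptions, not the exact byte format the patch writes.

import java.io.InputStream

import scala.collection.mutable

// Sketch only: demonstrates the header-then-events idea, not the patch's format.
object EventLogHeaderSketch {
  val HEADER_END_MARKER = "HEADER_END_MARKER"

  // Reads metadata lines such as "SPARK_VERSION=1.2.0" or
  // "COMPRESSION_CODEC=snappy" until the assumed end-of-header marker.
  def readHeader(in: InputStream): Map[String, String] = {
    val header = mutable.Map[String, String]()
    var line = readLine(in)
    while (line != HEADER_END_MARKER) {
      line.split("=", 2) match {
        case Array(k, v) => header(k) = v
        case _ => throw new IllegalArgumentException(s"Malformed header line: $line")
      }
      line = readLine(in)
    }
    header.toMap
  }

  // Reads one byte at a time so that no bytes belonging to the event stream
  // that follows the header are consumed by read-ahead buffering.
  private def readLine(in: InputStream): String = {
    val buf = new StringBuilder
    var b = in.read()
    while (b != -1 && b != '\n') {
      buf.append(b.toChar)
      b = in.read()
    }
    if (b == -1) {
      throw new IllegalArgumentException("Reached end of stream before end-of-header marker.")
    }
    buf.toString
  }
}

Reading the header byte-by-byte reflects a real constraint of any such design: a buffered reader could swallow bytes that belong to the compressed event section, which must reach the codec's input stream untouched.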

10 files changed, +675 -929 lines changed

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ private[spark] class ApplicationDescription(
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogFile: Option[String] = None)
   extends Serializable {

   val user = System.getProperty("user.name", "<unknown>")

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 137 additions & 57 deletions
@@ -17,14 +17,16 @@

 package org.apache.spark.deploy.history

-import java.io.FileNotFoundException
+import java.io.{BufferedInputStream, FileNotFoundException, InputStream}

 import scala.collection.mutable

 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.AccessControlException

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
@@ -64,6 +66,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()

+  // Constants used to parse Spark 1.0.0 log directories.
+  private[history] val LOG_PREFIX = "EVENT_LOG_"
+  private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
+  private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+  private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+
   /**
    * A background thread that periodically checks for event log updates on disk.
    *
@@ -90,7 +98,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   initialize()

-  private def initialize() {
+  private def initialize(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
     if (!fs.exists(path)) {
@@ -106,17 +114,20 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }

     checkForLogs()
-    logCheckingThread.setDaemon(true)
-    logCheckingThread.start()
+
+    // Disable the background thread during tests.
+    if (!conf.contains("spark.testing")) {
+      logCheckingThread.setDaemon(true)
+      logCheckingThread.start()
+    }
   }

   override def getListing() = applications.values

   override def getAppUI(appId: String): Option[SparkUI] = {
     try {
       applications.get(appId).map { info =>
-        val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
-          new Path(logDir, info.logDir)))
+        val replayBus = new ReplayListenerBus()
         val ui = {
           val conf = this.conf.clone()
           val appSecManager = new SecurityManager(conf)
@@ -125,15 +136,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
           // Do not call ui.bind() to avoid creating a new server for each application
         }

-        replayBus.replay()
+        val appListener = new ApplicationEventListener()
+        replayBus.addListener(appListener)
+        val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)

-        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+        ui.setAppName(s"${appInfo.name} ($appId)")

         val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
         ui.getSecurityManager.setAcls(uiAclsEnabled)
         // make sure to set admin acls before view acls so they are properly picked up
         ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+        ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
           appListener.viewAcls.getOrElse(""))
         ui
       }
@@ -149,41 +162,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * Tries to reuse as much of the data already in memory as possible, by not reading
    * applications that haven't been updated since last time the logs were checked.
    */
-  private def checkForLogs() = {
+  private[history] def checkForLogs(): Unit = {
     lastLogCheckTimeMs = getMonotonicTimeMs()
     logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-    try {
-      val logStatus = fs.listStatus(new Path(logDir))
-      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

-      // Load all new logs from the log directory. Only directories that have a modification time
-      // later than the last known log directory will be loaded.
+    try {
       var newLastModifiedTime = lastModifiedTime
-      val logInfos = logDirs
-        .filter { dir =>
-          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
-            val modTime = getModificationTime(dir)
-            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-            modTime > lastModifiedTime
-          } else {
-            false
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val logInfos = statusList
+        .filter { entry =>
+          try {
+            val isFinishedApplication =
+              if (isLegacyLogDirectory(entry)) {
+                fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+              } else {
+                !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+              }
+
+            if (isFinishedApplication) {
+              val modTime = getModificationTime(entry)
+              newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+              modTime >= lastModifiedTime
+            } else {
+              false
+            }
+          } catch {
+            case e: AccessControlException =>
+              // Do not use "logInfo" since these messages can get pretty noisy if printed on
+              // every poll.
+              logDebug(s"No permission to read $entry, ignoring.")
+              false
           }
         }
-        .flatMap { dir =>
+        .flatMap { entry =>
           try {
-            val (replayBus, appListener) = createReplayBus(dir)
-            replayBus.replay()
-            Some(new FsApplicationHistoryInfo(
-              dir.getPath().getName(),
-              appListener.appId.getOrElse(dir.getPath().getName()),
-              appListener.appName.getOrElse(NOT_STARTED),
-              appListener.startTime.getOrElse(-1L),
-              appListener.endTime.getOrElse(-1L),
-              getModificationTime(dir),
-              appListener.sparkUser.getOrElse(NOT_STARTED)))
+            Some(replay(entry, new ReplayListenerBus()))
           } catch {
             case e: Exception =>
-              logInfo(s"Failed to load application log data from $dir.", e)
+              logError(s"Failed to load application log data from $entry.", e)
               None
           }
         }
@@ -217,37 +234,100 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         applications = newApps
       }
     } catch {
-      case t: Throwable => logError("Exception in checking for event log updates", t)
+      case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }

-  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
-    val path = logDir.getPath()
-    val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
-    val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
-    val appListener = new ApplicationEventListener
-    replayBus.addListener(appListener)
-    (replayBus, appListener)
+  /**
+   * Replays the events in the specified log file and returns information about the associated
+   * application.
+   */
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+    val logPath = eventLog.getPath()
+    val (logInput, sparkVersion) =
+      if (isLegacyLogDirectory(eventLog)) {
+        openLegacyEventLog(logPath)
+      } else {
+        EventLoggingListener.openEventLog(logPath, fs)
+      }
+    try {
+      val appListener = new ApplicationEventListener
+      bus.addListener(appListener)
+      bus.replay(logInput, sparkVersion)
+      new FsApplicationHistoryInfo(
+        logPath.getName(),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appName.getOrElse(NOT_STARTED),
+        appListener.startTime.getOrElse(-1L),
+        appListener.endTime.getOrElse(-1L),
+        getModificationTime(eventLog),
+        appListener.sparkUser.getOrElse(NOT_STARTED))
+    } finally {
+      logInput.close()
+    }
   }

-  /** Return when this directory was last modified. */
-  private def getModificationTime(dir: FileStatus): Long = {
-    try {
-      val logFiles = fs.listStatus(dir.getPath)
-      if (logFiles != null && !logFiles.isEmpty) {
-        logFiles.map(_.getModificationTime).max
-      } else {
-        dir.getModificationTime
+  /**
+   * Loads a legacy log directory. This assumes that the log directory contains a single event
+   * log file (along with other metadata files), which is the case for directories generated by
+   * the code in previous releases.
+   *
+   * @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
+   */
+  private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
+    val children = fs.listStatus(dir)
+    var eventLogPath: Path = null
+    var codecName: Option[String] = None
+    var sparkVersion: String = null
+
+    children.foreach { child =>
+      child.getPath().getName() match {
+        case name if name.startsWith(LOG_PREFIX) =>
+          eventLogPath = child.getPath()
+
+        case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
+          codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
+
+        case version if version.startsWith(SPARK_VERSION_PREFIX) =>
+          sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
+
+        case _ =>
       }
-    } catch {
-      case t: Throwable =>
-        logError("Exception in accessing modification time of %s".format(dir.getPath), t)
-        -1L
+    }
+
+    if (eventLogPath == null || sparkVersion == null) {
+      throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
+    }
+
+    val codec = try {
+      codecName.map { c => CompressionCodec.createCodec(conf, c) }
+    } catch {
+      case e: Exception =>
+        throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
+    }
+
+    val in = new BufferedInputStream(fs.open(eventLogPath))
+    (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+  }
+
+  /**
+   * Return whether the specified event log path contains an old directory-based event log.
+   * Previously, the event log of an application consisted of multiple files in a directory.
+   * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
+   * See SPARK-2261 for more detail.
+   */
+  private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
+
+  private def getModificationTime(fsEntry: FileStatus): Long = {
+    if (fsEntry.isDir) {
+      fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+    } else {
+      fsEntry.getModificationTime()
     }
   }

   /** Returns the system's monotonically increasing time. */
-  private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
+  private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)

 }

@@ -256,7 +336,7 @@ private object FsHistoryProvider {
 }

 private class FsApplicationHistoryInfo(
-    val logDir: String,
+    val logPath: String,
     id: String,
     name: String,
     startTime: Long,

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 23 additions & 23 deletions
@@ -17,6 +17,7 @@

 package org.apache.spark.deploy.master

+import java.io.FileNotFoundException
 import java.net.URLEncoder
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -32,6 +33,7 @@ import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.Serialization
 import akka.serialization.SerializationExtension
+import org.apache.hadoop.fs.Path

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
@@ -56,6 +58,7 @@ private[spark] class Master(
   import context.dispatcher // to use Akka's scheduler.schedule()

   val conf = new SparkConf
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
   val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
@@ -514,7 +517,7 @@ private[spark] class Master(
     val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
     val numWorkersAlive = shuffledAliveWorkers.size
     var curPos = 0
-
+
     for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
       // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
       // start from the last worker that was assigned a driver, and continue onwards until we have
@@ -711,41 +714,38 @@ private[spark] class Master(
   def rebuildSparkUI(app: ApplicationInfo): Boolean = {
     val appName = app.desc.name
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
-    val eventLogDir = app.desc.eventLogDir.getOrElse {
+    val eventLogFile = app.desc.eventLogFile.getOrElse {
       // Event logging is not enabled for this application
       app.desc.appUiUrl = notFoundBasePath
       return false
     }

-    val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
-    val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
-      SparkHadoopUtil.get.newConfiguration(conf))
-    val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
-    val eventLogPaths = eventLogInfo.logPaths
-    val compressionCodec = eventLogInfo.compressionCodec
-
-    if (eventLogPaths.isEmpty) {
-      // Event logging is enabled for this application, but no event logs are found
-      val title = s"Application history not found (${app.id})"
-      var msg = s"No event logs found for application $appName in $appEventLogDir."
-      logWarning(msg)
-      msg += " Did you specify the correct logging directory?"
-      msg = URLEncoder.encode(msg, "UTF-8")
-      app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
-      return false
-    }
-
     try {
-      val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
+      val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
+      val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
+      val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
         appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
-      replayBus.replay()
+      try {
+        replayBus.replay(logInput, sparkVersion)
+      } finally {
+        logInput.close()
+      }
       appIdToUI(app.id) = ui
       webUi.attachSparkUI(ui)
       // Application UI is successfully rebuilt, so link the Master UI to it
-      app.desc.appUiUrl = ui.getBasePath
+      app.desc.appUiUrl = ui.basePath
       true
     } catch {
+      case fnf: FileNotFoundException =>
+        // Event logging is enabled for this application, but no event logs are found
+        val title = s"Application history not found (${app.id})"
+        var msg = s"No event logs found for application $appName in $eventLogFile."
+        logWarning(msg)
+        msg += " Did you specify the correct logging directory?"
+        msg = URLEncoder.encode(msg, "UTF-8")
+        app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
+        false
       case e: Exception =>
        // Relay exception message to application UI page
        val title = s"Application history load error (${app.id})"
