Skip to content

Commit 216c5a3

Browse files
author
Marcelo Vanzin
committed
Review comments.
1 parent dce28e9 commit 216c5a3

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
271271
* Loads a legacy log directory. This assumes that the log directory contains a single event
272272
* log file (along with other metadata files), which is the case for directories generated by
273273
* the code in previous releases.
274+
*
275+
* @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
274276
*/
275277
private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
276278
val children = fs.listStatus(dir)
@@ -293,21 +295,27 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
293295
}
294296
}
295297

298+
if (eventLogPath == null || sparkVersion == null) {
299+
throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
300+
}
301+
296302
val codec = try {
297303
codecName.map { c => CompressionCodec.createCodec(conf, c) }
298304
} catch {
299305
case e: Exception =>
300306
throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
301307
}
302308

303-
if (eventLogPath == null || sparkVersion == null) {
304-
throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
305-
}
306-
307309
val in = new BufferedInputStream(fs.open(eventLogPath))
308310
(codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
309311
}
310312

313+
/**
314+
* Return whether the specified event log path contains a old directory-based event log.
315+
* Previously, the event log of an application comprises of multiple files in a directory.
316+
* As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
317+
* See SPARK-2261 for more detail.
318+
*/
311319
private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
312320

313321
private def getModificationTime(fsEntry: FileStatus): Long = {

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private[spark] class EventLoggingListener(
8787
*/
8888
def start() {
8989
if (!fileSystem.isDirectory(new Path(logBaseDir))) {
90-
throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.");
90+
throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
9191
}
9292

9393
val workingPath = logPath + IN_PROGRESS

0 commit comments

Comments
 (0)