From 40e3933104b9f520cc31d8244674abcb6a6944f1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 1 Jul 2014 16:26:49 -0700 Subject: [PATCH 1/6] Resolve history server file paths --- .../spark/deploy/history/FsHistoryProvider.scala | 13 +++++++------ .../apache/spark/deploy/history/HistoryPage.scala | 2 +- .../apache/spark/deploy/history/HistoryServer.scala | 5 ++--- .../deploy/history/HistoryServerArguments.scala | 1 - 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a8c9ac072449f..12cae81f1fff4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -76,7 +76,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private def initialize() { // Validate the log directory. - val path = new Path(logDir) + val path = new Path(Utils.resolveURI(logDir)) if (!fs.exists(path)) { throw new IllegalArgumentException( "Logging directory specified does not exist: %s".format(logDir)) @@ -96,14 +96,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis override def getAppUI(appId: String): SparkUI = { try { val appLogDir = fs.getFileStatus(new Path(logDir, appId)) - loadAppInfo(appLogDir, true)._2 + val (_, ui) = loadAppInfo(appLogDir, renderUI = true) + ui } catch { case e: FileNotFoundException => null } } - override def getConfig(): Map[String, String] = - Map(("Event Log Location" -> logDir)) + override def getConfig(): Map[String, String] = Map("Event Log Location" -> logDir) /** * Builds the application list based on the current contents of the log directory. @@ -121,7 +121,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } val currentApps = Map[String, ApplicationHistoryInfo]( - appList.map(app => (app.id -> app)):_*) + appList.map(app => app.id -> app):_*) // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. @@ -131,7 +131,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val curr = currentApps.getOrElse(dir.getPath().getName(), null) if (curr == null || curr.lastUpdated < getModificationTime(dir)) { try { - newApps += loadAppInfo(dir, false)._1 + val (app, _) = loadAppInfo(dir, renderUI = false) + newApps += app } catch { case e: Exception => logError(s"Failed to load app info from directory $dir.") } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index a958c837c2ff6..32f9a0e5460eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -45,7 +45,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
    - { providerConfig.map(e =>
  • {e._1}: {e._2}
  • ) } + {providerConfig.map { case (k, v) =>
  • {k}: {v}
  • }}
{ if (allApps.size > 0) { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 29a78a56c8ed5..ce44d455a6e6e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -25,9 +25,8 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.ui.{WebUI, SparkUI, UIUtils} +import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.Utils /** * A web server that renders SparkUIs of completed applications. @@ -174,7 +173,7 @@ object HistoryServer { def main(argStrings: Array[String]) { initSecurity() - val args = new HistoryServerArguments(conf, argStrings) + new HistoryServerArguments(conf, argStrings) val securityManager = new SecurityManager(conf) val providerName = conf.getOption("spark.history.provider") diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index be9361b754fc3..d3cffa9e00a40 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.history import org.apache.spark.SparkConf -import org.apache.spark.util.Utils /** * Command-line parser for the master. From c7e36eefc49a36885c7b9295640d44f1341e8b64 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 1 Jul 2014 17:05:21 -0700 Subject: [PATCH 2/6] Resolve paths for event logging too --- .../scala/org/apache/spark/util/FileLogger.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 6a95dc06e155d..41936019246c1 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -51,8 +51,8 @@ private[spark] class FileLogger( private val dateFormat = new ThreadLocal[SimpleDateFormat]() { override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") } - - private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) + private val resolvedLogDir = Utils.resolveURI(logDir) + private val fileSystem = Utils.getHadoopFileSystem(resolvedLogDir) var fileIndex = 0 // Only used if compression is enabled @@ -82,18 +82,18 @@ private[spark] class FileLogger( * Create a logging directory with the given path. */ private def createLogDir() { - val path = new Path(logDir) + val path = new Path(resolvedLogDir) if (fileSystem.exists(path)) { if (overwrite) { - logWarning("Log directory %s already exists. Overwriting...".format(logDir)) + logWarning("Log directory %s already exists. Overwriting...".format(resolvedLogDir)) // Second parameter is whether to delete recursively fileSystem.delete(path, true) } else { - throw new IOException("Log directory %s already exists!".format(logDir)) + throw new IOException("Log directory %s already exists!".format(resolvedLogDir)) } } if (!fileSystem.mkdirs(path)) { - throw new IOException("Error in creating log directory: %s".format(logDir)) + throw new IOException("Error in creating log directory: %s".format(resolvedLogDir)) } if (dirPermissions.isDefined) { val fsStatus = fileSystem.getFileStatus(path) @@ -109,7 +109,7 @@ private[spark] class FileLogger( * (dirPermissions) used when class was instantiated. */ private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = { - val logPath = logDir + "/" + fileName + val logPath = resolvedLogDir + "/" + fileName val uri = new URI(logPath) val path = new Path(logPath) val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme From b037c0c1cde89a65d666f8f1448f34bd7d678f5d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 1 Jul 2014 17:35:11 -0700 Subject: [PATCH 3/6] Use resolved paths for everything in history server --- .../deploy/history/FsHistoryProvider.scala | 27 ++++++++++--------- .../history/HistoryServerArguments.scala | 4 +-- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 12cae81f1fff4..8398857e7c9c6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -36,11 +36,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis conf.getInt("spark.history.updateInterval", 10)) * 1000 private val logDir = conf.get("spark.history.fs.logDirectory", null) - if (logDir == null) { - throw new IllegalArgumentException("Logging directory must be specified.") - } + private val resolvedLogDir = Option(logDir) + .map { d => Utils.resolveURI(d) } + .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") } - private val fs = Utils.getHadoopFileSystem(logDir) + private val fs = Utils.getHadoopFileSystem(resolvedLogDir) // A timestamp of when the disk was last accessed to check for log updates private var lastLogCheckTimeMs = -1L @@ -76,14 +76,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private def initialize() { // Validate the log directory. - val path = new Path(Utils.resolveURI(logDir)) + val path = new Path(resolvedLogDir) if (!fs.exists(path)) { throw new IllegalArgumentException( - "Logging directory specified does not exist: %s".format(logDir)) + "Logging directory specified does not exist: %s".format(resolvedLogDir)) } if (!fs.getFileStatus(path).isDir) { throw new IllegalArgumentException( - "Logging directory specified is not a directory: %s".format(logDir)) + "Logging directory specified is not a directory: %s".format(resolvedLogDir)) } checkForLogs() @@ -95,7 +95,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis override def getAppUI(appId: String): SparkUI = { try { - val appLogDir = fs.getFileStatus(new Path(logDir, appId)) + val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId)) val (_, ui) = loadAppInfo(appLogDir, renderUI = true) ui } catch { @@ -103,7 +103,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } - override def getConfig(): Map[String, String] = Map("Event Log Location" -> logDir) + override def getConfig(): Map[String, String] = + Map("Event Log Location" -> resolvedLogDir.toString) /** * Builds the application list based on the current contents of the log directory. @@ -114,10 +115,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis lastLogCheckTimeMs = getMonotonicTimeMs() logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { - val logStatus = fs.listStatus(new Path(logDir)) + val logStatus = fs.listStatus(new Path(resolvedLogDir)) val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs.filter { - dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) + val logInfos = logDirs.filter { dir => + fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE)) } val currentApps = Map[String, ApplicationHistoryInfo]( @@ -160,9 +161,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false. */ private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = { - val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) val path = logDir.getPath val appId = path.getName + val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs) val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) val appListener = new ApplicationEventListener replayBus.addListener(appListener) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index d3cffa9e00a40..25fc76c23e0fb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -31,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String] args match { case ("--dir" | "-d") :: value :: tail => logDir = value + conf.set("spark.history.fs.logDirectory", value) parse(tail) case ("--help" | "-h") :: tail => @@ -41,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String] case _ => printUsageAndExit(1) } - if (logDir != null) { - conf.set("spark.history.fs.logDirectory", logDir) - } } private def printUsageAndExit(exitCode: Int) { From 0e20f7121b9e8ecb334d69c36ef3cdcbf326699d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Jul 2014 14:51:11 -0700 Subject: [PATCH 4/6] Shift responsibility of resolving paths up one level --- .../spark/scheduler/EventLoggingListener.scala | 6 +++--- .../scala/org/apache/spark/util/FileLogger.scala | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a90b0d475c04e..d4bb2992be541 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -29,7 +29,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec -import org.apache.spark.util.{FileLogger, JsonProtocol} +import org.apache.spark.util.{FileLogger, JsonProtocol, Utils} /** * A SparkListener that logs events to persistent storage. @@ -55,7 +55,7 @@ private[spark] class EventLoggingListener( private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/") private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis - val logDir = logBaseDir + "/" + name + val logDir = Utils.resolveURI(logBaseDir) + "/" + name protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS)) @@ -208,7 +208,7 @@ private[spark] object EventLoggingListener extends Logging { } catch { case e: Exception => logError("Exception in parsing logging info from directory %s".format(logDir), e) - EventLoggingInfo.empty + EventLoggingInfo.empty } } diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 41936019246c1..aacb0220c8449 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -51,8 +51,8 @@ private[spark] class FileLogger( private val dateFormat = new ThreadLocal[SimpleDateFormat]() { override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") } - private val resolvedLogDir = Utils.resolveURI(logDir) - private val fileSystem = Utils.getHadoopFileSystem(resolvedLogDir) + + private val fileSystem = Utils.getHadoopFileSystem(logDir) var fileIndex = 0 // Only used if compression is enabled @@ -82,18 +82,18 @@ private[spark] class FileLogger( * Create a logging directory with the given path. */ private def createLogDir() { - val path = new Path(resolvedLogDir) + val path = new Path(logDir) if (fileSystem.exists(path)) { if (overwrite) { - logWarning("Log directory %s already exists. Overwriting...".format(resolvedLogDir)) + logWarning("Log directory %s already exists. Overwriting...".format(logDir)) // Second parameter is whether to delete recursively fileSystem.delete(path, true) } else { - throw new IOException("Log directory %s already exists!".format(resolvedLogDir)) + throw new IOException("Log directory %s already exists!".format(logDir)) } } if (!fileSystem.mkdirs(path)) { - throw new IOException("Error in creating log directory: %s".format(resolvedLogDir)) + throw new IOException("Error in creating log directory: %s".format(logDir)) } if (dirPermissions.isDefined) { val fsStatus = fileSystem.getFileStatus(path) @@ -109,7 +109,7 @@ private[spark] class FileLogger( * (dirPermissions) used when class was instantiated. */ private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = { - val logPath = resolvedLogDir + "/" + fileName + val logPath = logDir + "/" + fileName val uri = new URI(logPath) val path = new Path(logPath) val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme From 622a471c42b116cdecdf3ed7f8abfc28ce93a70c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Jul 2014 16:43:00 -0700 Subject: [PATCH 5/6] Fix test in EventLoggingListenerSuite --- .../org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 21e3db34b8b7a..10d8b299317ea 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -259,7 +259,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get val expectedLogDir = logDirPath.toString - assert(eventLogger.logDir.startsWith(expectedLogDir)) + assert(eventLogger.logDir.contains(expectedLogDir)) // Begin listening for events that trigger asserts val eventExistenceListener = new EventExistenceListener(eventLogger) From b393e17204e86dddfa869be896d63fe996aee544 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Jul 2014 16:46:48 -0700 Subject: [PATCH 6/6] Strip trailing "/" from logging directory Before it was printing: Logging events to file:/tmp/spark-events//pysparkshell-... --- .../scala/org/apache/spark/scheduler/EventLoggingListener.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index d4bb2992be541..014767ecc63bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -55,7 +55,7 @@ private[spark] class EventLoggingListener( private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/") private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis - val logDir = Utils.resolveURI(logBaseDir) + "/" + name + val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))