Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
conf.getInt("spark.history.updateInterval", 10)) * 1000

private val logDir = conf.get("spark.history.fs.logDirectory", null)
if (logDir == null) {
throw new IllegalArgumentException("Logging directory must be specified.")
}
private val resolvedLogDir = Option(logDir)
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }

private val fs = Utils.getHadoopFileSystem(logDir)
private val fs = Utils.getHadoopFileSystem(resolvedLogDir)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
Expand Down Expand Up @@ -76,14 +76,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

private def initialize() {
// Validate the log directory.
val path = new Path(logDir)
val path = new Path(resolvedLogDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
"Logging directory specified does not exist: %s".format(logDir))
"Logging directory specified does not exist: %s".format(resolvedLogDir))
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
"Logging directory specified is not a directory: %s".format(resolvedLogDir))
}

checkForLogs()
Expand All @@ -95,15 +95,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

override def getAppUI(appId: String): SparkUI = {
try {
val appLogDir = fs.getFileStatus(new Path(logDir, appId))
loadAppInfo(appLogDir, true)._2
val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
ui
} catch {
case e: FileNotFoundException => null
}
}

override def getConfig(): Map[String, String] =
Map(("Event Log Location" -> logDir))
Map("Event Log Location" -> resolvedLogDir.toString)

/**
* Builds the application list based on the current contents of the log directory.
Expand All @@ -114,14 +115,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val logStatus = fs.listStatus(new Path(logDir))
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs.filter {
dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
val logInfos = logDirs.filter { dir =>
fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
}

val currentApps = Map[String, ApplicationHistoryInfo](
appList.map(app => (app.id -> app)):_*)
appList.map(app => app.id -> app):_*)

// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
Expand All @@ -131,7 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
try {
newApps += loadAppInfo(dir, false)._1
val (app, _) = loadAppInfo(dir, renderUI = false)
newApps += app
} catch {
case e: Exception => logError(s"Failed to load app info from directory $dir.")
}
Expand Down Expand Up @@ -159,9 +161,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
*/
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
val path = logDir.getPath
val appId = path.getName
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
{ providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
if (allApps.size > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
import org.apache.spark.util.SignalLogger

/**
* A web server that renders SparkUIs of completed applications.
Expand Down Expand Up @@ -175,7 +175,7 @@ object HistoryServer extends Logging {
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
val args = new HistoryServerArguments(conf, argStrings)
new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)

val providerName = conf.getOption("spark.history.provider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

/**
* Command-line parser for the master.
Expand All @@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
args match {
case ("--dir" | "-d") :: value :: tail =>
logDir = value
conf.set("spark.history.fs.logDirectory", value)
parse(tail)

case ("--help" | "-h") :: tail =>
Expand All @@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case _ =>
printUsageAndExit(1)
}
if (logDir != null) {
conf.set("spark.history.fs.logDirectory", logDir)
}
}

private def printUsageAndExit(exitCode: Int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{FileLogger, JsonProtocol}
import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}

/**
* A SparkListener that logs events to persistent storage.
Expand All @@ -55,7 +55,7 @@ private[spark] class EventLoggingListener(
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
val logDir = logBaseDir + "/" + name
val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")

protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
Expand Down Expand Up @@ -208,7 +208,7 @@ private[spark] object EventLoggingListener extends Logging {
} catch {
case e: Exception =>
logError("Exception in parsing logging info from directory %s".format(logDir), e)
EventLoggingInfo.empty
EventLoggingInfo.empty
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[spark] class FileLogger(
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}

private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
private val fileSystem = Utils.getHadoopFileSystem(logDir)
var fileIndex = 0

// Only used if compression is enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val expectedLogDir = logDirPath.toString
assert(eventLogger.logDir.startsWith(expectedLogDir))
assert(eventLogger.logDir.contains(expectedLogDir))

// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
Expand Down