Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private val NOT_STARTED = "<Not Started>"

private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"

// Interval between safemode checks.
private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
"spark.history.fs.safemodeCheck.interval", "5s")
Expand All @@ -89,6 +91,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Interval between each cleaner checks for event logs to delete
private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")

// Number of threads used to replay event logs.
private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)

private val logDir = conf.getOption("spark.history.fs.logDirectory")
.map { d => Utils.resolveURI(d).toString }
.getOrElse(DEFAULT_LOG_DIR)
Expand Down Expand Up @@ -129,11 +135,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

/**
* An Executor to fetch and parse log files.
* Fixed size thread pool to fetch and parse log files.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should update the description since no longer a single Executor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

private val replayExecutor: ExecutorService = {
if (!conf.contains("spark.testing")) {
ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
} else {
MoreExecutors.sameThreadExecutor()
}
Expand Down Expand Up @@ -297,10 +303,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (logInfos.nonEmpty) {
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
}
logInfos.grouped(20)
.map { batch =>
logInfos.map { file =>
replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(batch)
override def run(): Unit = mergeApplicationListing(file)
})
}
.foreach { task =>
Expand Down Expand Up @@ -385,9 +390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replay the log files in the list and merge the list of old applications with new ones
*/
private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
val newAttempts = logs.flatMap { fileStatus =>
try {
private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
val newAttempts = try {
val bus = new ReplayListenerBus()
val res = replay(fileStatus, bus)
res match {
Expand All @@ -403,7 +407,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
e)
None
}
}

if (newAttempts.isEmpty) {
return
Expand All @@ -413,45 +416,48 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// contains both the new app attempt, and those that were already loaded in the existing apps
// map. If an attempt has been updated, it replaces the old attempt in the list.
val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
newAttempts.foreach { attempt =>
val appInfo = newAppMap.get(attempt.appId)
.orElse(applications.get(attempt.appId))
.map { app =>
val attempts =
app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
new FsApplicationHistoryInfo(attempt.appId, attempt.name,
attempts.sortWith(compareAttemptInfo))
}
.getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
newAppMap(attempt.appId) = appInfo
}

// Merge the new app list with the existing one, maintaining the expected ordering (descending
// end time). Maintaining the order is important to avoid having to sort the list every time
// there is a request for the log list.
val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
if (!mergedApps.contains(info.id)) {
mergedApps += (info.id -> info)
applications.synchronized {
newAttempts.foreach { attempt =>
val appInfo = newAppMap.get(attempt.appId)
.orElse(applications.get(attempt.appId))
.map { app =>
val attempts =
app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
new FsApplicationHistoryInfo(attempt.appId, attempt.name,
attempts.sortWith(compareAttemptInfo))
}
.getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
newAppMap(attempt.appId) = appInfo
}
}

val newIterator = newApps.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (newAppMap.contains(oldIterator.head.id)) {
oldIterator.next()
} else if (compareAppInfo(newIterator.head, oldIterator.head)) {
addIfAbsent(newIterator.next())
} else {
addIfAbsent(oldIterator.next())
// Merge the new app list with the existing one, maintaining the expected ordering (descending
// end time). Maintaining the order is important to avoid having to sort the list every time
// there is a request for the log list.
val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
if (!mergedApps.contains(info.id)) {
mergedApps += (info.id -> info)
}
}
}
newIterator.foreach(addIfAbsent)
oldIterator.foreach(addIfAbsent)

applications = mergedApps
val newIterator = newApps.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (newAppMap.contains(oldIterator.head.id)) {
oldIterator.next()
} else if (compareAppInfo(newIterator.head, oldIterator.head)) {
addIfAbsent(newIterator.next())
} else {
addIfAbsent(oldIterator.next())
}
}
newIterator.foreach(addIfAbsent)
oldIterator.foreach(addIfAbsent)

applications = mergedApps
}
}

/**
Expand Down
7 changes: 7 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ The history server can be configured as follows:
Job history files older than this will be deleted when the filesystem history cleaner runs.
</td>
</tr>
<tr>
<td>spark.history.fs.numReplayThreads</td>
<td>25% of available cores</td>
<td>
Number of threads that will be used by history server to process event logs.
</td>
</tr>
</table>

Note that in all of these UIs, the tables are sortable by clicking their headers,
Expand Down