
Commit 7881622

vhlinskyi authored and dongjoon-hyun committed
[SPARK-33841][CORE][3.0] Fix issue with jobs disappearing intermittently from the SHS under high load
### What changes were proposed in this pull request?

Mark SHS event log entries that were `processing` at the beginning of the `checkForLogs` run as not stale, and check for this mark before deleting an event log. This fixes the issue where a particular job was displayed in the SHS, disappeared after some time, and then showed up again several minutes later.

### Why are the changes needed?

The issue is caused by [SPARK-29043](https://issues.apache.org/jira/browse/SPARK-29043), which was intended to improve the concurrent performance of the History Server. That [change](https://github.com/apache/spark/pull/25797/files#) breaks the ["app deletion" logic](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R563) because proper synchronization for `processing` event log entries is missing. Since the SHS now [filters out](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R462) all `processing` event log entries, such entries never get [updated with the new `lastProcessed`](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R472) time. As a result, any entry that completes processing right after the [filtering](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R462) and before [the check for stale entries](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R560) is identified as stale and is deleted from the UI until the next `checkForLogs` run. This happens because the [updated `lastProcessed` time is used as the staleness criterion](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R557), and event log entries that were not updated with a new time match that criterion.

The issue can be reproduced by generating a large number of event logs and uploading them to the SHS event log directory on S3. Around 800 copies (82.6 MB) of an event log file were created using the [shs-monitor](https://github.com/vladhlinsky/shs-monitor) scripts. The SHS then showed strange behavior when counting the total number of applications: at first the number increased as expected, but on the next page refresh the total decreased. No errors were logged by the SHS.

241 entities are displayed at `20:50:42`:

![1-241-entities-at-20-50](https://user-images.githubusercontent.com/61428392/102611539-c2138d00-4137-11eb-9bbd-d77b22041f3b.png)

203 entities are displayed at `20:52:17`:

![2-203-entities-at-20-52](https://user-images.githubusercontent.com/61428392/102611561-cdff4f00-4137-11eb-91ed-7405fe58a695.png)

The number of loaded applications over time:

![4-loaded-applications](https://user-images.githubusercontent.com/61428392/102611586-d8b9e400-4137-11eb-8747-4007fc5469de.png)
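To make the race window concrete, here is a minimal, self-contained sketch of the timing issue. The names (`lastProcessed`, `processing`, `checkForLogs`) only loosely mirror `FsHistoryProvider`, and the `finishesMidScan` parameter is purely illustrative; this is a model of the problem, not the actual implementation.

```scala
import scala.collection.mutable

// Illustrative model of the SHS listing state: a timestamp per event-log path and
// the set of paths that still have an in-flight replay task.
object StaleRaceSketch {
  val lastProcessed = mutable.Map[String, Long]()
  val processing = mutable.Set[String]()

  // Returns the paths that a scan would treat as stale (and delete from the UI).
  def checkForLogs(logs: Seq[String], now: Long, finishesMidScan: String): List[String] = {
    // Step 1: entries that are processing are filtered out, so their timestamp
    // is never refreshed to 'now'.
    logs.filterNot(p => processing.contains(p)).foreach(p => lastProcessed(p) = now)

    // The replay task completes between the filter above and the stale check below.
    processing -= finishesMidScan

    // Step 2: anything with an old timestamp and no in-flight task looks stale.
    lastProcessed.collect { case (p, t) if t < now && !processing.contains(p) => p }.toList
  }

  def main(args: Array[String]): Unit = {
    lastProcessed("app-1") = 50L   // written by an earlier scan
    processing += "app-1"          // still being parsed when the new scan starts

    val stale = checkForLogs(Seq("app-1"), now = 100L, finishesMidScan = "app-1")
    println(stale)                 // List(app-1): deleted from the UI until the next scan
  }
}
```

With the proposed change, entries that were `processing` when the scan started are remembered in a `notStale` set and skipped by the deletion step, which closes exactly this window.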
### Does this PR introduce _any_ user-facing change?

Yes, SHS users will no longer see the number of displayed applications decrease periodically.

### How was this patch tested?

Tested using the [shs-monitor](https://github.com/vladhlinsky/shs-monitor) scripts:

* Build SHS with the proposed change.
* Download Hadoop AWS and the AWS Java SDK.
* Prepare an S3 bucket and a user for programmatic access, grant the required roles to the user, and get the access key and secret key.
* Configure SHS to read event logs from S3 (an example configuration is sketched after this message).
* Start the [monitor](https://github.com/vladhlinsky/shs-monitor/blob/main/monitor.sh) script to query the SHS API.
* Run 8 [producers](https://github.com/vladhlinsky/shs-monitor/blob/main/producer.sh) for ~10 minutes, creating 805 event log copies (83.1 MB).
* Wait for SHS to load all the applications.
* Verify that the number of loaded applications increases continuously over time.

![5-loaded-applications-fixed](https://user-images.githubusercontent.com/61428392/102617363-bf1d9a00-4141-11eb-9bae-f982d02fd30f.png)

For more details, please refer to the [shs-monitor](https://github.com/vladhlinsky/shs-monitor) repository.

Closes #30842 from vladhlinsky/SPARK-33841-branch-3.0.

Authored-by: Vlad Glinsky <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
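For reference, a minimal History Server configuration for the S3 setup described above might look like the following; the bucket name, credentials, and update interval are placeholders, and the Hadoop AWS and AWS SDK jars still need to be placed on the History Server classpath.

```properties
# conf/spark-defaults.conf -- illustrative values only
spark.history.fs.logDirectory      s3a://my-shs-test-bucket/event-logs
spark.history.fs.update.interval   10s
spark.hadoop.fs.s3a.access.key     <ACCESS_KEY>
spark.hadoop.fs.s3a.secret.key     <SECRET_KEY>
```

The History Server is then started with `./sbin/start-history-server.sh`, and the monitor script can track the number of loaded applications via the SHS REST API (for example, the `api/v1/applications` endpoint).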
1 parent f67c3c2 · commit 7881622

File tree

1 file changed: +20 -6 lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 20 additions & 6 deletions
```diff
@@ -460,9 +460,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val newLastScanTime = clock.getTimeMillis()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
+      // Mark entries that are processing as not stale. Such entries do not have a chance to be
+      // updated with the new 'lastProcessed' time and thus any entity that completes processing
+      // right after this check and before the check for stale entities will be identified as stale
+      // and will be deleted from the UI until the next 'checkForLogs' run.
+      val notStale = mutable.HashSet[String]()
       val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry => !isBlacklisted(entry.getPath) }
-        .filter { entry => !isProcessing(entry.getPath) }
+        .filter { entry =>
+          if (isProcessing(entry.getPath)) {
+            notStale.add(entry.getPath.toString())
+            false
+          } else {
+            true
+          }
+        }
         .flatMap { entry => EventLogFileReader(fs, entry) }
         .filter { reader =>
           try {
@@ -562,12 +574,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .last(newLastScanTime - 1)
         .asScala
         .toList
-      stale.filterNot(isProcessing).foreach { log =>
-        log.appId.foreach { appId =>
-          cleanAppData(appId, log.attemptId, log.logPath)
-          listing.delete(classOf[LogInfo], log.logPath)
+      stale.filterNot(isProcessing)
+        .filterNot(info => notStale.contains(info.logPath))
+        .foreach { log =>
+          log.appId.foreach { appId =>
+            cleanAppData(appId, log.attemptId, log.logPath)
+            listing.delete(classOf[LogInfo], log.logPath)
+          }
         }
-      }
 
       lastScanTime.set(newLastScanTime)
     } catch {
```
