From d506d7b5a02a0ab5728dea1060b46ec635fb493e Mon Sep 17 00:00:00 2001
From: Yan Xiaole
Date: Sun, 9 Aug 2020 16:47:31 -0700
Subject: [PATCH 1/2] [SPARK-32557][CORE] Logging and swallowing the exception per entry in History server

### What changes were proposed in this pull request?
This PR adds a try-catch wrapping the History server scan logic to log and swallow the exception per entry.

### Why are the changes needed?
As discussed in #29350, one entry's failure shouldn't affect the others.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually tested.

Closes #29374 from yanxiaole/SPARK-32557.

Authored-by: Yan Xiaole
Signed-off-by: Dongjoon Hyun
---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c26215295e914..f5e7c4fa6ac91 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,6 +27,7 @@ import java.util.zip.ZipOutputStream
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.io.Source
+import scala.util.control.NonFatal
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
@@ -528,7 +529,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             case _: FileNotFoundException => false
           }
 
-        case _: FileNotFoundException =>
+        case NonFatal(e) =>
+          logWarning(s"Error while filtering log ${reader.rootPath}", e)
           false
       }
     }
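A minimal standalone sketch of the per-entry guard pattern this commit introduces. The names `LogEntry`, `shouldKeep`, and `scan` are hypothetical stand-ins, not the actual `FsHistoryProvider` API; Spark's real code logs via `logWarning` rather than `println`:

```scala
import scala.util.control.NonFatal

object PerEntryScanSketch {
  // Hypothetical stand-in for an event log entry; not Spark's actual type.
  final case class LogEntry(rootPath: String)

  // Mirrors the patch's intent: scan one entry, and if anything non-fatal
  // goes wrong, log it and drop just that entry instead of failing the scan.
  def shouldKeep(entry: LogEntry)(scan: LogEntry => Boolean): Boolean =
    try scan(entry)
    catch {
      case NonFatal(e) =>
        // Spark uses logWarning here; println keeps the sketch dependency-free.
        println(s"Error while filtering log ${entry.rootPath}: $e")
        false
    }

  def main(args: Array[String]): Unit = {
    val entries =
      Seq(LogEntry("hdfs:/logs/app1"), LogEntry("hdfs:/logs/bad"), LogEntry("hdfs:/logs/app2"))
    val kept = entries.filter { e =>
      shouldKeep(e) { en =>
        if (en.rootPath.endsWith("bad")) throw new IllegalStateException("corrupt entry")
        true
      }
    }
    // Only the corrupt entry is dropped; the other two survive the scan.
    println(kept.map(_.rootPath)) // List(hdfs:/logs/app1, hdfs:/logs/app2)
  }
}
```

The design point the sketch illustrates is that the guard sits inside the per-entry filter, so a throwing entry yields `false` for itself only and the listing of every other application proceeds.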
From 3a0af1011a1bf18d6445fd6426b047cffd865d72 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 15 Oct 2020 15:28:52 +0900
Subject: [PATCH 2/2] Revert "Revert "[SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS""

This reverts commit e40c147a5d194adbba13f12590959dc68347ec14.
---
 .../deploy/history/FsHistoryProvider.scala    |  3 ++
 .../history/FsHistoryProviderSuite.scala      | 49 +++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index f5e7c4fa6ac91..7e63d5500b2ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -527,6 +527,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             reader.fileSizeForLastIndex > 0
           } catch {
             case _: FileNotFoundException => false
+            case NonFatal(e) =>
+              logWarning(s"Error while reading new log ${reader.rootPath}", e)
+              false
           }
 
         case NonFatal(e) =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c2f34fc3a95ed..f3beb35f1f011 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1470,6 +1470,55 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
 
+  test("SPARK-33146: don't let one bad rolling log folder prevent loading other applications") {
+    withTempDir { dir =>
+      val conf = createTestConf(true)
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(dir.listFiles().size === 1)
+      assert(provider.getListing.length === 1)
+
+      // Manually delete the appstatus file to make an invalid rolling event log
+      val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath),
+        "app", None, true)
+      fs.delete(appStatusPath, false)
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(provider.getListing.length === 0)
+
+      // Create a new application
+      val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf)
+      writer2.start()
+      writeEventsToRollingWriter(writer2, Seq(
+        SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      // Both folders exist but only one application found
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(provider.getListing.length === 1)
+      assert(dir.listFiles().size === 2)
+
+      // Make sure a new provider sees the valid application
+      provider.stop()
+      val newProvider = new FsHistoryProvider(conf)
+      newProvider.checkForLogs()
+      assert(newProvider.getListing.length === 1)
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
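For context on why both commits match on `NonFatal(e)` rather than a bare `Throwable`: `scala.util.control.NonFatal` matches ordinary exceptions but deliberately excludes JVM-fatal ones (`VirtualMachineError`, `InterruptedException`, `LinkageError`, `ControlThrowable`), so those still propagate out of the scan instead of being hidden. An illustrative snippet, with a hypothetical `swallowNonFatal` helper:

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical helper: run a block, swallowing only non-fatal failures.
  def swallowNonFatal(label: String)(body: => Unit): Unit =
    try body
    catch {
      case NonFatal(e) => println(s"$label: swallowed $e")
    }

  def main(args: Array[String]): Unit = {
    // An IOException from one corrupt log entry is caught and logged ...
    swallowNonFatal("bad entry")(throw new java.io.IOException("corrupt log"))
    // ... whereas an OutOfMemoryError would not match NonFatal and would
    // propagate, failing fast instead of being silently dropped.
    println("scan continues")
  }
}
```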