From 2c0b148c0491646bce78316714babc0a3460a4e1 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Jun 2024 16:03:49 -0700 Subject: [PATCH 1/3] Add tests. --- .../spark/internal/config/package.scala | 6 ++ .../spark/deploy/SparkSubmitSuite.scala | 66 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index a7268c640991..fbd02e8e4f4e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2206,6 +2206,12 @@ package object config { .toSequence .createWithDefault(Nil) + private[spark] val SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT = + ConfigBuilder("spark.submit.callSystemExitOnMainExit") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + private[spark] val SCHEDULER_ALLOCATION_FILE = ConfigBuilder("spark.scheduler.allocation.file") .version("0.8.1") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 40d8eae644a0..bf3efcb96962 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1589,6 +1589,44 @@ class SparkSubmitSuite runSparkSubmit(argsSuccess, expectFailure = false)) } + test("spark.submit.callSystemExitOnMainExit returns non-zero exit code on unclean main exit") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", MainThrowsUncaughtExceptionSparkApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--conf", s"${SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key}=true", + unusedJar.toString + ) + assertResult(1)(runSparkSubmit(args, expectFailure = true)) + } + + test("spark.submit.callSystemExitOnMainExit calls system exit on clean main exit") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", NonDaemonThreadSparkApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--conf", s"${SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key}=true", + unusedJar.toString + ) + // With SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT set to false, the non-daemon thread will + // prevent the JVM from beginning shutdown and the following call will fail with a + // timeout: + assertResult(0)(runSparkSubmit(args)) + } + + test("spark.submit.callSystemExitOnMainExit with main that explicitly calls System.exit") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", + MainExplicitlyCallsSystemExit3SparkApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--conf", s"${SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key}=true", + unusedJar.toString + ) + // This main class explicitly exits with System.exit(3), hence this expected exit code: + assertResult(3)(runSparkSubmit(args, expectFailure = true)) + } + private def testRemoteResources( enableHttpFs: Boolean, forceDownloadSchemes: Seq[String] = Nil): Unit = { @@ -1855,6 +1893,34 @@ object SimpleApplicationTest { } } +object MainThrowsUncaughtExceptionSparkApplicationTest { + def main(args: Array[String]): Unit = { + throw new Exception("User exception") + } +} + +object NonDaemonThreadSparkApplicationTest { + def main(args: Array[String]): Unit = { + val nonDaemonThread: Thread = new Thread { + override def run(): Unit = { + while (true) { + Thread.sleep(1000) + } + } + } + nonDaemonThread.setDaemon(false) + nonDaemonThread.setName("Non-Daemon-Thread") + nonDaemonThread.start() + // Fall off the end of the main method. + } +} + +object MainExplicitlyCallsSystemExit3SparkApplicationTest { + def main(args: Array[String]): Unit = { + System.exit(3) + } +} + object UserClasspathFirstTest { def main(args: Array[String]): Unit = { val ccl = Thread.currentThread().getContextClassLoader() From d6108ce1f9c16b7e8d93c6b308585f86c0afa179 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Jun 2024 16:05:34 -0700 Subject: [PATCH 2/3] Implement feature. --- .../org/apache/spark/deploy/SparkSubmit.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 7bb945ab9f14..73c11f0bb228 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1021,11 +1021,19 @@ private[spark] class SparkSubmit extends Logging { e } + var exitCode: Int = 1 try { app.start(childArgs.toArray, sparkConf) + exitCode = 0 } catch { case t: Throwable => - throw findCause(t) + val cause = findCause(t) + cause match { + case e: SparkUserAppException => + exitCode = e.exitCode + case _ => + } + throw cause } finally { if (args.master.startsWith("k8s") && !isShell(args.primaryResource) && !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) && @@ -1036,6 +1044,12 @@ private[spark] class SparkSubmit extends Logging { case e: Throwable => logError("Failed to close SparkContext", e) } } + if (sparkConf.get(SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT)) { + logInfo( + log"Calling System.exit() with exit code ${MDC(LogKeys.EXIT_CODE, exitCode)} " + + log"because main ${MDC(LogKeys.CONFIG, SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT.key)}=true") + exitFn(exitCode) + } } } From f84ca1ee77b76b39bd274845905063be12803ede Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 5 Jun 2024 16:09:44 -0700 Subject: [PATCH 3/3] config doc --- .../main/scala/org/apache/spark/internal/config/package.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index fbd02e8e4f4e..968b496452f2 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2208,6 +2208,9 @@ package object config { private[spark] val SUBMIT_CALL_SYSTEM_EXIT_ON_MAIN_EXIT = ConfigBuilder("spark.submit.callSystemExitOnMainExit") + .doc("If true, SparkSubmit will call System.exit() to initiate JVM shutdown once the " + + "user's main method has exited. This can be useful in cases where non-daemon JVM " + + "threads might otherwise prevent the JVM from shutting down on its own.") .version("4.0.0") .booleanConf .createWithDefault(false)