
Commit db714bb

AngersZhuuuu authored and SandishKumarHN committed
[SPARK-38270][SQL] Spark SQL CLI's AM should keep same exit code with client side
### What changes were proposed in this pull request?

Currently, Spark SQL CLI always uses a shutdown hook to stop `SparkSQLEnv`:

```scala
// Clean up after we exit
ShutdownHookManager.addShutdownHook { () => SparkSQLEnv.stop() }
```

but uses the process return code to close the client-side JVM:

```scala
while (line != null) {
  if (!line.startsWith("--")) {
    if (prefix.nonEmpty) {
      prefix += '\n'
    }

    if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
      line = prefix + line
      ret = cli.processLine(line, true)
      prefix = ""
      currentPrompt = promptWithCurrentDB
    } else {
      prefix = prefix + line
      currentPrompt = continuedPromptWithDBSpaces
    }
  }
  line = reader.readLine(currentPrompt + "> ")
}

sessionState.close()

System.exit(ret)
```

```scala
if (sessionState.execString != null) {
  exitCode = cli.processLine(sessionState.execString)
  System.exit(exitCode)
}

try {
  if (sessionState.fileName != null) {
    exitCode = cli.processFile(sessionState.fileName)
    System.exit(exitCode)
  }
} catch {
  case e: FileNotFoundException =>
    logError(s"Could not open input file for reading. (${e.getMessage})")
    exitCode = 3
    System.exit(exitCode)
}
```

This means the client-side exit code is not always consistent with the AM's. In this PR, I pass the exit code to the `SparkContext.stop()` method so that a clear client-side status reaches the AM side in client mode.

This PR adds a new `stop` method to `SchedulerBackend`:

```scala
def stop(exitCode: Int): Unit = stop()
```

so we don't need to implement it for every kind of scheduler backend. This PR only handles the case we hit with `YarnClientSchedulerBackend`: implementing `stop(exitCode: Int)` for that class alone is enough to make YARN client mode work. I think this hook can benefit many similar cases in the future.

### Why are the changes needed?

Keep the client-side status consistent with the AM side.

### Does this PR introduce _any_ user-facing change?

With this PR, the client-side status will be the same as the AM side's.

### How was this patch tested?

MT

Closes apache#35594 from AngersZhuuuu/SPARK-38270.

Lead-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: AngersZhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 393a767 commit db714bb
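To make the new control flow concrete, here is a minimal, self-contained sketch of the plumbing described above. The `Demo*` names are illustrative stand-ins, not Spark classes; only the exit-code path from the context down to the backend is modeled.

```scala
// Simplified stand-ins for SparkContext / TaskScheduler / SchedulerBackend:
// only the exit-code plumbing introduced by this PR is modeled here.
trait DemoBackend {
  def stop(): Unit
  // New overload with a safe default, so existing backends need no change.
  def stop(exitCode: Int): Unit = stop()
}

class DemoTaskScheduler(backend: DemoBackend) {
  def stop(exitCode: Int = 0): Unit = backend.stop(exitCode)
}

class DemoContext(scheduler: DemoTaskScheduler) {
  def stop(): Unit = stop(0) // the old entry point now explicitly means "success"
  def stop(exitCode: Int): Unit = scheduler.stop(exitCode)
}

object Demo extends App {
  val backend = new DemoBackend {
    override def stop(): Unit = println("backend stopped, exit code unknown")
    override def stop(exitCode: Int): Unit =
      println(s"backend stopped, relaying exit code $exitCode to the AM")
  }
  // 3 mirrors the CLI's FileNotFoundException code from the commit message.
  new DemoContext(new DemoTaskScheduler(backend)).stop(3)
}
```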

File tree

14 files changed: +68 −31 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 2 deletions

```diff
@@ -2068,7 +2068,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Shut down the SparkContext.
    */
-  def stop(): Unit = {
+  def stop(): Unit = stop(0)
+
+  /**
+   * Shut down the SparkContext with an exit code that will be passed to the scheduler backend.
+   * In client mode, the client side may call `SparkContext.stop()` to clean up but exit with
+   * a non-zero code. This behavior causes a resource scheduler such as the `ApplicationMaster`
+   * to exit with a success status while the client side exited with a failure status. Spark can
+   * call this method to stop the SparkContext and pass the correct client-side exit code to the
+   * scheduler backend. The scheduler backend should then send the exit code to the
+   * corresponding resource scheduler to keep them consistent.
+   *
+   * @param exitCode Specified exit code that will be passed to the scheduler backend in client mode.
+   */
+  def stop(exitCode: Int): Unit = {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
     }
@@ -2101,7 +2114,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     if (_dagScheduler != null) {
       Utils.tryLogNonFatalError {
-        _dagScheduler.stop()
+        _dagScheduler.stop(exitCode)
       }
       _dagScheduler = null
     }
```
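The CLI-side caller of this new overload is presumably among the 14 changed files but is not shown in this excerpt. A hypothetical caller, just to illustrate the contract the new method establishes (`shutDown` is a made-up helper, not code from the PR):

```scala
// Hypothetical caller (not the actual SparkSQLCLIDriver change): the point is
// that a failing client reports its exit code to the SparkContext before
// exiting, so the scheduler backend can relay it to the AM in client mode.
def shutDown(sc: org.apache.spark.SparkContext, exitCode: Int): Unit = {
  sc.stop(exitCode)  // new overload from this PR
  sys.exit(exitCode) // the JVM exit code stays consistent with what the AM saw
}
```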

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -2889,7 +2889,7 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
   }

-  def stop(): Unit = {
+  def stop(exitCode: Int = 0): Unit = {
     Utils.tryLogNonFatalError {
       messageScheduler.shutdownNow()
     }
@@ -2900,7 +2900,7 @@ private[spark] class DAGScheduler(
       eventProcessLoop.stop()
     }
     Utils.tryLogNonFatalError {
-      taskScheduler.stop()
+      taskScheduler.stop(exitCode)
    }
  }
```
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -30,6 +30,7 @@ private[spark] trait SchedulerBackend {

   def start(): Unit
   def stop(): Unit
+  def stop(exitCode: Int): Unit = stop()
   /**
    * Update the current offers and schedule tasks
    */
```
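The default body here is the compatibility trick the commit message calls out: existing `SchedulerBackend` implementations keep compiling because `stop(exitCode)` falls back to `stop()`. A small self-contained sketch of that behavior; the trait below is a stand-in, not the real one:

```scala
// A backend written before this PR only implements the zero-argument stop(),
// yet still satisfies the new trait: stop(exitCode) silently degrades to stop().
trait SchedulerBackendLike {
  def stop(): Unit
  def stop(exitCode: Int): Unit = stop() // the default added by this PR
}

class LegacyBackend extends SchedulerBackendLike {
  override def stop(): Unit = println("legacy shutdown, exit code dropped")
}

object CompatDemo extends App {
  // Prints "legacy shutdown, exit code dropped": the code is lost, which is
  // acceptable for backends that don't report a final status anywhere.
  new LegacyBackend().stop(1)
}
```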

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -49,7 +49,7 @@ private[spark] trait TaskScheduler {
   def postStartHook(): Unit = { }

   // Disconnect from the cluster.
-  def stop(): Unit
+  def stop(exitCode: Int = 0): Unit

   // Submit a sequence of tasks to run.
   def submitTasks(taskSet: TaskSet): Unit
```

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -971,13 +971,13 @@ private[spark] class TaskSchedulerImpl(
     }
   }

-  override def stop(): Unit = {
+  override def stop(exitCode: Int = 0): Unit = {
     Utils.tryLogNonFatalError {
       speculationScheduler.shutdown()
     }
     if (backend != null) {
       Utils.tryLogNonFatalError {
-        backend.stop()
+        backend.stop(exitCode)
       }
     }
     if (taskResultGetter != null) {
```

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -151,7 +151,7 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage

   // Used internally by executors to shut themselves down.
-  case object Shutdown extends CoarseGrainedClusterMessage
+  case class Shutdown(exitCode: Int = 0) extends CoarseGrainedClusterMessage

   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
```
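Turning `Shutdown` from a `case object` into a `case class` with a defaulted field is what forces the MiMa exclusions in `project/MimaExcludes.scala` below: the compiler-generated `Product` methods change shape. Call sites only need added parentheses. A self-contained sketch with made-up demo names:

```scala
object ShutdownDemo extends App {
  sealed trait ClusterMessage
  // After this PR: a case class with a default, so senders that used the old
  // case object only need to write Shutdown() instead of Shutdown.
  case class Shutdown(exitCode: Int = 0) extends ClusterMessage

  def handle(msg: ClusterMessage): Unit = msg match {
    case Shutdown(0)    => println("AM: clean shutdown, finish SUCCEEDED")
    case Shutdown(code) => println(s"AM: driver failed with code $code, finish FAILED")
  }

  handle(Shutdown())  // default: success path
  handle(Shutdown(3)) // e.g. the CLI's file-not-found code
}
```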

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -176,7 +176,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
     override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
     override def start() = {}
-    override def stop() = {}
+    override def stop(exitCode: Int) = {}
     override def executorHeartbeatReceived(
         execId: String,
         accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
@@ -846,7 +846,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
     override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
     override def start(): Unit = {}
-    override def stop(): Unit = {}
+    override def stop(exitCode: Int): Unit = {}
     override def submitTasks(taskSet: TaskSet): Unit = {
       taskSets += taskSet
     }
```

core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -80,7 +80,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
   override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
   override def start(): Unit = {}
-  override def stop(): Unit = {}
+  override def stop(exitCode: Int): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
   override def killTaskAttempt(
```

project/MimaExcludes.scala

Lines changed: 9 additions & 1 deletion

```diff
@@ -100,7 +100,15 @@ object MimaExcludes {

     // [SPARK-37935][SQL] Eliminate separate error sub-classes fields
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkException.this"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.AnalysisException.this")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.AnalysisException.this"),
+
+    // [SPARK-38270][SQL] Spark SQL CLI's AM should keep same exit code with client side
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productPrefix"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productArity"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElement"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productIterator"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.canEqual"),
+    ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.toString")
   )

   // Defulat exclude rules
```

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 12 additions & 3 deletions

```diff
@@ -790,6 +790,8 @@ private[spark] class ApplicationMaster(
   private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
     extends RpcEndpoint with Logging {
     @volatile private var shutdown = false
+    @volatile private var exitCode = 0
+
     private val clientModeTreatDisconnectAsFailed =
       sparkConf.get(AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED)

@@ -810,7 +812,9 @@ private[spark] class ApplicationMaster(
       case UpdateDelegationTokens(tokens) =>
         SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)

-      case Shutdown => shutdown = true
+      case Shutdown(code) =>
+        exitCode = code
+        shutdown = true
     }

     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -854,8 +858,13 @@ private[spark] class ApplicationMaster(
       // This avoids potentially reporting incorrect exit codes if the driver fails
       if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
         if (shutdown || !clientModeTreatDisconnectAsFailed) {
-          logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
-          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+          if (exitCode == 0) {
+            logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
+            finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+          } else {
+            logError(s"Driver terminated with exit code ${exitCode}! Shutting down. $remoteAddress")
+            finish(FinalApplicationStatus.FAILED, exitCode)
+          }
         } else {
           logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress")
           finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED)
```
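The sender side of this `Shutdown(exitCode)` message, the `YarnClientSchedulerBackend` override mentioned in the commit message, is not included in this excerpt. The sketch below is an assumption about its shape, not the actual diff; `EndpointRef` and `DemoYarnClientBackend` are made-up names:

```scala
// Illustrative only: the YarnClientSchedulerBackend change is not shown above,
// and this endpoint wiring is an assumption, not the actual Spark code.
case class Shutdown(exitCode: Int = 0)

trait EndpointRef { def send(msg: Any): Unit }

class DemoYarnClientBackend(amEndpoint: EndpointRef) {
  def stop(): Unit = stop(0)
  def stop(exitCode: Int): Unit = {
    // Relay the code to the AM; per the diff above, the AM stores it and
    // finishes with FinalApplicationStatus.FAILED when it is non-zero.
    amEndpoint.send(Shutdown(exitCode))
    // ... usual client-mode teardown elided ...
  }
}
```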
