17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2068,7 +2068,20 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Shut down the SparkContext.
*/
def stop(): Unit = {
def stop(): Unit = stop(0)

/**
* Shut down the SparkContext with an exit code that will be passed to the scheduler backend.
* In client mode, the client may call `SparkContext.stop()` to clean up and then exit with a
* non-zero code. That leaves the resource scheduler, such as the `ApplicationMaster`,
* reporting success while the client exited with a failure. Calling this method stops the
* SparkContext and passes the client's exit code to the scheduler backend, which should then
* forward it to the corresponding resource scheduler so the two statuses stay consistent.
*
* @param exitCode The exit code that will be passed to the scheduler backend in client mode.
*/
def stop(exitCode: Int): Unit = {
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
}
@@ -2101,7 +2114,7 @@
}
if (_dagScheduler != null) {
Utils.tryLogNonFatalError {
_dagScheduler.stop()
_dagScheduler.stop(exitCode)
}
_dagScheduler = null
}
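For illustration, a minimal client-mode sketch of the intended call pattern (the workload, the error-to-code mapping, and the `ExitCodeDemo` object are hypothetical, not part of this change): the application stops the SparkContext with the same code it is about to exit with, so the resource scheduler can report a matching final status.

import org.apache.spark.{SparkConf, SparkContext}

object ExitCodeDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("exit-code-demo"))
    val exitCode =
      try {
        sc.parallelize(1 to 100).map(_ * 2).count() // placeholder workload
        0
      } catch {
        case _: Exception => 1 // map the failure to a non-zero code
      }
    // Hand the same code to the scheduler backend so that, in YARN client mode,
    // the ApplicationMaster's final status matches the client's exit status.
    sc.stop(exitCode)
    System.exit(exitCode)
  }
}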
@@ -2847,11 +2847,11 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}

def stop(): Unit = {
def stop(exitCode: Int = 0): Unit = {
messageScheduler.shutdownNow()
shuffleMergeFinalizeScheduler.shutdownNow()
eventProcessLoop.stop()
taskScheduler.stop()
taskScheduler.stop(exitCode)
}

eventProcessLoop.start()
@@ -30,6 +30,7 @@ private[spark] trait SchedulerBackend {

def start(): Unit
def stop(): Unit
def stop(exitCode: Int): Unit = stop()
/**
* Update the current offers and schedule tasks
*/
@@ -49,7 +49,7 @@ private[spark] trait TaskScheduler {
def postStartHook(): Unit = { }

// Disconnect from the cluster.
def stop(): Unit
def stop(exitCode: Int = 0): Unit

// Submit a sequence of tasks to run.
def submitTasks(taskSet: TaskSet): Unit
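The two traits take different compatibility routes: `SchedulerBackend` gains an overload whose default body delegates to the no-arg `stop()`, so backends that ignore the exit code need no changes, while `TaskScheduler` gives its existing `stop` a default parameter, so old call sites still compile but every implementation must now accept the code. A minimal sketch of both patterns, using hypothetical trait and class names (only the `stop` signatures mirror the diff):

trait BackendLike {
  def stop(): Unit
  // Overload with a default body: backends that never look at the exit code
  // inherit this and stay untouched.
  def stop(exitCode: Int): Unit = stop()
}

trait SchedulerLike {
  // Default parameter: existing stop() call sites keep compiling, but concrete
  // schedulers must now take the exit code.
  def stop(exitCode: Int = 0): Unit
}

class DummyScheduler extends SchedulerLike {
  // The override does not repeat "= 0"; the default is picked up from the trait,
  // which is why the test doubles below can declare stop(exitCode: Int) plainly.
  override def stop(exitCode: Int): Unit =
    println(s"stopping with exit code $exitCode")
}

object SchedulerDemo extends App {
  val s: SchedulerLike = new DummyScheduler
  s.stop()   // resolves the default 0 declared on SchedulerLike
  s.stop(2)  // explicit non-zero code
}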
@@ -970,10 +970,10 @@ private[spark] class TaskSchedulerImpl(
}
}

override def stop(): Unit = {
override def stop(exitCode: Int = 0): Unit = {
speculationScheduler.shutdown()
if (backend != null) {
backend.stop()
backend.stop(exitCode)
}
if (taskResultGetter != null) {
taskResultGetter.stop()
@@ -151,7 +151,7 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage

// Used internally by executors to shut themselves down.
case object Shutdown extends CoarseGrainedClusterMessage
case class Shutdown(exitCode: Int = 0) extends CoarseGrainedClusterMessage

// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
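Because `Shutdown` changes from a `case object` to a `case class`, an old-style `case Shutdown =>` pattern would now refer to the companion object and never match an incoming message; handlers bind the carried code instead, as the `ApplicationMaster` change below does. This is also why the `MimaExcludes` entries further down are needed: the synthesized `Product` methods of the old singleton disappear from the companion object. A small sketch of the new shape (the `ClusterMessage` trait and `handle` function are illustrative, not Spark code):

sealed trait ClusterMessage
// Before: case object Shutdown extends ClusterMessage
case class Shutdown(exitCode: Int = 0) extends ClusterMessage

object ShutdownDemo extends App {
  def handle(msg: ClusterMessage): Unit = msg match {
    case Shutdown(code) => println(s"shutting down with exit code $code")
    case _              => ()
  }

  // Senders that ignore the code keep the old behaviour via the default argument:
  handle(Shutdown())    // exitCode = 0, a clean shutdown
  handle(Shutdown(13))  // non-zero code to propagate to the resource scheduler
}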
@@ -174,7 +174,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
override def start() = {}
override def stop() = {}
override def stop(exitCode: Int) = {}
override def executorHeartbeatReceived(
execId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
@@ -839,7 +839,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
override def start(): Unit = {}
override def stop(): Unit = {}
override def stop(exitCode: Int): Unit = {}
override def submitTasks(taskSet: TaskSet): Unit = {
taskSets += taskSet
}
@@ -80,7 +80,7 @@ private class DummyTaskScheduler extends TaskScheduler {
override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
override def start(): Unit = {}
override def stop(): Unit = {}
override def stop(exitCode: Int): Unit = {}
override def submitTasks(taskSet: TaskSet): Unit = {}
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
override def killTaskAttempt(
11 changes: 10 additions & 1 deletion project/MimaExcludes.scala
@@ -96,7 +96,16 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.AnalysisException.copy$default$7"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.AnalysisException.copy"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.AnalysisException.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.SparkException.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.SparkException.this"),


// [SPARK-38270][SQL] Spark SQL CLI's AM should keep same exit code with client side
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productPrefix"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productArity"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productElement"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.productIterator"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.canEqual"),
ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages#Shutdown.toString")
)

// Defulat exclude rules
@@ -790,6 +790,8 @@ private[spark] class ApplicationMaster(
private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
extends RpcEndpoint with Logging {
@volatile private var shutdown = false
@volatile private var exitCode = 0

private val clientModeTreatDisconnectAsFailed =
sparkConf.get(AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED)

@@ -810,7 +812,9 @@
case UpdateDelegationTokens(tokens) =>
SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)

case Shutdown => shutdown = true
case Shutdown(code) =>
exitCode = code
shutdown = true
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -854,8 +858,13 @@
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
if (shutdown || !clientModeTreatDisconnectAsFailed) {
logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
if (exitCode == 0) {
logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
} else {
logError(s"Driver terminated with exit code ${exitCode}! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.FAILED, exitCode)
}
} else {
logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED)
@@ -160,9 +160,9 @@ private[spark] class YarnClientSchedulerBackend(
/**
* Stop the scheduler. This assumes `start()` has already been called.
*/
override def stop(): Unit = {
override def stop(exitCode: Int): Unit = {
assert(client != null, "Attempted to stop this scheduler before starting it!")
yarnSchedulerEndpoint.handleClientModeDriverStop()
yarnSchedulerEndpoint.handleClientModeDriverStop(exitCode)
if (monitorThread != null) {
monitorThread.stopMonitor()
}
@@ -319,10 +319,10 @@ private[spark] abstract class YarnSchedulerBackend(
removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
}

private[cluster] def handleClientModeDriverStop(): Unit = {
private[cluster] def handleClientModeDriverStop(exitCode: Int): Unit = {
amEndpoint match {
case Some(am) =>
am.send(Shutdown)
am.send(Shutdown(exitCode))
case None =>
logWarning("Attempted to send shutdown message before the AM has registered!")
}
@@ -60,6 +60,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
private val continuedPrompt = "".padTo(prompt.length, ' ')
private var transport: TSocket = _
private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop."
private var exitCode = 0

initializeLogIfNecessary(true)
installSignalHandler()
@@ -83,6 +84,11 @@
})
}

def exit(code: Int): Unit = {
exitCode = code
System.exit(exitCode)
}

def main(args: Array[String]): Unit = {
val oproc = new OptionsProcessor()
if (!oproc.process_stage1(args)) {
@@ -105,12 +111,12 @@
} catch {
case e: UnsupportedEncodingException =>
sessionState.close()
System.exit(ERROR_PATH_NOT_FOUND)
exit(ERROR_PATH_NOT_FOUND)
}

if (!oproc.process_stage2(sessionState)) {
sessionState.close()
System.exit(ERROR_MISUSE_SHELL_BUILTIN)
exit(ERROR_MISUSE_SHELL_BUILTIN)
}

// Set all properties specified via command line.
@@ -145,7 +151,7 @@
// Clean up after we exit
ShutdownHookManager.addShutdownHook { () =>
sessionState.close()
SparkSQLEnv.stop()
SparkSQLEnv.stop(exitCode)
}

if (isRemoteMode(sessionState)) {
@@ -190,7 +196,7 @@
sessionState.info = new PrintStream(System.err, true, UTF_8.name())
sessionState.err = new PrintStream(System.err, true, UTF_8.name())
} catch {
case e: UnsupportedEncodingException => System.exit(ERROR_PATH_NOT_FOUND)
case e: UnsupportedEncodingException => exit(ERROR_PATH_NOT_FOUND)
}

if (sessionState.database != null) {
@@ -211,17 +217,17 @@
cli.printMasterAndAppId

if (sessionState.execString != null) {
System.exit(cli.processLine(sessionState.execString))
exit(cli.processLine(sessionState.execString))
}

try {
if (sessionState.fileName != null) {
System.exit(cli.processFile(sessionState.fileName))
exit(cli.processFile(sessionState.fileName))
}
} catch {
case e: FileNotFoundException =>
logError(s"Could not open input file for reading. (${e.getMessage})")
System.exit(ERROR_PATH_NOT_FOUND)
exit(ERROR_PATH_NOT_FOUND)
}

val reader = new ConsoleReader()
@@ -303,7 +309,7 @@

sessionState.close()

System.exit(ret)
exit(ret)
}


@@ -358,7 +364,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
if (cmd_lower.equals("quit") ||
cmd_lower.equals("exit")) {
sessionState.close()
System.exit(EXIT_SUCCESS)
SparkSQLCLIDriver.exit(EXIT_SUCCESS)
}
if (tokens(0).toLowerCase(Locale.ROOT).equals("source") ||
cmd_trimmed.startsWith("!") || isRemoteMode) {
@@ -481,7 +487,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
// Kill the VM on second ctrl+c
if (!initialRequest) {
console.printInfo("Exiting the JVM")
System.exit(ERROR_COMMAND_NOT_FOUND)
SparkSQLCLIDriver.exit(ERROR_COMMAND_NOT_FOUND)
}

// Interrupt the CLI thread to stop the current statement and return
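The CLI's new `exit(code)` helper records the code before calling `System.exit`, so the shutdown hook registered earlier in `main` can hand the same value to `SparkSQLEnv.stop(exitCode)`. A simplified, self-contained sketch of that record-then-exit pattern (not the actual CLI code; the printed message stands in for the real cleanup):

object CliExitSketch {
  @volatile private var exitCode = 0

  def exit(code: Int): Unit = {
    exitCode = code    // remember the code for the shutdown hook
    sys.exit(exitCode) // triggers the JVM shutdown hooks
  }

  def main(args: Array[String]): Unit = {
    // Runs during System.exit; in the real CLI this is where
    // SparkSQLEnv.stop(exitCode) receives the recorded code.
    sys.addShutdownHook {
      println(s"cleaning up, propagating exit code $exitCode")
    }
    val ret = 3 // pretend processLine(...) returned a failure
    exit(ret)
  }
}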
@@ -81,11 +81,11 @@ private[hive] object SparkSQLEnv extends Logging {
}

/** Cleans up and shuts down the Spark SQL environments. */
def stop(): Unit = {
def stop(exitCode: Int = 0): Unit = {
logDebug("Shutting down Spark SQL Environment")
// Stop the SparkContext
if (SparkSQLEnv.sparkContext != null) {
sparkContext.stop()
sparkContext.stop(exitCode)
sparkContext = null
sqlContext = null
}
Expand Down