Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,13 @@ private[deploy] class Master(
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
if (beginRecovery(storedApps, storedDrivers, storedWorkers)) {
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
}
}

case CompleteRecovery => completeRecovery()
Expand Down Expand Up @@ -590,7 +591,7 @@ private[deploy] class Master(
private var recoveryStartTimeNs = 0L

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
storedWorkers: Seq[WorkerInfo]): Unit = {
storedWorkers: Seq[WorkerInfo]): Boolean = {
recoveryStartTimeNs = System.nanoTime()
for (app <- storedApps) {
logInfo("Trying to recover app: " + app.id)
Expand Down Expand Up @@ -619,6 +620,14 @@ private[deploy] class Master(
case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
}
}

// In case of zero workers and apps, we can complete recovery.
if (canCompleteRecovery) {
completeRecovery()
false
} else {
true
}
}

private def completeRecovery(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,30 @@ class MasterSuite extends SparkFunSuite
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}

test("SPARK-46664: master should recover quickly in case of zero workers and apps") {
  // Regression test: with CUSTOM recovery mode and an empty persistence store
  // (no workers, no apps), the master should reach ALIVE quickly rather than
  // waiting out the full recovery timeout. The 2-second `eventually` window
  // presumably undercuts the configured recovery timeout — confirm against
  // the RECOVERY_TIMEOUT default if this test flakes.
  val conf = new SparkConf(loadDefaults = false)
  conf.set(RECOVERY_MODE, "CUSTOM")
  conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
  conf.set(MASTER_REST_SERVER_ENABLED, false)

  var recoveringMaster: Master = null
  try {
    recoveringMaster = makeMaster(conf)
    recoveringMaster.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, recoveringMaster)
    // Poll every 100ms, up to 2s, for the master to finish recovery.
    eventually(timeout(2.seconds), interval(100.milliseconds)) {
      getState(recoveringMaster) shouldBe RecoveryState.ALIVE
    }
    // Nothing was persisted, so no workers should have been restored.
    recoveringMaster.workers.size shouldBe 0
  } finally {
    // Tear down the RPC environment and shared fake-recovery state even if
    // the assertions above fail, so later tests start clean.
    if (recoveringMaster != null) {
      recoveringMaster.rpcEnv.shutdown()
      recoveringMaster.rpcEnv.awaitTermination()
      recoveringMaster = null
      FakeRecoveryModeFactory.persistentData.clear()
    }
  }
}

test("master correctly recover the application") {
val conf = new SparkConf(loadDefaults = false)
conf.set(RECOVERY_MODE, "CUSTOM")
Expand Down