Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,9 @@ private[deploy] class Master(
case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
}
}

// In case of zero workers and apps, we can complete recovery.
if (canCompleteRecovery) { completeRecovery() }
}

private def completeRecovery(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,30 @@ class MasterSuite extends SparkFunSuite
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}

test("SPARK-46664: master should recover quickly in case of zero workers and apps") {
val conf = new SparkConf(loadDefaults = false)
conf.set(RECOVERY_MODE, "CUSTOM")
conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
conf.set(MASTER_REST_SERVER_ENABLED, false)

var master: Master = null
try {
master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(2.seconds), interval(100.milliseconds)) {
getState(master) should be(RecoveryState.ALIVE)
}
master.workers.size should be(0)
} finally {
if (master != null) {
master.rpcEnv.shutdown()
master.rpcEnv.awaitTermination()
master = null
FakeRecoveryModeFactory.persistentData.clear()
}
}
}

test("master correctly recover the application") {
val conf = new SparkConf(loadDefaults = false)
conf.set(RECOVERY_MODE, "CUSTOM")
Expand Down