@@ -815,6 +815,7 @@ private[spark] class ApplicationMaster(
case Shutdown(code) =>
exitCode = code
shutdown = true
allocator.setShutdown(true)
Contributor:

So maybe I'm missing a call, but it looks like this Shutdown message is only sent in client mode. Is that the only time you are seeing this issue, i.e., when the driver is local and the ApplicationMaster and YARN allocator are on the cluster?
But the log message in the description has YarnClusterSchedulerBackend, which makes me think this is cluster mode.

Member Author:

> it looks like this Shutdown message is only sent in client mode

Yeah, I just noticed that. Do you think it's a good idea to send Shutdown in cluster mode as well, or do you have any other suggestions? cc @AngersZhuuuu as you are the author of that code.

> the log message in the description has YarnClusterSchedulerBackend which makes me think this is cluster mode

You are right, my reported job failed in cluster mode, and I think both YARN client and cluster modes have this issue.

Contributor:

Yeah, this won't fix your issue as-is, so we would need something for cluster mode. I'm fine with sending the Shutdown message. It would be nice if you could test the fix on your use case to make sure it resolves it as well.

Contributor:

It's OK to send Shutdown in cluster mode too, not only in client mode.

}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
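For readers following the thread above, here is a minimal, self-contained sketch (plain Scala, not the real Spark RPC classes) of the path this hunk wires up: the driver-side backend sends Shutdown(code) to the AM endpoint, which flips the allocator's shutdown flag before executors start going away. Apart from the Shutdown message and the setShutdown call, every name below is a placeholder.

```scala
// Toy stand-ins for the Spark pieces involved; only the Shutdown message and
// the setShutdown call mirror the actual diff above.
case class Shutdown(code: Int)

class Allocator {
  @volatile private var shutdown = false
  def setShutdown(value: Boolean): Unit = shutdown = value
  def isShutdown: Boolean = shutdown
}

class AmEndpoint(allocator: Allocator) {
  private var exitCode = 0
  def receive(msg: Any): Unit = msg match {
    case Shutdown(code) =>
      exitCode = code
      allocator.setShutdown(true) // the line added above: mark the allocator before teardown
    case _ => // other AM messages are out of scope for this sketch
  }
}

object ShutdownFlowDemo extends App {
  val allocator = new Allocator
  new AmEndpoint(allocator).receive(Shutdown(0))
  assert(allocator.isShutdown) // later container exits should no longer count as failures
}
```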
@@ -199,6 +199,8 @@ private[yarn] class YarnAllocator(
}
}

@volatile private var shutdown = false

// The default profile is always present so we need to initialize the data structures keyed by
// ResourceProfile id to ensure it's present if things start running before a request for
// executors could add it. This approach is easier than going and special-casing everywhere.
@@ -215,6 +217,8 @@ private[yarn] class YarnAllocator(

initDefaultProfile()

def setShutdown(shutdown: Boolean): Unit = this.shutdown = shutdown

def getNumExecutorsRunning: Int = synchronized {
runningExecutorsPerResourceProfileId.values.map(_.size).sum
}
@@ -835,6 +839,8 @@ private[yarn] class YarnAllocator(
// now I think it's ok as none of the containers are expected to exit.
val exitStatus = completedContainer.getExitStatus
val (exitCausedByApp, containerExitReason) = exitStatus match {
case _ if shutdown =>
(false, s"Executor for container $containerId exited after Application shutdown.")
Member Author:

Ignore all executor exits after Spark shutdown.

case ContainerExitStatus.SUCCESS =>
(false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
"preemption) and not because of an error in the running job.")
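A hedged, standalone sketch of the classification change in the hunk above: once the shutdown flag is set, a completed container is never attributed to the application, so it does not bump the executor-failure count. The exit-status constants below are stand-ins written from memory rather than imports of the YARN ContainerExitStatus API.

```scala
object ExitClassifier {
  // Stand-ins for org.apache.hadoop.yarn.api.records.ContainerExitStatus values.
  val Success = 0
  val Preempted = -102

  // Mirrors the shape of the match in processCompletedContainers: the new
  // `shutdown` guard short-circuits every other case.
  def exitCausedByApp(shutdown: Boolean, exitStatus: Int): Boolean = exitStatus match {
    case _ if shutdown => false // added by this PR: exits after shutdown are expected
    case Success       => false // treated as a YARN event, not a job error
    case Preempted     => false // preemption is not the app's fault
    case _             => true  // everything else still counts as an app failure
  }
}

object ExitClassifierDemo extends App {
  assert(!ExitClassifier.exitCausedByApp(shutdown = true, exitStatus = 1))
  assert(ExitClassifier.exitCausedByApp(shutdown = false, exitStatus = 1))
}
```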
@@ -162,7 +162,7 @@ private[spark] class YarnClientSchedulerBackend(
*/
override def stop(exitCode: Int): Unit = {
assert(client != null, "Attempted to stop this scheduler before starting it!")
- yarnSchedulerEndpoint.handleClientModeDriverStop(exitCode)
+ yarnSchedulerEndpoint.signalDriverStop(exitCode)
if (monitorThread != null) {
monitorThread.stopMonitor()
}
@@ -35,6 +35,10 @@ private[spark] class YarnClusterSchedulerBackend(
startBindings()
}

override def stop(exitCode: Int): Unit = {
yarnSchedulerEndpoint.signalDriverStop(exitCode)
Member Author:

@tgravescs I just noticed super.stop is missing here; opened #39053 for that.

}

override def getDriverLogUrls: Option[Map[String, String]] = {
YarnContainerInfoHelper.getLogUrls(sc.hadoopConfiguration, container = None)
}
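Following up on the author's note about the missing super.stop: a toy-level sketch of the ordering the follow-up presumably wants: signal the driver stop first, then defer to the parent class's teardown. The class names below are placeholders, and this is a guess at what #39053 does, not its actual diff.

```scala
// Placeholder classes; only the signal-then-super ordering is the point here.
class BaseBackend {
  def stop(exitCode: Int): Unit = println(s"base teardown, exit=$exitCode")
}

class ClusterBackend(signalDriverStop: Int => Unit) extends BaseBackend {
  override def stop(exitCode: Int): Unit = {
    signalDriverStop(exitCode) // stands in for yarnSchedulerEndpoint.signalDriverStop(exitCode)
    super.stop(exitCode)       // the call the comment above points out is currently missing
  }
}

object StopOrderDemo extends App {
  new ClusterBackend(code => println(s"signalDriverStop($code)")).stop(exitCode = 0)
}
```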
@@ -319,7 +319,7 @@ private[spark] abstract class YarnSchedulerBackend(
removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
}

- private[cluster] def handleClientModeDriverStop(exitCode: Int): Unit = {
+ private[cluster] def signalDriverStop(exitCode: Int): Unit = {
amEndpoint match {
case Some(am) =>
am.send(Shutdown(exitCode))
@@ -693,6 +693,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers {
.updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
}

test("SPARK-39601 YarnAllocator should not count executor failure after shutdown") {
val (handler, _) = createAllocator()
handler.updateResourceRequests()
handler.getNumExecutorsFailed should be(0)

val failedBeforeShutdown = createContainer("host1")
val failedAfterShutdown = createContainer("host2")
handler.handleAllocatedContainers(Seq(failedBeforeShutdown, failedAfterShutdown))

val failedBeforeShutdownStatus = ContainerStatus.newInstance(
failedBeforeShutdown.getId, ContainerState.COMPLETE, "Failed", -1)
val failedAfterShutdownStatus = ContainerStatus.newInstance(
failedAfterShutdown.getId, ContainerState.COMPLETE, "Failed", -1)

handler.processCompletedContainers(Seq(failedBeforeShutdownStatus))
handler.getNumExecutorsFailed should be(1)

handler.setShutdown(true)
handler.processCompletedContainers(Seq(failedAfterShutdownStatus))
handler.getNumExecutorsFailed should be(1)
}

test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " +
"when offHeapEnabled is true.") {
val originalOffHeapEnabled = sparkConf.get(MEMORY_OFFHEAP_ENABLED)