YarnAllocator.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
@@ -859,7 +859,8 @@ private[yarn] class YarnAllocator(
// .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
// oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
// ache/hadoop/yarn/util/Apps.java#L273 for details)
if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status) ||
SparkContext.getActive.forall(_.isStopped)) {
Contributor

I'd prefer to know that we are in shutdown rather than checking whether the SparkContext is active, but I need to refresh my memory on that shutdown sequence.

Member Author

The SparkContext sets stopped to true at the beginning of the stop procedure; do you have other suggestions for checking whether the SparkContext is stopped?

def stop(): Unit = {
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
if (!stopped.compareAndSet(false, true)) {
logInfo("SparkContext already stopped.")
return
}
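
For reference, a minimal sketch (plain Scala, no Spark dependency; ForallCheckSketch and looksStopped are made-up names for illustration) of how the Option.forall check in the proposed change behaves:

object ForallCheckSketch {
  // Mirrors SparkContext.getActive.forall(_.isStopped):
  // Some(isStopped) yields isStopped, while None is vacuously true.
  def looksStopped(activeIsStopped: Option[Boolean]): Boolean =
    activeIsStopped.forall(identity)

  def main(args: Array[String]): Unit = {
    println(looksStopped(Some(false))) // false: an active, running SparkContext
    println(looksStopped(Some(true)))  // true: the active SparkContext has been stopped
    println(looksStopped(None))        // true: no active SparkContext in this process
  }
}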

Contributor

This isn't going to work in cluster mode on YARN, where the ApplicationMaster and the YarnAllocator are not in the same process as the SparkContext. I assume getActive returns None in that case, so we would take this branch all the time when we really shouldn't.

Can we tell the allocator that we are shutting down when the ApplicationMaster is told to shut down, and do a similar check to prevent this?
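
A hedged sketch of one possible shape of that suggestion, not the actual implementation; the class, the shutdown flag, and the setShutdown/isShuttingDown names are assumptions for illustration. The ApplicationMaster would flip the flag when it is told to shut down, and the allocator would consult it instead of SparkContext.getActive:

class YarnAllocatorShutdownSketch {
  // Set from the ApplicationMaster's shutdown path so the allocator knows that
  // completed containers are expected and should not be counted as failures.
  @volatile private var shutdown = false

  def setShutdown(value: Boolean): Unit = { shutdown = value }

  // Consulted when deciding whether a completed container reflects an
  // application/system fault or just an expected shutdown.
  def isShuttingDown: Boolean = shutdown
}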

Member Author

Thanks for your suggestion; let me try.

(false, s"Container marked as failed: $containerId$onHostStr" +
s". Exit status: ${completedContainer.getExitStatus}" +
s". Diagnostics: ${completedContainer.getDiagnostics}.")
YarnAllocatorSuite.scala
@@ -37,7 +37,7 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.config._
@@ -63,7 +63,7 @@ class MockResolver extends SparkRackResolver(SparkHadoopUtil.get.conf) {

class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
val conf = new YarnConfiguration()
val sparkConf = new SparkConf()
val sparkConf = new SparkConf().setMaster("local")
sparkConf.set(DRIVER_HOST_ADDRESS, "localhost")
sparkConf.set(DRIVER_PORT, 4040)
sparkConf.set(SPARK_JARS, Seq("notarealjar.jar"))
@@ -547,30 +547,37 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}

test("lost executor removed from backend") {
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumContainersPendingAllocate should be (4)
val sc = SparkContext.getOrCreate(
sparkConf.clone.setAppName("lost executor removed from backend"))
try {
val (handler, _) = createAllocator(4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be(0)
handler.getNumContainersPendingAllocate should be(4)

val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))
val container1 = createContainer("host1")
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2))

val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)
val resourceProfileToTotalExecs = mutable.HashMap(defaultRP -> 2)
val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(defaultRPId -> 0)
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap, Map(), Set.empty)

val statuses = Seq(container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
val statuses = Seq(container1, container2).map { c =>
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
}
handler.updateResourceRequests()
handler.processCompletedContainers(statuses)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be(0)
handler.getNumContainersPendingAllocate should be(2)
handler.getNumExecutorsFailed should be(2)
handler.getNumUnexpectedContainerRelease should be(2)
} finally {
sc.stop()
SparkContext.clearActiveContext()
}
handler.updateResourceRequests()
handler.processCompletedContainers(statuses)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumContainersPendingAllocate should be (2)
handler.getNumExecutorsFailed should be (2)
handler.getNumUnexpectedContainerRelease should be (2)
}

test("excluded nodes reflected in amClient requests") {
@@ -602,96 +609,122 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter

test("window based failure executor counting") {
sparkConf.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100 * 1000L)
val (handler, _) = createAllocator(4)
val sc = SparkContext.getOrCreate(
sparkConf.clone.setAppName("window based failure executor counting"))
try {
val (handler, _) = createAllocator(4)

handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumContainersPendingAllocate should be (4)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be(0)
handler.getNumContainersPendingAllocate should be(4)

val containers = Seq(
createContainer("host1"),
createContainer("host2"),
createContainer("host3"),
createContainer("host4")
)
handler.handleAllocatedContainers(containers)

val failedStatuses = containers.map { c =>
ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
}

val containers = Seq(
createContainer("host1"),
createContainer("host2"),
createContainer("host3"),
createContainer("host4")
)
handler.handleAllocatedContainers(containers)
handler.getNumExecutorsFailed should be(0)

val failedStatuses = containers.map { c =>
ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
}
clock.advance(100 * 1000L)
handler.processCompletedContainers(failedStatuses.slice(0, 1))
handler.getNumExecutorsFailed should be(1)

handler.getNumExecutorsFailed should be (0)
clock.advance(101 * 1000L)
handler.getNumExecutorsFailed should be(0)

clock.advance(100 * 1000L)
handler.processCompletedContainers(failedStatuses.slice(0, 1))
handler.getNumExecutorsFailed should be (1)
handler.processCompletedContainers(failedStatuses.slice(1, 3))
handler.getNumExecutorsFailed should be(2)

clock.advance(101 * 1000L)
handler.getNumExecutorsFailed should be (0)
clock.advance(50 * 1000L)
handler.processCompletedContainers(failedStatuses.slice(3, 4))
handler.getNumExecutorsFailed should be(3)

handler.processCompletedContainers(failedStatuses.slice(1, 3))
handler.getNumExecutorsFailed should be (2)
clock.advance(51 * 1000L)
handler.getNumExecutorsFailed should be(1)

clock.advance(50 * 1000L)
handler.processCompletedContainers(failedStatuses.slice(3, 4))
handler.getNumExecutorsFailed should be (3)
clock.advance(50 * 1000L)
handler.getNumExecutorsFailed should be(0)
} finally {
sc.stop()
SparkContext.clearActiveContext()
}
}

clock.advance(51 * 1000L)
handler.getNumExecutorsFailed should be (1)
test("SPARK-26269 SPARK-39601: YarnAllocator excludeOnFailure behaviour") {
val sc = SparkContext.getOrCreate(
sparkConf.clone.setAppName("YarnAllocator excludeOnFailure behaviour"))
try {
val rmClientSpy = spy(rmClient)
val maxExecutors = 13

val (handler, _) = createAllocator(
maxExecutors,
rmClientSpy,
Map(
YARN_EXECUTOR_LAUNCH_EXCLUDE_ON_FAILURE_ENABLED.key -> "true",
MAX_FAILED_EXEC_PER_NODE.key -> "0"))
handler.updateResourceRequests()

clock.advance(50 * 1000L)
handler.getNumExecutorsFailed should be (0)
}
val hosts = (0 until maxExecutors).map(i => s"host$i")
val ids = 0 to maxExecutors
val containers = createContainers(hosts, ids)

val nonExcludedStatuses = Seq(
ContainerExitStatus.SUCCESS,
ContainerExitStatus.PREEMPTED,
ContainerExitStatus.KILLED_EXCEEDED_VMEM,
ContainerExitStatus.KILLED_EXCEEDED_PMEM,
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
ContainerExitStatus.KILLED_BY_APPMASTER,
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
ContainerExitStatus.ABORTED,
ContainerExitStatus.DISKS_FAILED)

val nonExcludedContainerStatuses = nonExcludedStatuses.zipWithIndex.map {
case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
}

test("SPARK-26269: YarnAllocator should have same excludeOnFailure behaviour with YARN") {
val rmClientSpy = spy(rmClient)
val maxExecutors = 11

val (handler, _) = createAllocator(
maxExecutors,
rmClientSpy,
Map(
YARN_EXECUTOR_LAUNCH_EXCLUDE_ON_FAILURE_ENABLED.key -> "true",
MAX_FAILED_EXEC_PER_NODE.key -> "0"))
handler.updateResourceRequests()
val EXCLUDED_EXIT_CODE = 1
val excludedStatuses = Seq(ContainerExitStatus.INVALID, EXCLUDED_EXIT_CODE)

val hosts = (0 until maxExecutors).map(i => s"host$i")
val ids = 0 to maxExecutors
val containers = createContainers(hosts, ids)

val nonExcludedStatuses = Seq(
ContainerExitStatus.SUCCESS,
ContainerExitStatus.PREEMPTED,
ContainerExitStatus.KILLED_EXCEEDED_VMEM,
ContainerExitStatus.KILLED_EXCEEDED_PMEM,
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
ContainerExitStatus.KILLED_BY_APPMASTER,
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
ContainerExitStatus.ABORTED,
ContainerExitStatus.DISKS_FAILED)

val nonExcludedContainerStatuses = nonExcludedStatuses.zipWithIndex.map {
case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
}
val preStopExcludedContainerStatuses = excludedStatuses.zip(9 until 12).map {
case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
}

val EXCLUDED_EXIT_CODE = 1
val excludedStatuses = Seq(ContainerExitStatus.INVALID, EXCLUDED_EXIT_CODE)
val afterStopExcludedContainerStatuses = excludedStatuses.zip(12 until maxExecutors).map {
case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
}

val excludedContainerStatuses = excludedStatuses.zip(9 until maxExecutors).map {
case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
handler.handleAllocatedContainers(containers.slice(0, 9))
handler.processCompletedContainers(nonExcludedContainerStatuses)
verify(rmClientSpy, never())
.updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())

handler.handleAllocatedContainers(containers.slice(9, 11))
handler.processCompletedContainers(preStopExcludedContainerStatuses)
verify(rmClientSpy)
.updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
verify(rmClientSpy)
.updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())

sc.stop()
handler.handleAllocatedContainers(containers.slice(11, 13))
handler.processCompletedContainers(afterStopExcludedContainerStatuses)
verify(rmClientSpy, never())
.updateBlacklist(hosts.slice(11, 12).asJava, Collections.emptyList())
verify(rmClientSpy, never())
.updateBlacklist(hosts.slice(12, 13).asJava, Collections.emptyList())
} finally {
sc.stop()
SparkContext.clearActiveContext()
}

handler.handleAllocatedContainers(containers.slice(0, 9))
handler.processCompletedContainers(nonExcludedContainerStatuses)
verify(rmClientSpy, never())
.updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())

handler.handleAllocatedContainers(containers.slice(9, 11))
handler.processCompletedContainers(excludedContainerStatuses)
verify(rmClientSpy)
.updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
verify(rmClientSpy)
.updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
}

test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " +