diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2e3e1cc9877d7..20a06c4199dec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer -import java.util.{Locale, Timer, TimerTask} +import java.util.{Timer, TimerTask} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong @@ -58,6 +58,11 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Ut * scheduling * * task-result-getter threads * + * CAUTION: Any non-fatal exception thrown within Spark RPC framework can be swallowed. + * Thus, throwing an exception in methods like resourceOffers, statusUpdate won't fail + * the application, but could lead to undefined behavior. Instead, we should use a method like + * TaskSetManager.abort() to abort a stage and then fail the application (SPARK-31485). + * * Delay Scheduling: * Delay scheduling is an optimization that sacrifices job fairness for data locality in order to * improve cluster and workload throughput. One useful definition of "delay" is how much time @@ -401,9 +406,7 @@ private[spark] class TaskSchedulerImpl( // addresses are the same as that we allocated in taskResourceAssignments since it's // synchronized. We don't remove the exact addresses allocated because the current // approach produces the identical result with less time complexity. - availableResources(i).getOrElse(rName, - throw new SparkException(s"Try to acquire resource $rName that doesn't exist.")) - .remove(0, rInfo.addresses.size) + availableResources(i)(rName).remove(0, rInfo.addresses.size) } // Only update hosts for a barrier task. 
if (taskSet.isBarrier) { @@ -468,8 +471,9 @@ private[spark] class TaskSchedulerImpl( resourceProfileIds: Array[Int], availableCpus: Array[Int], availableResources: Array[Map[String, Buffer[String]]], - rpId: Int): Int = { - val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId) + taskSet: TaskSetManager): Int = { + val resourceProfile = sc.resourceProfileManager.resourceProfileFromId( + taskSet.taskSet.resourceProfileId) val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) => (id == resourceProfile.id) } @@ -484,9 +488,12 @@ private[spark] class TaskSchedulerImpl( numTasksPerExecCores } else { val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount) - .getOrElse(throw new SparkException("limitingResource returns from ResourceProfile" + - s" $resourceProfile doesn't actually contain that task resource!") - ) + .getOrElse { + val errorMsg = "limitingResource returns from ResourceProfile " + + s"$resourceProfile doesn't actually contain that task resource!" + taskSet.abort(errorMsg) + throw new SparkException(errorMsg) + } // available addresses already takes into account if there are fractional // task resource requests val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0) @@ -582,7 +589,7 @@ private[spark] class TaskSchedulerImpl( // value is -1 val numBarrierSlotsAvailable = if (taskSet.isBarrier) { val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources, - taskSet.taskSet.resourceProfileId) + taskSet) slots } else { -1 @@ -675,11 +682,18 @@ private[spark] class TaskSchedulerImpl( // Check whether the barrier tasks are partially launched. // TODO SPARK-24818 handle the assert failure case (that can happen when some locality // requirements are not fulfilled, and we should revert the launched tasks). 
- require(addressesWithDescs.size == taskSet.numTasks, - s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + - s"because only ${addressesWithDescs.size} out of a total number of " + - s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " + - "been blacklisted or cannot fulfill task locality requirements.") + if (addressesWithDescs.size != taskSet.numTasks) { + val errorMsg = + s"Fail resource offers for barrier stage ${taskSet.stageId} because only " + + s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" + + s" tasks got resource offers. This happens because barrier execution currently " + + s"does not work gracefully with delay scheduling. We highly recommend you to " + + s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " + + s"you see this error frequently." + logWarning(errorMsg) + taskSet.abort(errorMsg) + throw new SparkException(errorMsg) + } // materialize the barrier coordinator. maybeInitBarrierCoordinator() @@ -741,8 +755,12 @@ private[spark] class TaskSchedulerImpl( if (state == TaskState.LOST) { // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, // where each executor corresponds to a single task, so mark the executor as failed. 
- val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( - "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) + val execId = taskIdToExecutorId.getOrElse(tid, { + val errorMsg = + "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)" + taskSet.abort(errorMsg) + throw new SparkException(errorMsg) + }) if (executorIdToRunningTaskIds.contains(execId)) { reason = Some( SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala index bcf1fe2c2aa11..2242d2807652a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala @@ -26,11 +26,11 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { - def initLocalClusterSparkContext(): Unit = { + def initLocalClusterSparkContext(numWorker: Int = 4): Unit = { val conf = new SparkConf() // Init local cluster here so each barrier task runs in a separated process, thus `barrier()` // call is actually useful. 
- .setMaster("local-cluster[4, 1, 1024]") + .setMaster(s"local-cluster[$numWorker, 1, 1024]") .setAppName("test-cluster") .set(TEST_NO_STAGE_RETRY, true) sc = new SparkContext(conf) @@ -276,4 +276,20 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { initLocalClusterSparkContext() testBarrierTaskKilled(interruptOnKill = true) } + + test("SPARK-31485: barrier stage should fail if only partial tasks are launched") { + initLocalClusterSparkContext(2) + val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2) + val dep = new OneToOneDependency[Int](rdd0) + // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for + // scheduling. So, one of the tasks won't be scheduled in one round of resource offers. + val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0"))) + val errorMsg = intercept[SparkException] { + rdd.barrier().mapPartitions { iter => + BarrierTaskContext.get().barrier() + iter + }.collect() + }.getMessage + assert(errorMsg.contains("Fail resource offers for barrier stage")) + } }