Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1471,8 +1471,10 @@ class DAGScheduler(
}

if (ableToCancelStages) {
job.listener.jobFailed(error)
// SPARK-15783 important to cleanup state first, just for tests where we have some asserts
// against the state. Otherwise we have a *little* bit of flakiness in the tests.
cleanupStateForJobAndIndependentStages(job)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark._
class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{

val badHost = "host-0"
val duration = Duration(10, SECONDS)

/**
* This backend just always fails if the task is executed on a bad host, but otherwise succeeds
Expand All @@ -39,13 +40,21 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
}
}

override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = {
var lastThreads = Set[Long]()
(0 until 500).foreach { idx =>
super.test(s"$name: $idx", testTags: _*) {
body
}
}
}

// Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
// according to locality preferences, and so the job fails
ignore("If preferred node is bad, without blacklist job will fail") {
testScheduler("If preferred node is bad, without blacklist job will fail") {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
Expand All @@ -54,7 +63,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
// even with the blacklist turned on, if maxTaskFailures is not more than the number
// of executors on the bad node, then locality preferences will lead to us cycling through
// the executors on the bad node, and still failing the job
ignoreScheduler(
testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
// just set this to something much longer than the test duration
Expand All @@ -64,15 +73,14 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(3, SECONDS)
Await.ready(jobFuture, duration)
}
assertDataStructuresEmpty(noFailure = false)
}

// Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
// schedule on a good node and succeed the job
ignoreScheduler(
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
// just set this to something much longer than the test duration
Expand All @@ -86,7 +94,6 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
results.clear()
securityMgr = new SecurityManager(conf)
broadcastManager = new BroadcastManager(true, conf, securityMgr)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
override def sendTracker(message: Any): Unit = {
// no-op, just so we can stop this to avoid leaking threads
}
}
scheduler = new DAGScheduler(
sc,
taskScheduler,
Expand All @@ -228,6 +232,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def afterEach(): Unit = {
try {
scheduler.stop()
dagEventProcessLoopTester.stop()
mapOutputTracker.stop()
broadcastManager.stop()
} finally {
super.afterEach()
}
Expand Down Expand Up @@ -319,6 +326,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
runEvent(JobCancelled(jobId))
}

override def test(name: String, testTags: org.scalatest.Tag*)(body: => Unit): Unit = {
var lastThreads = Set[Long]()
(0 until 50).foreach { idx =>
super.test(s"$name: $idx", testTags: _*) {
body
}
}
}

test("[SPARK-3353] parent stage should have lower stage id") {
sparkListener.stageByOrderOfExecution.clear()
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
Expand All @@ -31,7 +32,7 @@ import org.apache.spark._
import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{CallSite, Utils}
import org.apache.spark.util.{CallSite, ThreadUtils, Utils}

/**
* Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
Expand Down Expand Up @@ -89,11 +90,6 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
}

// still a few races to work out in the blacklist tests, so ignore some tests
def ignoreScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
ignore(name)(testBody)
}

/**
* A map from partition -> results for all tasks of a job when you call this test framework's
* [[submit]] method. Two important considerations:
Expand Down Expand Up @@ -244,6 +240,17 @@ private[spark] abstract class MockBackend(
conf: SparkConf,
val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging {

// Periodically revive offers to allow delay scheduling to work
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")

reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
reviveOffers()
}
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)

/**
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
Expand Down Expand Up @@ -309,7 +316,9 @@ private[spark] abstract class MockBackend(

override def start(): Unit = {}

override def stop(): Unit = {}
override def stop(): Unit = {
reviveThread.shutdown()
}

val env = SparkEnv.get

Expand All @@ -333,8 +342,9 @@ private[spark] abstract class MockBackend(
}

/**
* This is called by the scheduler whenever it has tasks it would like to schedule. It gets
* called in the scheduling thread, not the backend thread.
* This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks
* completes (which will be in a result-getter thread), and by the reviveOffers thread for delay
* scheduling.
*/
override def reviveOffers(): Unit = {
val offers: Seq[WorkerOffer] = generateOffers()
Expand Down