@@ -1411,7 +1411,7 @@ class DAGScheduler(
stage.clearFailures()
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
}

outputCommitCoordinator.stageEnd(stage.id)
@@ -59,7 +59,7 @@ private[spark] class DirectTaskResult[T](

val numUpdates = in.readInt
if (numUpdates == 0) {
-accumUpdates = null
+accumUpdates = Seq()
Contributor: I'm a little surprised that we were getting away with that, given the number of places that are using DirectTaskResult#accumUpdates without checking for null.

Contributor (author): Yeah, likewise. I guess in all real Spark jobs, the number of accum updates is > 0, which is why this didn't particularly matter except for these mocks. OTOH, if this has any real danger, we should just fix this separately (and be sure to include it in 2.0).

} else {
val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
for (i <- 0 until numUpdates) {
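Regarding the exchange above about accumUpdates: callers that iterate or fold over DirectTaskResult#accumUpdates would throw a NullPointerException when the field was null, but are unaffected by an empty Seq. A minimal, self-contained sketch of that failure mode (FakeTaskResult and the object name are invented for illustration; this is not Spark's API):

```scala
// Hypothetical stand-in for a task result; not Spark's DirectTaskResult.
final case class FakeTaskResult(accumUpdates: Seq[Long])

object AccumUpdatesNullSketch {
  // Folding over accumUpdates throws a NullPointerException when the field is
  // null, but simply returns 0 for an empty Seq.
  def totalUpdates(result: FakeTaskResult): Long =
    result.accumUpdates.foldLeft(0L)(_ + _)

  def main(args: Array[String]): Unit = {
    println(totalUpdates(FakeTaskResult(Seq())))  // 0 -- safe with an empty Seq
    // totalUpdates(FakeTaskResult(null))         // would throw NullPointerException
  }
}
```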
@@ -1 +1,2 @@
-org.apache.spark.scheduler.DummyExternalClusterManager
+org.apache.spark.scheduler.DummyExternalClusterManager
+org.apache.spark.scheduler.MockExternalClusterManager
Contributor: Did you ever look at combining DummyExternalClusterManager and MockExternalClusterManager? They are just two variations on a fake ExternalClusterManager for use in tests. I realize that the focus of the tests for Dummy... and Mock... is different, so the two variations may not be easy or clean to combine, but if we could have just one fake ExternalClusterManager that still had a relatively clean implementation, I think that would be better than maintaining two. OTOH, if combining them gets messy, then just go with what you've already got.

Contributor (author): I did not, mostly because I thought Dummy should be left as is, to keep ExternalClusterManagerSuite cleanly focused. It would be easy to combine them, though. Dummy isn't covering any cases that Mock isn't -- I could remove Dummy, and make ExternalClusterManagerSuite reference Mock, without making any changes to Mock.

@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark._

class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{

val badHost = "host-0"

/**
* This backend just always fails if the task is executed on a bad host, but otherwise succeeds
* all tasks.
*/
def badHostBackend(): Unit = {
val task = backend.beginTask()
val host = backend.executorIdToExecutor(task.executorId).host
if (host == badHost) {
backend.taskFailed(task, new RuntimeException("I'm a bad host!"))
} else {
backend.taskSuccess(task, 42)
}
}

// Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
// according to locality preferences, and so the job fails
testScheduler("If preferred node is bad, without blacklist job will fail") {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}

// even with the blacklist turned on, if maxTaskFailures is not more than the number
// of executors on the bad node, then locality preferences will lead to us cycling through
// the executors on the bad node, and still failing the job
testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
// just set this to something much longer than the test duration
("spark.scheduler.executorTaskBlacklistTime", "10000000")
)
) {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(3, SECONDS)
Await.ready(jobFuture, duration)
}
assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}

// Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
// schedule on a good node and succeed the job
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
// just set this to something much longer than the test duration
("spark.scheduler.executorTaskBlacklistTime", "10000000"),
// this has to be higher than the number of executors on the bad host
("spark.task.maxFailures", "5"),
// just to avoid this test taking too long
("spark.locality.wait", "10ms")
)
) {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty(noFailure = true)
}

}

class MultiExecutorMockBackend(
conf: SparkConf,
taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {

val nHosts = conf.getInt("spark.testing.nHosts", 5)
val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4)
val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2)

override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = {
(0 until nHosts).flatMap { hostIdx =>
val hostName = "host-" + hostIdx
(0 until nExecutorsPerHost).map { subIdx =>
val executorId = (hostIdx * nExecutorsPerHost + subIdx).toString
executorId ->
ExecutorTaskStatus(host = hostName, executorId = executorId, nCoresPerExecutor)
}
}.toMap
}

override def defaultParallelism(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor
}

class MockRDDWithLocalityPrefs(
sc: SparkContext,
numPartitions: Int,
shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
override def getPreferredLocations(split: Partition): Seq[String] = {
Seq(preferredLoc)
}
}
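A back-of-the-envelope check of the premise behind the tests above, assuming the MultiExecutorMockBackend defaults (4 executors per host) and Spark's default spark.task.maxFailures of 4: with the blacklist on, each failed attempt blacklists one executor on the bad host for that task, so 4 allowed attempts can all land on distinct bad executors and the job still fails; raising maxFailures to 5 forces at least one attempt onto a non-blacklisted executor on a good host. A rough sketch of that arithmetic (illustration only, not code used by Spark's blacklist):

```scala
// The predicate mirrors the reasoning in the test comments; it is not part of
// Spark's scheduler or blacklist implementation.
object BlacklistArithmeticSketch {
  def jobCanSucceed(executorsOnBadHost: Int, maxTaskFailures: Int): Boolean =
    maxTaskFailures > executorsOnBadHost

  def main(args: Array[String]): Unit = {
    println(jobCanSucceed(executorsOnBadHost = 4, maxTaskFailures = 4)) // false
    println(jobCanSucceed(executorsOnBadHost = 4, maxTaskFailures = 5)) // true
  }
}
```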
@@ -98,6 +98,8 @@ class DAGSchedulerSuiteDummyException extends Exception

class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts {

+import DAGSchedulerSuite._

val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
@@ -2027,12 +2029,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
}
}

-private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
-MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
-
-private def makeBlockManagerId(host: String): BlockManagerId =
-BlockManagerId("exec-" + host, host, 12345)

private def assertDataStructuresEmpty(): Unit = {
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.failedStages.isEmpty)
@@ -2072,5 +2068,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
}
CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
}
}

+object DAGSchedulerSuite {
+def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
+MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
+
+def makeBlockManagerId(host: String): BlockManagerId =
+BlockManagerId("exec-" + host, host, 12345)
+}
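Moving these helpers into a companion object appears intended to let other suites reuse them via the import DAGSchedulerSuite._ added above, rather than duplicating them as private methods. A hypothetical usage sketch (SomeOtherSchedulerSuite is an invented name and assumes the file sits in Spark's test sources in the same package):

```scala
package org.apache.spark.scheduler

// Invented suite, purely to show that the helpers are now importable.
import org.apache.spark.scheduler.DAGSchedulerSuite.{makeBlockManagerId, makeMapStatus}

class SomeOtherSchedulerSuite {
  val mapStatus = makeMapStatus("hostA", reduces = 3)
  val blockManagerId = makeBlockManagerId("hostA")
}
```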
@@ -27,12 +27,24 @@ class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
val conf = new SparkConf().setMaster("myclusterManager").
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
sc = new SparkContext(conf)
-// check if the scheduler components are created
-assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
-assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
+// check if the scheduler components are created and initialized
+sc.schedulerBackend match {
+case dummy: DummySchedulerBackend => assert(dummy.initialized)
+case other => fail(s"wrong scheduler backend: ${other}")
+}
+sc.taskScheduler match {
+case dummy: DummyTaskScheduler => assert(dummy.initialized)
+case other => fail(s"wrong task scheduler: ${other}")
+}
}
}

+/**
+* Super basic ExternalClusterManager, just to verify ExternalClusterManagers can be configured.
+*
+* Note that if you want a special ClusterManager for tests, you are probably much more interested
+* in [[MockExternalClusterManager]] and the corresponding [[SchedulerIntegrationSuite]]
+*/
private class DummyExternalClusterManager extends ExternalClusterManager {

def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
@@ -44,18 +56,23 @@ private class DummyExternalClusterManager extends ExternalClusterManager {
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()

-def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {}
+def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+scheduler.asInstanceOf[DummyTaskScheduler].initialized = true
+backend.asInstanceOf[DummySchedulerBackend].initialized = true
+}

}

private class DummySchedulerBackend extends SchedulerBackend {
+var initialized = false
def start() {}
def stop() {}
def reviveOffers() {}
def defaultParallelism(): Int = 1
}

private class DummyTaskScheduler extends TaskScheduler {
+var initialized = false
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start(): Unit = {}