diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8052499ab752..4bb553f14f18 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1463,12 +1463,13 @@ object SparkContext extends Logging {
     // Regular expression for connection to Simr cluster
     val SIMR_REGEX = """simr://(.*)""".r
 
-    // When running locally, don't try to re-execute tasks on failure.
+    // When running locally, by default don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
 
     master match {
       case "local" =>
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val maxTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, 1)
         scheduler.initialize(backend)
         scheduler
@@ -1477,7 +1478,8 @@ object SparkContext extends Logging {
         def localCpuCount = Runtime.getRuntime.availableProcessors()
         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val maxTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
         scheduler
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 67e3be21c3c9..83a1ea35813c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -59,6 +59,22 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
+  test("local-conf-failures") {
+    val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
+    System.setProperty("spark.local.maxFailures", "10")
+    val sched = createTaskScheduler("local")
+    assert(sched.maxTaskFailures === 10)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 1)
+      case _ => fail()
+    }
+
+    Option(defaultLocalMaxFailures) match {
+      case Some(v) => System.setProperty("spark.local.maxFailures", v)
+      case _ => System.clearProperty("spark.local.maxFailures")
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)
@@ -68,6 +84,22 @@ class SparkContextSchedulerCreationSuite
     }
   }
 
+  test("local-n-conf-failures") {
+    val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
+    System.setProperty("spark.local.maxFailures", "10")
+    val sched = createTaskScheduler("local[5]")
+    assert(sched.maxTaskFailures === 10)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 5)
+      case _ => fail()
+    }
+
+    Option(defaultLocalMaxFailures) match {
+      case Some(v) => System.setProperty("spark.local.maxFailures", v)
+      case _ => System.clearProperty("spark.local.maxFailures")
+    }
+  }
+
   test("local-n-failures") {
     val sched = createTaskScheduler("local[4, 2]")
     assert(sched.maxTaskFailures === 2)
diff --git a/docs/configuration.md b/docs/configuration.md
index a70007c16544..88486aef18fb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -599,6 +599,15 @@ Apart from these, the following properties are also available, and may be useful
     Number of individual task failures before giving up on the job.
     Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+    Does not apply to running Spark locally.
   </td>
 </tr>
+<tr>
+  <td>spark.local.maxFailures</td>
+  <td>1</td>
+  <td>
+    Number of individual task failures before giving up on the job, when running Spark locally.
+    Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+  </td>
+</tr>
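
Usage note (not part of the patch): a minimal sketch of how an application would opt in to local-mode task retries once this change lands. The app name and the retry count of 3 are arbitrary placeholders, not values from the patch.

    import org.apache.spark.{SparkConf, SparkContext}

    // With spark.local.maxFailures = 3, a task may fail twice and still be
    // retried before the job is aborted; the default of 1 preserves the
    // existing fail-fast behavior in local mode.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("LocalRetriesExample") // hypothetical app name
      .set("spark.local.maxFailures", "3")
    val sc = new SparkContext(conf)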