From 8e667f532fa4509386ff6a6173b75a8e24cab40a Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 23 Sep 2016 15:17:46 +0800 Subject: [PATCH 1/9] test case --- .../spark/scheduler/DAGSchedulerSuite.scala | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6787b302614e..99434a83d46b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -18,9 +18,12 @@ package org.apache.spark.scheduler import java.util.Properties +import java.util.concurrent.Executors import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.DurationConversions import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -32,8 +35,9 @@ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.MetadataFetchFailedException +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} +import org.apache.spark.util._ class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) extends DAGSchedulerEventProcessLoop(dagScheduler) { @@ -2105,6 +2109,47 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } + test("The failed stage never resubmitted due to abort stage in another thread") { + implicit val executorContext = ExecutionContext + 
.fromExecutorService(Executors.newFixedThreadPool(5)) + val f1 = Future { + try { + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd1.map { x => + if (x._1 == 1) { + throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + } + x._1 + }.count() + } catch { + case e: Throwable => + logInfo("expected abort stage: " + e.getMessage) + } + } + Thread.sleep(10000) + val f2 = Future { + try { + val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd2.map { x => + if (x._1 == 1) { + throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + } + x._1 + }.count() + } catch { + case e: Throwable => + println("expected abort stage2: " + e.getMessage) + } + } + + val duration = 60.seconds + ThreadUtils.awaitResult(f2, duration) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. 
From 2bfa05b172c68d6aa52e66359e83e2e7c6033662 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 23 Sep 2016 15:30:15 +0800 Subject: [PATCH 2/9] The failed stage never resubmitted --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dd47c1dbbec0..026140d33263 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1235,6 +1235,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) + var abortStage = false if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + @@ -1261,6 +1262,7 @@ class DAGScheduler( s"has failed the maximum allowable number of " + s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + s"Most recent failure reason: ${failureMessage}", None) + abortStage = true } else if (failedStages.isEmpty) { // Don't schedule an event to resubmit failed stages if failed isn't empty, because // in that case the event will already have been scheduled. 
@@ -1271,8 +1273,10 @@ class DAGScheduler( override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) } - failedStages += failedStage - failedStages += mapStage + if (!abortStage) { + failedStages += failedStage + failedStages += mapStage + } // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { mapStage.removeOutputLoc(mapId, bmAddress) From d02cf93586d40035f6bb1f5b635917fe41bf6e99 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 23 Sep 2016 16:02:57 +0800 Subject: [PATCH 3/9] test case --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 99434a83d46b..9cf46db1094e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2112,6 +2112,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou test("The failed stage never resubmitted due to abort stage in another thread") { implicit val executorContext = ExecutionContext .fromExecutorService(Executors.newFixedThreadPool(5)) + val duration = 60.seconds + val f1 = Future { try { val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() @@ -2125,10 +2127,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou }.count() } catch { case e: Throwable => - logInfo("expected abort stage: " + e.getMessage) + logInfo("expected abort stage1: " + e.getMessage) } } - Thread.sleep(10000) + ThreadUtils.awaitResult(f1, duration) val f2 = Future { try { val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() @@ -2142,11 +2144,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou 
}.count() } catch { case e: Throwable => - println("expected abort stage2: " + e.getMessage) + logInfo("expected abort stage2: " + e.getMessage) } } - - val duration = 60.seconds ThreadUtils.awaitResult(f2, duration) } From 7056cd6efc301e8730546c6dd8b24c7f3b56c55a Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 23 Sep 2016 17:44:34 +0800 Subject: [PATCH 4/9] fix style --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++--- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 026140d33263..9cb16a75b166 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1235,7 +1235,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) - var abortStage = false + var abortedStage = false if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + @@ -1262,7 +1262,7 @@ class DAGScheduler( s"has failed the maximum allowable number of " + s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + s"Most recent failure reason: ${failureMessage}", None) - abortStage = true + abortedStage = true } else if (failedStages.isEmpty) { // Don't schedule an event to resubmit failed stages if failed isn't empty, because // in that case the event will already have been scheduled. 
@@ -1273,7 +1273,7 @@ class DAGScheduler( override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) } - if (!abortStage) { + if (!abortedStage) { failedStages += failedStage failedStages += mapStage } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9cf46db1094e..811822bd0ebe 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -34,8 +34,8 @@ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util._ @@ -2121,7 +2121,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle rdd1.map { x => if (x._1 == 1) { - throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") } x._1 }.count() @@ -2138,7 +2139,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle rdd2.map { x => if (x._1 == 1) { - throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") } x._1 }.count() From 
1f7bd88950881cea252316538084dd88883d8d1a Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 23 Sep 2016 19:33:28 +0800 Subject: [PATCH 5/9] test case improvement --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 811822bd0ebe..d083a663612f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2149,7 +2149,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou logInfo("expected abort stage2: " + e.getMessage) } } - ThreadUtils.awaitResult(f2, duration) + try { + ThreadUtils.awaitResult(f2, duration) + } catch { + case e: Throwable => fail("The failed stage never resubmitted") + } + executorContext.shutdown() } /** From d92adfcd2c51090dd472a995b5853cd6396aeee7 Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 24 Sep 2016 00:03:33 +0800 Subject: [PATCH 6/9] comment fix --- .../spark/scheduler/DAGSchedulerSuite.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d083a663612f..62be4c03842d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2119,12 +2119,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() val shuffleHandle = rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd1.map { x => - if (x._1 == 1) { - throw new FetchFailedException( - BlockManagerId("1", "1", 
1), shuffleHandle.shuffleId, 0, 0, "test") - } - x._1 + rdd1.map { + case (x, _) if (x == 1) => + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + case (x, _) => x }.count() } catch { case e: Throwable => @@ -2137,12 +2136,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() val shuffleHandle = rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd2.map { x => - if (x._1 == 1) { + rdd2.map { + case (x, _) if (x == 1) => throw new FetchFailedException( BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") - } - x._1 + case (x, _) => x }.count() } catch { case e: Throwable => From 1127ca1538e9a9ded9e91ead65af8c710e99003d Mon Sep 17 00:00:00 2001 From: wangfei Date: Sat, 24 Sep 2016 07:03:22 +0800 Subject: [PATCH 7/9] address comments of zsxwing --- .../apache/spark/scheduler/DAGScheduler.scala | 3 +- .../spark/scheduler/DAGSchedulerSuite.scala | 35 ++++++------------- 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9cb16a75b166..d85c16ee3dc6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1235,13 +1235,13 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) - var abortedStage = false if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ID ${failedStage.latestInfo.attemptId}) running") } else { 
+ var abortedStage = false // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is // possible the fetch failure has already been handled by the scheduler. @@ -1257,6 +1257,7 @@ class DAGScheduler( if (disallowStageRetryForTest) { abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) + abortedStage = true } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { abortStage(failedStage, s"$failedStage (${failedStage.name}) " + s"has failed the maximum allowable number of " + diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 62be4c03842d..0e19f29ea278 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -18,12 +18,9 @@ package org.apache.spark.scheduler import java.util.Properties -import java.util.concurrent.Executors import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} -import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.DurationConversions import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -37,7 +34,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util._ +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) extends DAGSchedulerEventProcessLoop(dagScheduler) { @@ -2110,12 +2107,8 @@ class DAGSchedulerSuite extends 
SparkFunSuite with LocalSparkContext with Timeou } test("The failed stage never resubmitted due to abort stage in another thread") { - implicit val executorContext = ExecutionContext - .fromExecutorService(Executors.newFixedThreadPool(5)) - val duration = 60.seconds - - val f1 = Future { - try { + failAfter(60.seconds) { + val e = intercept[SparkException] { val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() val shuffleHandle = rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle @@ -2125,14 +2118,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") case (x, _) => x }.count() - } catch { - case e: Throwable => - logInfo("expected abort stage1: " + e.getMessage) } + assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } - ThreadUtils.awaitResult(f1, duration) - val f2 = Future { - try { + + // The following job that fails due to fetching failure will hang without + // the fix for SPARK-17644 + failAfter(60.seconds) { + val e = intercept[SparkException] { val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() val shuffleHandle = rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle @@ -2142,17 +2135,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") case (x, _) => x }.count() - } catch { - case e: Throwable => - logInfo("expected abort stage2: " + e.getMessage) } + assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } - try { - ThreadUtils.awaitResult(f2, duration) - } catch { - case e: Throwable => fail("The failed stage never resubmitted") - } - executorContext.shutdown() } /** From f91d86f92a7070b0e8ed63773ecf1020975bc2fb Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 28 Sep 2016 08:54:06 +0800 Subject: [PATCH 8/9] 
improve test --- .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++++++++++++------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0e19f29ea278..23ed4e67c643 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} @@ -2106,18 +2107,37 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } - test("The failed stage never resubmitted due to abort stage in another thread") { + test("After one stage is aborted for too many failed attempts, subsequent stages" + + "still behave correctly on fetch failures") { + def fetchFailJob: Unit = { + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd1.map { + case (x, _) if (x == 1) => + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + case (x, _) => x + }.count() + } + + def successJob: Unit = { + object FailThisAttempt { + val _fail = new AtomicBoolean(true) + } + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() + val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle + rdd1.map { + case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) => + throw new FetchFailedException( + BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") + } + } + 
failAfter(60.seconds) { val e = intercept[SparkException] { - val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() - val shuffleHandle = - rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd1.map { - case (x, _) if (x == 1) => - throw new FetchFailedException( - BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") - case (x, _) => x - }.count() + fetchFailJob } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } @@ -2126,18 +2146,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // the fix for SPARK-17644 failAfter(60.seconds) { val e = intercept[SparkException] { - val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() - val shuffleHandle = - rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle - rdd2.map { - case (x, _) if (x == 1) => - throw new FetchFailedException( - BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test") - case (x, _) => x - }.count() + fetchFailJob } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } + + failAfter(60.seconds) { + try { + successJob + } catch { + case e: Throwable => fail("this job should success") + } + } } /** From 09077cb855d961c27a63c26468f03aadaf75316a Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 28 Sep 2016 16:22:50 +0800 Subject: [PATCH 9/9] comment fix --- .../apache/spark/scheduler/DAGScheduler.scala | 25 +++++++--------- .../spark/scheduler/DAGSchedulerSuite.scala | 29 ++++++++++--------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d85c16ee3dc6..fb0e53b2642d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1241,7 +1241,6 @@ class 
DAGScheduler( s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ID ${failedStage.latestInfo.attemptId}) running") } else { - var abortedStage = false // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is // possible the fetch failure has already been handled by the scheduler. @@ -1257,24 +1256,22 @@ class DAGScheduler( if (disallowStageRetryForTest) { abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) - abortedStage = true } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { abortStage(failedStage, s"$failedStage (${failedStage.name}) " + s"has failed the maximum allowable number of " + s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + s"Most recent failure reason: ${failureMessage}", None) - abortedStage = true - } else if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) - } - if (!abortedStage) { + } else { + if (failedStages.isEmpty) { + // Don't schedule an event to resubmit failed stages if failed isn't empty, because + // in that case the event will already have been scheduled. 
+ // TODO: Cancel running tasks in the stage + logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + + s"$failedStage (${failedStage.name}) due to fetch failure") + messageScheduler.schedule(new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + } failedStages += failedStage failedStages += mapStage } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 23ed4e67c643..bec95d13d193 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -32,8 +32,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.shuffle.MetadataFetchFailedException +import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} @@ -2107,9 +2106,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } - test("After one stage is aborted for too many failed attempts, subsequent stages" + + test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" + "still behave correctly on fetch failures") { - def fetchFailJob: Unit = { + // Runs a job that always encounters a fetch failure, so should eventually be aborted + def runJobWithPersistentFetchFailure: Unit = { val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() val shuffleHandle 
= rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle @@ -2121,7 +2121,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou }.count() } - def successJob: Unit = { + // Runs a job that encounters a single fetch failure but succeeds on the second attempt + def runJobWithTemporaryFetchFailure: Unit = { object FailThisAttempt { val _fail = new AtomicBoolean(true) } @@ -2135,27 +2136,27 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } - failAfter(60.seconds) { + failAfter(10.seconds) { val e = intercept[SparkException] { - fetchFailJob + runJobWithPersistentFetchFailure } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } - // The following job that fails due to fetching failure will hang without - // the fix for SPARK-17644 - failAfter(60.seconds) { + // Run a second job that will fail due to a fetch failure. + // This job will hang without the fix for SPARK-17644. + failAfter(10.seconds) { val e = intercept[SparkException] { - fetchFailJob + runJobWithPersistentFetchFailure } assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException")) } - failAfter(60.seconds) { + failAfter(10.seconds) { try { - successJob + runJobWithTemporaryFetchFailure } catch { - case e: Throwable => fail("this job should success") + case e: Throwable => fail("A job with one fetch failure should eventually succeed") } } }