From f87811d3916bdfcaccaa87b3c530aa319c272675 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 24 Aug 2015 15:45:05 -0500 Subject: [PATCH 1/8] wip on understanding skipped stages --- .../apache/spark/scheduler/DAGScheduler.scala | 31 +++++----- .../spark/scheduler/SkippedStageSuite.scala | 56 +++++++++++++++++++ 2 files changed, 73 insertions(+), 14 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index daf9b0f95273..5f0de2f30fab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -89,6 +89,7 @@ class DAGScheduler( private val nextStageId = new AtomicInteger(0) private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] + private[scheduler] val shuffleIdToShuffleMapStage = new TimeStampedHashMap[Int, ShuffleMapStage]() private[scheduler] val stageIdToStage = new HashMap[Int, Stage] private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage] private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] @@ -316,21 +317,23 @@ class DAGScheduler( firstJobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd val numTasks = rdd.partitions.length - val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) - if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { - val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) - val locs = MapOutputTracker.deserializeMapStatuses(serLocs) - for (i <- 0 until locs.length) { - stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing + shuffleIdToShuffleMapStage.getOrElseUpdate(shuffleDep.shuffleId, { + val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) + if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { + val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) + val locs = MapOutputTracker.deserializeMapStatuses(serLocs) + for (i <- 0 until locs.length) { + stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing + } + stage.numAvailableOutputs = locs.count(_ != null) + } else { + // Kind of ugly: need to register RDDs with the cache and map output tracker here + // since we can't do it in the RDD constructor because # of partitions is unknown + logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") + mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) } - stage.numAvailableOutputs = locs.count(_ != null) - } else { - // Kind of ugly: need to register RDDs with the cache and map output tracker here - // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") - mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) - } - stage + stage + }) } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala new file mode 100644 index 000000000000..81a5c25e3414 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler + +import org.apache.spark.{SparkContext, LocalSparkContext, SparkFunSuite} + +class SkippedStageSuite extends SparkFunSuite with LocalSparkContext { + + class PrintStageSubmission extends SparkListener { + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + println("submitting stage " + stageSubmitted.stageInfo.stageId) + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + println("completed stage " + stageCompleted.stageInfo.stageId) + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + println("submitting job w/ stages: " + jobStart.stageIds) + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + println("completed job") + } + + + } + + test("skipped stages") { + sc = new SparkContext("local", "test") + sc.addSparkListener(new PrintStageSubmission) + val partitioner = new org.apache.spark.HashPartitioner(10) + val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x}.partitionBy(partitioner) + (0 until 5).foreach { idx => + val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x % 10) -> x}.partitionBy(partitioner) + val joined = otherData.join(d3) + println("debug string: " + joined.toDebugString) + println(idx + " ---> " + joined.count()) + } + } + +} From 1fdee5238bb017406d55a5bb5ad9203d1b3d7985 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 24 Aug 2015 16:34:41 -0500 Subject: [PATCH 2/8] more wip --- .../org/apache/spark/scheduler/DAGScheduler.scala | 7 +++---- .../apache/spark/scheduler/SkippedStageSuite.scala | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5f0de2f30fab..62c61a3fffa0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -89,7 +89,6 @@ class DAGScheduler( private val nextStageId = new AtomicInteger(0) private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] - private[scheduler] val shuffleIdToShuffleMapStage = new TimeStampedHashMap[Int, ShuffleMapStage]() private[scheduler] val stageIdToStage = new HashMap[Int, Stage] private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage] private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] @@ -317,7 +316,7 @@ class DAGScheduler( firstJobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd val numTasks = rdd.partitions.length - shuffleIdToShuffleMapStage.getOrElseUpdate(shuffleDep.shuffleId, { +// shuffleIdToShuffleMapStage.getOrElseUpdate(shuffleDep.shuffleId, { val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) if 
(mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) @@ -333,7 +332,7 @@ class DAGScheduler( mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) } stage - }) +// }) } /** @@ -373,7 +372,7 @@ class DAGScheduler( val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) while (parentsWithNoMapStage.nonEmpty) { val currentShufDep = parentsWithNoMapStage.pop() - val stage = newOrUsedShuffleStage(currentShufDep, firstJobId) + val stage = getShuffleMapStage(currentShufDep, firstJobId) shuffleToMapStage(currentShufDep.shuffleId) = stage } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala index 81a5c25e3414..abae5224c6cd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala @@ -53,4 +53,18 @@ class SkippedStageSuite extends SparkFunSuite with LocalSparkContext { } } + + test("job shares long lineage w/ caching") { + sc = new SparkContext("local", "test") + sc.addSparkListener(new PrintStageSubmission) + val partitioner = new org.apache.spark.HashPartitioner(10) + + val d1 = sc.parallelize(1 to 100).map { x => (x % 10) -> x}.partitionBy(partitioner) + val d2 = d1.mapPartitions{itr => itr.map{ case(x,y) => x -> (y + 1)}}.partitionBy(partitioner) + val d3 = d2.mapPartitions{itr => itr.map{ case(x,y) => x -> (y + 1)}}.partitionBy(partitioner) + d3.cache() + d3.count() + d3.count() + } + } From eb87626d7d214a07518c87ab72b41f1c0a0a64fb Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 24 Aug 2015 17:25:36 -0500 Subject: [PATCH 3/8] dont remove shuffleMapStages on job completion, leave it for the context cleaner --- .../scala/org/apache/spark/SparkContext.scala | 4 ++- .../apache/spark/scheduler/DAGScheduler.scala | 26 +++++++++++++++---- .../spark/scheduler/DAGSchedulerSuite.scala | 1 - 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f3da04a7f55d..d0f52b9453b4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -552,7 +552,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) { - Some(new ContextCleaner(this)) + val cleaner = new ContextCleaner(this) + cleaner.attachListener(dagScheduler) + Some(cleaner) } else { None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 62c61a3fffa0..bbc751447ce4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -68,7 +68,7 @@ class DAGScheduler( blockManagerMaster: BlockManagerMaster, env: SparkEnv, clock: Clock = new SystemClock()) - extends Logging { + extends Logging with CleanerListener { def this(sc: SparkContext, taskScheduler: TaskScheduler) = { this( @@ -246,14 +246,18 @@ class DAGScheduler( private def getShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = { + logInfo(s"trying to get shuffle map stage for ${shuffleDep.shuffleId}") 
shuffleToMapStage.get(shuffleDep.shuffleId) match { - case Some(stage) => stage + case Some(stage) => + logInfo(s"for ${shuffleDep.shuffleId} found $stage") + stage case None => // We are going to register ancestor shuffle dependencies registerShuffleDependencies(shuffleDep, firstJobId) // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) shuffleToMapStage(shuffleDep.shuffleId) = stage + logInfo(s"setting shuffleToMapStage = $shuffleToMapStage") stage } @@ -297,6 +301,7 @@ class DAGScheduler( numTasks: Int, jobId: Int, callSite: CallSite): ResultStage = { + logInfo(s"getting parent stages and id for job $jobId with shuffleToMapStage = $shuffleToMapStage") val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite) @@ -485,9 +490,6 @@ class DAGScheduler( logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } - for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) { - shuffleToMapStage.remove(k) - } if (waitingStages.contains(stage)) { logDebug("Removing stage %d from waiting set.".format(stageId)) waitingStages -= stage @@ -1438,6 +1440,20 @@ class DAGScheduler( taskScheduler.stop() } + /** + * Called by the context cleaner when a shuffle is removed + * @param shuffleId + */ + override def shuffleCleaned(shuffleId: Int): Unit = { + shuffleToMapStage.remove(shuffleId) + } + + // These are all called by the context cleaner but we don't need them + override def accumCleaned(accId: Long): Unit = {} + override def broadcastCleaned(broadcastId: Long): Unit = {} + override def checkpointCleaned(rddId: Long): Unit = {} + override def rddCleaned(rddId: Int): Unit = {} + // Start the event thread and register the metrics source at the end of the constructor env.metricsSystem.registerSource(metricsSource) eventProcessLoop.start() diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2e8688cf41d9..563adf79b6ba 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1014,7 +1014,6 @@ class DAGSchedulerSuite assert(scheduler.jobIdToStageIds.isEmpty) assert(scheduler.stageIdToStage.isEmpty) assert(scheduler.runningStages.isEmpty) - assert(scheduler.shuffleToMapStage.isEmpty) assert(scheduler.waitingStages.isEmpty) assert(scheduler.outputCommitCoordinator.isEmpty) } From dd18805bde36e42a063c6322e1d35b8e67b9742a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 24 Aug 2015 16:10:48 -0500 Subject: [PATCH 4/8] simple test w/ failure involving a shared dependency --- .../spark/scheduler/DAGSchedulerSuite.scala | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 563adf79b6ba..db4026ea8184 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -695,6 +695,126 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + + // Helper function to validate state when creating tests for task failures + def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { + assert(stageAttempt.stageId === stageId, + s": expected stage $stageId, got 
${stageAttempt.stageId}") + assert(stageAttempt.stageAttemptId == attempt, + s": expected stage attempt $attempt, got ${stageAttempt.stageAttemptId}") + } + + def makeCompletions(stageAttempt: TaskSet, reduceParts: Int): Seq[(Success.type, MapStatus)] = { + stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, makeMapStatus("host" + ('A' + idx).toChar, reduceParts)) + }.toSeq + } + + def setupStageAbortTest(sc: SparkContext) { + sc.listenerBus.addListener(new EndListener()) + ended = false + jobResult = null + } + + // Create a new Listener to confirm that the listenerBus sees the JobEnd message + // when we abort the stage. This message will also be consumed by the EventLoggingListener + // so this will propagate up to the user. + var ended = false + var jobResult : JobResult = null + + class EndListener extends SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobResult = jobEnd.jobResult + ended = true + } + } + + // Helper functions to extract commonly used code in Fetch Failure test cases + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * succesfullly. + * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + * @param numShufflePartitions - The number of partitions in the next stage + */ + def completeNextShuffleMapSuccesfully( + stageId: Int, + attemptIdx: Int, + numShufflePartitions: Int): Unit = { + val stageAttempt = taskSets.last + checkStageId(stageId, attemptIdx, stageAttempt) + complete(stageAttempt, makeCompletions(stageAttempt, numShufflePartitions)) + } + + /** + * Common code to get the next stage attempt, confirm it's the one we expect, and complete it + * with all FetchFailure. + * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + * @param shuffleDep - The shuffle dependency of the stage with a fetch failure + */ + def completeNextStageWithFetchFailure( + stageId: Int, + attemptIdx: Int, + shuffleDep: ShuffleDependency[_, _, _]): Unit = { + val stageAttempt = taskSets.last + checkStageId(stageId, attemptIdx, stageAttempt) + + complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map{ case (task, idx) => + (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null) + }.toSeq) + } + + /** + * Common code to get the next result stage attempt, confirm it's the one we expect, and + * complete it with a success where we return 42. 
+ * + * @param stageId - The current stageId + * @param attemptIdx - The current attempt count + */ + def completeNextResultStageWithSuccess ( + stageId: Int, + attemptIdx: Int, + resultFunc: Int => Int = _ => 42): Unit = { + val stageAttempt = taskSets.last + checkStageId(stageId, attemptIdx, stageAttempt) + assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage]) + complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (_, idx) => + (Success, resultFunc(idx)) + }.toSeq) + } + + + + test("shuffle fetch failure in a reused shuffle dependency") { + // Run the first job successfully, which creates one shuffle dependency + + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduceRdd, Array(0, 1)) + + completeNextShuffleMapSuccesfully(0, 0, 2) + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(0 -> 42, 1 -> 42)) + assertDataStructuresEmpty() + + // submit another job w/ the shared dependency, and have a fetch failure + val reduce2 = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduce2, Array(0,1)) + // Note that the numbering here is only b/c the shared dependency produces a new, skipped + // stage. If instead it reused the existing stage, then the numbering would be different + completeNextStageWithFetchFailure(3, 0, shuffleDep) + scheduler.resubmitFailedStages() + completeNextShuffleMapSuccesfully(2, 0, 2) // really the same as stage 0, but gets its own stage + completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) + assert(results === Map(0 -> 1234, 1 -> 1235)) + + assertDataStructuresEmpty() + } + /** * Makes sure that failures of stage used by multiple jobs are correctly handled. * From 6763a6fd126db5b27353b61ad7d276353988d3b4 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 25 Aug 2015 13:42:22 -0500 Subject: [PATCH 5/8] fixes for cleaning of multiple stages --- .../apache/spark/scheduler/DAGScheduler.scala | 12 +- .../spark/scheduler/DAGSchedulerSuite.scala | 109 ++++++++++++++++-- 2 files changed, 107 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index bbc751447ce4..416d6acafa7e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -500,9 +500,12 @@ class DAGScheduler( } } // data structures based on StageId - stageIdToStage -= stageId - logDebug("After removal of stage %d, remaining stages = %d" - .format(stageId, stageIdToStage.size)) + // ShuffleMapStages aren't removed until the shuffle is cleaned + if (stage.isInstanceOf[ResultStage]) { + stageIdToStage -= stageId + logDebug("After removal of stage %d, remaining stages = %d" + .format(stageId, stageIdToStage.size)) + } } jobSet -= job.jobId @@ -1445,7 +1448,8 @@ class DAGScheduler( * @param shuffleId */ override def shuffleCleaned(shuffleId: Int): Unit = { - shuffleToMapStage.remove(shuffleId) + val stageOpt = shuffleToMapStage.remove(shuffleId) + stageOpt.foreach { stage => stageIdToStage -= stage.id} } // These are all called by the context cleaner but we don't need them diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index db4026ea8184..403750ea7e8c 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -163,7 +164,7 @@ class DAGSchedulerSuite cancelledStages.clear() cacheLocations.clear() results.clear() - mapOutputTracker = new MapOutputTrackerMaster(conf) + mapOutputTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] scheduler = new DAGScheduler( sc, taskScheduler, @@ -171,6 +172,9 @@ class DAGSchedulerSuite mapOutputTracker, blockManagerMaster, sc.env) + // this is normally done in the SparkContext creation, but since we are ignoring the + // scheduler in the SparkContext and creating our own, need to re-register here + sc.cleaner.foreach{_.attachListener(scheduler)} dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) } @@ -699,9 +703,9 @@ class DAGSchedulerSuite // Helper function to validate state when creating tests for task failures def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { assert(stageAttempt.stageId === stageId, - s": expected stage $stageId, got ${stageAttempt.stageId}") + s": expected stage $stageId, instead was ${stageAttempt.stageId}") assert(stageAttempt.stageAttemptId == attempt, - s": expected stage attempt $attempt, got ${stageAttempt.stageAttemptId}") + s": expected stage attempt $attempt, instead was ${stageAttempt.stageAttemptId}") } def makeCompletions(stageAttempt: TaskSet, reduceParts: Int): Seq[(Success.type, MapStatus)] = { @@ -791,10 +795,20 @@ class DAGSchedulerSuite test("shuffle fetch failure in a reused shuffle dependency") { // Run the first job successfully, which creates one shuffle dependency + val jobIdToStageIds = new mutable.HashMap[Int, Set[Int]]() + val listener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobIdToStageIds(jobStart.jobId) = jobStart.stageIds.toSet + } + } + sc.addSparkListener(listener) + val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) submit(reduceRdd, Array(0, 1)) + sc.listenerBus.waitUntilEmpty(1000) + assert(jobIdToStageIds(0) === Set(0, 1)) completeNextShuffleMapSuccesfully(0, 0, 2) completeNextResultStageWithSuccess(1, 0) @@ -803,16 +817,84 @@ class DAGSchedulerSuite // submit another job w/ the shared dependency, and have a fetch failure val reduce2 = new MyRDD(sc, 2, List(shuffleDep)) - submit(reduce2, Array(0,1)) - // Note that the numbering here is only b/c the shared dependency produces a new, skipped - // stage. 
If instead it reused the existing stage, then the numbering would be different - completeNextStageWithFetchFailure(3, 0, shuffleDep) + submit(reduce2, Array(0, 1)) + sc.listenerBus.waitUntilEmpty(1000) + assert(jobIdToStageIds(1) === Set(0, 2)) + completeNextStageWithFetchFailure(2, 0, shuffleDep) scheduler.resubmitFailedStages() - completeNextShuffleMapSuccesfully(2, 0, 2) // really the same as stage 0, but gets its own stage - completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) + logInfo("taskSets after fetch failure = " + taskSets) + completeNextShuffleMapSuccesfully(0, 1, 2) + logInfo("taskSets after recomplete stage 0 = " + taskSets) + completeNextResultStageWithSuccess(2, 1, idx => idx + 1234) assert(results === Map(0 -> 1234, 1 -> 1235)) assertDataStructuresEmpty() + emptyAfterContextCleaner() + } + + + test("reused dependency with long lineage", ActiveTag) { + val jobIdToStageIds = new mutable.HashMap[Int, Set[Int]]() + val listener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobIdToStageIds(jobStart.jobId) = jobStart.stageIds.toSet + } + } + sc.addSparkListener(listener) + + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep1 = new ShuffleDependency(shuffleMapRdd, null) + val reduceRdd = new MyRDD(sc, 4, List(shuffleDep1)) + val shuffleDep2 = new ShuffleDependency(reduceRdd, null) + val reduceRdd2 = new MyRDD(sc, 6, List(shuffleDep2)) + val shuffleDep3 = new ShuffleDependency(reduceRdd2, null) + val reduceRdd3 = new MyRDD(sc, 8, List(shuffleDep3)) + submit(reduceRdd3, (0 until 8).toArray) + sc.listenerBus.waitUntilEmpty(1000) + assert(jobIdToStageIds(0) === Set(0, 1, 2, 3)) + + completeNextShuffleMapSuccesfully(0, 0, 4) + completeNextShuffleMapSuccesfully(1, 0, 6) + completeNextShuffleMapSuccesfully(2, 0, 8) + completeNextResultStageWithSuccess(3, 0) + assert(results === (0 until 8).map{_ -> 42}.toMap) + results.clear() + assertDataStructuresEmpty() + + // submit another job w/ the shared dependency, and have a fetch failure + val reduce4 = new MyRDD(sc, 2, List(shuffleDep3)) + submit(reduce4, Array(0, 1)) + sc.listenerBus.waitUntilEmpty(1000) + assert(jobIdToStageIds(1) === Set(0, 1, 2, 4)) + completeNextStageWithFetchFailure(4, 0, shuffleDep3) + scheduler.resubmitFailedStages() + completeNextShuffleMapSuccesfully(0, 1, 4) + completeNextShuffleMapSuccesfully(1, 1, 6) + completeNextShuffleMapSuccesfully(2, 1, 8) + completeNextResultStageWithSuccess(4, 1, idx => idx + 1234) + assert(results === Map(0 -> 1234, 1 -> 1235)) + results.clear() + assertDataStructuresEmpty() + emptyAfterContextCleaner() + + // now try submitting again, after we've cleaned out the shuffle data. 
Should be fine, + // we just need to rerun everything + + val reduce5 = new MyRDD(sc, 8, List(shuffleDep3)) + submit(reduce5, (0 until 8).toArray) + sc.listenerBus.waitUntilEmpty(1000) + assert(jobIdToStageIds(2) === Set(5, 6, 7, 8)) // new stages this time + completeNextShuffleMapSuccesfully(5, 0, 4) + completeNextShuffleMapSuccesfully(6, 0, 6) + println(taskSets) + completeNextShuffleMapSuccesfully(7, 0, 8) + println(taskSets) + logInfo("tasksets = " + taskSets) + completeNextResultStageWithSuccess(8, 0, idx => idx + 4321) + assert(results === (0 until 8).map{idx => idx -> (idx + 4321)}.toMap) + + assertDataStructuresEmpty() + emptyAfterContextCleaner() } /** @@ -1132,12 +1214,19 @@ class DAGSchedulerSuite assert(scheduler.failedStages.isEmpty) assert(scheduler.jobIdToActiveJob.isEmpty) assert(scheduler.jobIdToStageIds.isEmpty) - assert(scheduler.stageIdToStage.isEmpty) assert(scheduler.runningStages.isEmpty) assert(scheduler.waitingStages.isEmpty) assert(scheduler.outputCommitCoordinator.isEmpty) } + private def emptyAfterContextCleaner(): Unit = { + scheduler.shuffleToMapStage.foreach { case (shuffleId, _) => + sc.cleaner.get.doCleanupShuffle(shuffleId, blocking=true) + } + assert(scheduler.stageIdToStage.isEmpty) + assert(scheduler.shuffleToMapStage.isEmpty) + } + // Nothing in this test should break if the task info's fields are null, but // OutputCommitCoordinator requires the task info itself to not be null. private def createFakeTaskInfo(): TaskInfo = { From 5123b4d1cc56f4707abeb11467819d3a1492ebc2 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 25 Aug 2015 13:47:59 -0500 Subject: [PATCH 6/8] cleanup --- .../apache/spark/scheduler/DAGScheduler.scala | 37 ++++------ .../spark/scheduler/DAGSchedulerSuite.scala | 15 ++-- .../spark/scheduler/SkippedStageSuite.scala | 70 ------------------- 3 files changed, 20 insertions(+), 102 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 416d6acafa7e..6c06a6a39374 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -246,18 +246,14 @@ class DAGScheduler( private def getShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = { - logInfo(s"trying to get shuffle map stage for ${shuffleDep.shuffleId}") shuffleToMapStage.get(shuffleDep.shuffleId) match { - case Some(stage) => - logInfo(s"for ${shuffleDep.shuffleId} found $stage") - stage + case Some(stage) => stage case None => // We are going to register ancestor shuffle dependencies registerShuffleDependencies(shuffleDep, firstJobId) // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) shuffleToMapStage(shuffleDep.shuffleId) = stage - logInfo(s"setting shuffleToMapStage = $shuffleToMapStage") stage } @@ -301,7 +297,6 @@ class DAGScheduler( numTasks: Int, jobId: Int, callSite: CallSite): ResultStage = { - logInfo(s"getting parent stages and id for job $jobId with shuffleToMapStage = $shuffleToMapStage") val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite) @@ -321,23 +316,21 @@ class DAGScheduler( firstJobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd val numTasks = rdd.partitions.length -// 
shuffleIdToShuffleMapStage.getOrElseUpdate(shuffleDep.shuffleId, { - val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) - if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { - val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) - val locs = MapOutputTracker.deserializeMapStatuses(serLocs) - for (i <- 0 until locs.length) { - stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing - } - stage.numAvailableOutputs = locs.count(_ != null) - } else { - // Kind of ugly: need to register RDDs with the cache and map output tracker here - // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") - mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) + val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) + if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { + val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) + val locs = MapOutputTracker.deserializeMapStatuses(serLocs) + for (i <- 0 until locs.length) { + stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing } - stage -// }) + stage.numAvailableOutputs = locs.count(_ != null) + } else { + // Kind of ugly: need to register RDDs with the cache and map output tracker here + // since we can't do it in the RDD constructor because # of partitions is unknown + logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") + mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) + } + stage } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 403750ea7e8c..f7cfd9f59eb9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -822,18 +822,16 @@ class DAGSchedulerSuite assert(jobIdToStageIds(1) === Set(0, 2)) completeNextStageWithFetchFailure(2, 0, shuffleDep) scheduler.resubmitFailedStages() - logInfo("taskSets after fetch failure = " + taskSets) completeNextShuffleMapSuccesfully(0, 1, 2) - logInfo("taskSets after recomplete stage 0 = " + taskSets) completeNextResultStageWithSuccess(2, 1, idx => idx + 1234) assert(results === Map(0 -> 1234, 1 -> 1235)) assertDataStructuresEmpty() - emptyAfterContextCleaner() + assertEmptyAfterContextCleaner() } - test("reused dependency with long lineage", ActiveTag) { + test("reused dependency with long lineage") { val jobIdToStageIds = new mutable.HashMap[Int, Set[Int]]() val listener = new SparkListener { override def onJobStart(jobStart: SparkListenerJobStart): Unit = { @@ -875,7 +873,7 @@ class DAGSchedulerSuite assert(results === Map(0 -> 1234, 1 -> 1235)) results.clear() assertDataStructuresEmpty() - emptyAfterContextCleaner() + assertEmptyAfterContextCleaner() // now try submitting again, after we've cleaned out the shuffle data. 
Should be fine, // we just need to rerun everything @@ -886,15 +884,12 @@ class DAGSchedulerSuite assert(jobIdToStageIds(2) === Set(5, 6, 7, 8)) // new stages this time completeNextShuffleMapSuccesfully(5, 0, 4) completeNextShuffleMapSuccesfully(6, 0, 6) - println(taskSets) completeNextShuffleMapSuccesfully(7, 0, 8) - println(taskSets) - logInfo("tasksets = " + taskSets) completeNextResultStageWithSuccess(8, 0, idx => idx + 4321) assert(results === (0 until 8).map{idx => idx -> (idx + 4321)}.toMap) assertDataStructuresEmpty() - emptyAfterContextCleaner() + assertEmptyAfterContextCleaner() } /** @@ -1219,7 +1214,7 @@ class DAGSchedulerSuite assert(scheduler.outputCommitCoordinator.isEmpty) } - private def emptyAfterContextCleaner(): Unit = { + private def assertEmptyAfterContextCleaner(): Unit = { scheduler.shuffleToMapStage.foreach { case (shuffleId, _) => sc.cleaner.get.doCleanupShuffle(shuffleId, blocking=true) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala deleted file mode 100644 index abae5224c6cd..000000000000 --- a/core/src/test/scala/org/apache/spark/scheduler/SkippedStageSuite.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.scheduler - -import org.apache.spark.{SparkContext, LocalSparkContext, SparkFunSuite} - -class SkippedStageSuite extends SparkFunSuite with LocalSparkContext { - - class PrintStageSubmission extends SparkListener { - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { - println("submitting stage " + stageSubmitted.stageInfo.stageId) - } - - override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { - println("completed stage " + stageCompleted.stageInfo.stageId) - } - - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { - println("submitting job w/ stages: " + jobStart.stageIds) - } - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { - println("completed job") - } - - - } - - test("skipped stages") { - sc = new SparkContext("local", "test") - sc.addSparkListener(new PrintStageSubmission) - val partitioner = new org.apache.spark.HashPartitioner(10) - val d3 = sc.parallelize(1 to 100).map { x => (x % 10) -> x}.partitionBy(partitioner) - (0 until 5).foreach { idx => - val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x % 10) -> x}.partitionBy(partitioner) - val joined = otherData.join(d3) - println("debug string: " + joined.toDebugString) - println(idx + " ---> " + joined.count()) - } - } - - - test("job shares long lineage w/ caching") { - sc = new SparkContext("local", "test") - sc.addSparkListener(new PrintStageSubmission) - val partitioner = new org.apache.spark.HashPartitioner(10) - - val d1 = sc.parallelize(1 to 100).map { x => (x % 10) -> x}.partitionBy(partitioner) - val d2 = d1.mapPartitions{itr => itr.map{ case(x,y) => x -> (y + 1)}}.partitionBy(partitioner) - val d3 = d2.mapPartitions{itr => itr.map{ case(x,y) => x -> (y + 1)}}.partitionBy(partitioner) - d3.cache() - d3.count() - d3.count() - } - -} From 830e0c896be4102f5bbf7a7c646c920093f03df4 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 25 Aug 2015 14:08:16 -0500 Subject: [PATCH 7/8] revert unnecessary change --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6c06a6a39374..dc9fe3a371f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -370,7 +370,7 @@ class DAGScheduler( val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) while (parentsWithNoMapStage.nonEmpty) { val currentShufDep = parentsWithNoMapStage.pop() - val stage = getShuffleMapStage(currentShufDep, firstJobId) + val stage = newOrUsedShuffleStage(currentShufDep, firstJobId) shuffleToMapStage(currentShufDep.shuffleId) = stage } } From 9c86e4718066a8020e2d0fc6560d0850d4cc81ae Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 26 Aug 2015 09:37:42 -0500 Subject: [PATCH 8/8] style --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f7cfd9f59eb9..d5ee24414073 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1216,7 +1216,7 @@ class DAGSchedulerSuite private 
def assertEmptyAfterContextCleaner(): Unit = { scheduler.shuffleToMapStage.foreach { case (shuffleId, _) => - sc.cleaner.get.doCleanupShuffle(shuffleId, blocking=true) + sc.cleaner.get.doCleanupShuffle(shuffleId, blocking = true) } assert(scheduler.stageIdToStage.isEmpty) assert(scheduler.shuffleToMapStage.isEmpty)
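
A note for reviewers: the standalone SkippedStageSuite that drove the early exploration is deleted in patch 6, so below is a minimal, self-contained sketch (not part of the patch series) of the skipped-stage behaviour these changes investigate. The object name SkippedStagesDemo is illustrative; everything else uses stock Spark APIs (SparkContext, HashPartitioner, SparkListener) in the spirit of the removed test. Run against a build with these patches applied, the first job should submit the map stage that produces d3's shuffle once; later jobs should list that stage among their planned stage ids but never resubmit it, since its map output is still registered.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageSubmitted}

// Hypothetical demo app, not included in any of the patches above.
object SkippedStagesDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("skipped-stages-demo"))

    // Print which stages each job plans versus which are actually submitted;
    // the difference between the two is the set of skipped stages.
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        println(s"job ${jobStart.jobId} planned stages: ${jobStart.stageIds.mkString(", ")}")
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
        println(s"submitted stage ${stageSubmitted.stageInfo.stageId}")
    })

    val partitioner = new HashPartitioner(10)
    // d3's shuffle is produced by the first job and then shared by every subsequent join.
    val d3 = sc.parallelize(1 to 100).map(x => (x % 10) -> x).partitionBy(partitioner)

    (0 until 5).foreach { idx =>
      val other = sc.parallelize(1 to (idx * 100)).map(x => (x % 10) -> x).partitionBy(partitioner)
      val joined = other.join(d3)
      println(s"$idx ---> ${joined.count()}")
    }

    sc.stop()
  }
}

The listener output makes the effect of patches 3 and 5 visible: because shuffleToMapStage entries now survive job completion and are only dropped when the ContextCleaner reports the shuffle as cleaned, the map stage backing d3 is reused across jobs instead of being re-run.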