From 9a80fad6b2f5d73e206aacf72cd5e075fa8bcba0 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 22 May 2015 17:31:02 +0900 Subject: [PATCH 1/9] Suppress extra calling getCacheLocs. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5d812918a13d1..0eb3d9d906ae4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -386,13 +386,15 @@ class DAGScheduler( def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd - if (getCacheLocs(rdd).contains(Nil)) { + if (rdd.dependencies.size < 2 || getCacheLocs(rdd).contains(Nil)) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => - val mapStage = getShuffleMapStage(shufDep, stage.jobId) - if (!mapStage.isAvailable) { - missing += mapStage + if (getCacheLocs(rdd).contains(Nil)) { + val mapStage = getShuffleMapStage(shufDep, stage.jobId) + if (!mapStage.isAvailable) { + missing += mapStage + } } case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) From a4d944adb874feb14d6c8c58ae6494f5d2bafbcf Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 23 May 2015 16:51:23 +0900 Subject: [PATCH 2/9] Add a unit test. 
--- .../apache/spark/scheduler/DAGScheduler.scala | 3 +- .../spark/scheduler/DAGSchedulerSuite.scala | 29 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 0eb3d9d906ae4..3fc5fa3974c05 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -377,7 +377,8 @@ class DAGScheduler( parents } - private def getMissingParentStages(stage: Stage): List[Stage] = { + private[scheduler] + def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6a8ae29aae675..d3b45b03bc8df 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -342,6 +342,35 @@ class DAGSchedulerSuite assert(locs === Seq(Seq("hostA", "hostB"), Seq("hostB", "hostC"), Seq("hostC", "hostD"))) } + /** + * ┌───┐ shuffle ┌───┐ ┌───┐ + * │ A │< ─ ─ ─ ─│ B │< ─ │ C │< ─┐ + * └───┘ └───┘ └───┘ │ ┌───┐ + * ├──│ E │ + * ┌───┐ │ └───┘ + * │ D │< ─┘ + * └───┘ + * Here, E has one-to-one dependencies on C and D. C is derived from A by performing a shuffle + * and then a map. If we're trying to determine which ancestor stages need to be computed in + * order to compute E, we need to figure out whether the shuffle A -> B should be performed. + * If the RDD C, which has only one ancestor via a narrow dependency, is cached, then we won't + * need to compute A, even if it has some unavailable output partitions. 
The same goes for B: + * if B is 100% cached, then we can avoid the shuffle on A. + */ + test("SPARK-7826: regression test for getMissingParentStages") { + val rddA = new MyRDD(sc, 1, Nil) + val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, null))) + val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))) + val rddD = new MyRDD(sc, 1, Nil) + val rddE = new MyRDD(sc, 1, + List(new OneToOneDependency(rddC), new OneToOneDependency(rddD))) + cacheLocations(rddC.id -> 0) = + Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) + val jobId = submit(rddE, Array(0)) + val finalStage = scheduler.jobIdToActiveJob(jobId).finalStage + assert(scheduler.getMissingParentStages(finalStage).size === 0) + } + test("avoid exponential blowup when getting preferred locs list") { // Build up a complex dependency graph with repeated zip operations, without preferred locations var rdd: RDD[_] = new MyRDD(sc, 1, Nil) From 824838670c6375d3feed251287733e55e6f7bb44 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 23 May 2015 15:33:37 +0900 Subject: [PATCH 3/9] Revert "Suppress extra calling getCacheLocs." This reverts commit 9a80fad6b2f5d73e206aacf72cd5e075fa8bcba0. 
--- .../org/apache/spark/scheduler/DAGScheduler.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3fc5fa3974c05..63e702964c3a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -387,15 +387,13 @@ class DAGScheduler( def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd - if (rdd.dependencies.size < 2 || getCacheLocs(rdd).contains(Nil)) { + if (getCacheLocs(rdd).contains(Nil)) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => - if (getCacheLocs(rdd).contains(Nil)) { - val mapStage = getShuffleMapStage(shufDep, stage.jobId) - if (!mapStage.isAvailable) { - missing += mapStage - } + val mapStage = getShuffleMapStage(shufDep, stage.jobId) + if (!mapStage.isAvailable) { + missing += mapStage } case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) From f87f2ecaa16c05249a49499511ed6d8c301b5e06 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 23 May 2015 15:35:26 +0900 Subject: [PATCH 4/9] Get cached locations from block manager only if the storage level of the RDD is not StorageLevel.NONE. 
--- .../org/apache/spark/scheduler/DAGScheduler.scala | 4 ++++ .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 10 +++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 63e702964c3a3..e064ef58b1485 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -191,6 +191,10 @@ class DAGScheduler( private[scheduler] def getCacheLocs(rdd: RDD[_]): Seq[Seq[TaskLocation]] = cacheLocs.synchronized { + // Note: if the storage level is NONE, we don't need to get locations from block manager. + if (rdd.getStorageLevel == StorageLevel.NONE) + return Seq.fill(rdd.partitions.size)(Nil) + // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index d3b45b03bc8df..c42e110a27ef2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -318,7 +318,7 @@ class DAGSchedulerSuite } test("cache location preferences w/ dependency") { - val baseRdd = new MyRDD(sc, 1, Nil) + val baseRdd = new MyRDD(sc, 1, Nil).cache() val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd))) cacheLocations(baseRdd.id -> 0) = Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) @@ -331,7 +331,7 @@ class DAGSchedulerSuite } test("regression test for getCacheLocs") { - val rdd = new MyRDD(sc, 3, Nil) + val rdd = new MyRDD(sc, 3, Nil).cache() cacheLocations(rdd.id -> 0) = Seq(makeBlockManagerId("hostA"), 
makeBlockManagerId("hostB")) cacheLocations(rdd.id -> 1) = @@ -360,7 +360,7 @@ class DAGSchedulerSuite test("SPARK-7826: regression test for getMissingParentStages") { val rddA = new MyRDD(sc, 1, Nil) val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, null))) - val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))) + val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache() val rddD = new MyRDD(sc, 1, Nil) val rddE = new MyRDD(sc, 1, List(new OneToOneDependency(rddC), new OneToOneDependency(rddD))) @@ -707,9 +707,9 @@ class DAGSchedulerSuite } test("cached post-shuffle") { - val shuffleOneRdd = new MyRDD(sc, 2, Nil) + val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache() val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) - val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)) + val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache() val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo)) submit(finalRdd, Array(0)) From b9c835c5c627faee42837b73845588b0bf6a9039 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 23 May 2015 15:40:07 +0900 Subject: [PATCH 5/9] Put the condition that checks if the RDD has uncached partitions or not into a variable for readability. 
--- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e064ef58b1485..776768ee66f48 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -391,7 +391,8 @@ class DAGScheduler( def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd - if (getCacheLocs(rdd).contains(Nil)) { + val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) + if (rddHasUncachedPartitions) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => From 6f3125ce48413e954f92f0cd126a27cdae6ccb80 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 23 May 2015 17:19:50 +0900 Subject: [PATCH 6/9] Fix scalastyle. --- .../org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 14 +++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 776768ee66f48..bf173abe31e9b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -192,8 +192,9 @@ class DAGScheduler( private[scheduler] def getCacheLocs(rdd: RDD[_]): Seq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: if the storage level is NONE, we don't need to get locations from block manager. 
- if (rdd.getStorageLevel == StorageLevel.NONE) + if (rdd.getStorageLevel == StorageLevel.NONE) { return Seq.fill(rdd.partitions.size)(Nil) + } // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times if (!cacheLocs.contains(rdd.id)) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index c42e110a27ef2..e0adef86db1e3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -343,13 +343,13 @@ class DAGSchedulerSuite } /** - * ┌───┐ shuffle ┌───┐ ┌───┐ - * │ A │< ─ ─ ─ ─│ B │< ─ │ C │< ─┐ - * └───┘ └───┘ └───┘ │ ┌───┐ - * ├──│ E │ - * ┌───┐ │ └───┘ - * │ D │< ─┘ - * └───┘ + * +---+ shuffle +---+ +---+ + * | A |<--------| B |<---| C |<--+ + * +---+ +---+ +---+ | +---+ + * +--| E | + * +---+ | +---+ + * | D |<--+ + * +---+ * Here, E has one-to-one dependencies on C and D. C is derived from A by performing a shuffle * and then a map. If we're trying to determine which ancestor stages need to be computed in * order to compute E, we need to figure out whether the shuffle A -> B should be performed. From d858b59928cbc173971406737a672c33524487f2 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sun, 24 May 2015 10:02:43 +0900 Subject: [PATCH 7/9] Move the storageLevel check inside the if (!cacheLocs.contains(rdd.id)) block. 
--- .../apache/spark/scheduler/DAGScheduler.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index bf173abe31e9b..06943749fdae5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -191,16 +191,17 @@ class DAGScheduler( private[scheduler] def getCacheLocs(rdd: RDD[_]): Seq[Seq[TaskLocation]] = cacheLocs.synchronized { - // Note: if the storage level is NONE, we don't need to get locations from block manager. - if (rdd.getStorageLevel == StorageLevel.NONE) { - return Seq.fill(rdd.partitions.size)(Nil) - } - // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times if (!cacheLocs.contains(rdd.id)) { - val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] - val locs: Seq[Seq[TaskLocation]] = blockManagerMaster.getLocations(blockIds).map { bms => - bms.map(bm => TaskLocation(bm.host, bm.executorId)) + // Note: if the storage level is NONE, we don't need to get locations from block manager. + val locs: Seq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) { + Seq.fill(rdd.partitions.size)(Nil) + } else { + val blockIds = + rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] + blockManagerMaster.getLocations(blockIds).map { bms => + bms.map(bm => TaskLocation(bm.host, bm.executorId)) + } } cacheLocs(rdd.id) = locs } From 10b1b22d710be6deed8cf0931bd22ba26b688118 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 27 May 2015 12:13:09 +0900 Subject: [PATCH 8/9] Simplify the unit test. 
--- .../apache/spark/scheduler/DAGScheduler.scala | 3 +-- .../spark/scheduler/DAGSchedulerSuite.scala | 26 +++++++------------ 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 06943749fdae5..0f6eeb4e38723 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -383,8 +383,7 @@ class DAGScheduler( parents } - private[scheduler] - def getMissingParentStages(stage: Stage): List[Stage] = { + private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index e0adef86db1e3..ef76a2fc8d5ed 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -343,32 +343,26 @@ class DAGSchedulerSuite } /** - * +---+ shuffle +---+ +---+ - * | A |<--------| B |<---| C |<--+ - * +---+ +---+ +---+ | +---+ - * +--| E | - * +---+ | +---+ - * | D |<--+ - * +---+ - * Here, E has one-to-one dependencies on C and D. C is derived from A by performing a shuffle + * +---+ shuffle +---+ +---+ +---+ + * | A |<--------| B |<---| C |<---| D | + * +---+ +---+ +---+ +---+ + * Here, D has one-to-one dependencies on C. C is derived from A by performing a shuffle * and then a map. If we're trying to determine which ancestor stages need to be computed in - * order to compute E, we need to figure out whether the shuffle A -> B should be performed. + * order to compute D, we need to figure out whether the shuffle A -> B should be performed. 
* If the RDD C, which has only one ancestor via a narrow dependency, is cached, then we won't * need to compute A, even if it has some unavailable output partitions. The same goes for B: * if B is 100% cached, then we can avoid the shuffle on A. */ - test("SPARK-7826: regression test for getMissingParentStages") { + test("SPARK-7826: getMissingParentStages should consider all ancestor RDDs' cache statuses") { val rddA = new MyRDD(sc, 1, Nil) val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, null))) val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache() - val rddD = new MyRDD(sc, 1, Nil) - val rddE = new MyRDD(sc, 1, - List(new OneToOneDependency(rddC), new OneToOneDependency(rddD))) + val rddD = new MyRDD(sc, 1, List(new OneToOneDependency(rddC))) cacheLocations(rddC.id -> 0) = Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) - val jobId = submit(rddE, Array(0)) - val finalStage = scheduler.jobIdToActiveJob(jobId).finalStage - assert(scheduler.getMissingParentStages(finalStage).size === 0) + submit(rddD, Array(0)) + assert(scheduler.runningStages.size === 1) + assert(scheduler.runningStages.head.id === 1) } test("avoid exponential blowup when getting preferred locs list") { From 3d4d036aecbc7ad3b0a75f2ad8de9ce866a3131e Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 28 May 2015 09:10:00 +0900 Subject: [PATCH 9/9] Modify a test and the documentation. 
--- .../spark/scheduler/DAGSchedulerSuite.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ef76a2fc8d5ed..46642236e454a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -343,17 +343,19 @@ class DAGSchedulerSuite } /** + * This test ensures that if a particular RDD is cached, RDDs earlier in the dependency chain + * are not computed. It constructs the following chain of dependencies: * +---+ shuffle +---+ +---+ +---+ * | A |<--------| B |<---| C |<---| D | * +---+ +---+ +---+ +---+ - * Here, D has one-to-one dependencies on C. C is derived from A by performing a shuffle - * and then a map. If we're trying to determine which ancestor stages need to be computed in - * order to compute D, we need to figure out whether the shuffle A -> B should be performed. - * If the RDD C, which has only one ancestor via a narrow dependency, is cached, then we won't - * need to compute A, even if it has some unavailable output partitions. The same goes for B: - * if B is 100% cached, then we can avoid the shuffle on A. + * Here, B is derived from A by performing a shuffle, C has a one-to-one dependency on B, + * and D similarly has a one-to-one dependency on C. If none of the RDDs were cached, this + * set of RDDs would result in a two stage job: one ShuffleMapStage, and a ResultStage that + * reads the shuffled data from RDD A. This test ensures that if C is cached, the scheduler + * doesn't perform a shuffle, and instead computes the result using a single ResultStage + * that reads C's cached data. 
*/ - test("SPARK-7826: getMissingParentStages should consider all ancestor RDDs' cache statuses") { + test("getMissingParentStages should consider all ancestor RDDs' cache statuses") { val rddA = new MyRDD(sc, 1, Nil) val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, null))) val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache() @@ -362,7 +364,9 @@ class DAGSchedulerSuite Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) submit(rddD, Array(0)) assert(scheduler.runningStages.size === 1) - assert(scheduler.runningStages.head.id === 1) + // Make sure that the scheduler is running the final result stage. + // Because C is cached, the shuffle map stage to compute A does not need to be run. + assert(scheduler.runningStages.head.isInstanceOf[ResultStage]) } test("avoid exponential blowup when getting preferred locs list") {