From 6211b75346c40d344fb3de394ada7dd343db13cd Mon Sep 17 00:00:00 2001 From: hushan Date: Sat, 26 Sep 2015 21:12:39 +0800 Subject: [PATCH 1/4] Duplicate stage --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e01a9609b9a0d..1f74e9c9e9aed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -283,7 +283,9 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => - shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + if (!shuffleToMapStage.contains(dep.shuffleId)) { + shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + } } // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) From a11cdd92dd2fd39843ad6e99c1477b733de382ab Mon Sep 17 00:00:00 2001 From: hushan Date: Fri, 13 Nov 2015 20:55:48 +0800 Subject: [PATCH 2/4] Change getAncestorShuffleDependencies return type to Set --- .../org/apache/spark/scheduler/DAGScheduler.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 1f74e9c9e9aed..2111f7acb55f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map -import scala.collection.mutable.{HashMap, HashSet, Stack} +import scala.collection.mutable.{HashMap, HashSet, Set, Stack} import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -283,9 +283,7 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => - if (!shuffleToMapStage.contains(dep.shuffleId)) { - shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) - } + shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) } // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) @@ -403,8 +401,8 @@ class DAGScheduler( } /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ - private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { - val parents = new Stack[ShuffleDependency[_, _, _]] + private def getAncestorShuffleDependencies(rdd: RDD[_]): Set[ShuffleDependency[_, _, _]] = { + val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting @@ -416,7 +414,7 @@ class DAGScheduler( dep match { case shufDep: ShuffleDependency[_, _, _] => if (!shuffleToMapStage.contains(shufDep.shuffleId)) { - parents.push(shufDep) + parents += shufDep } case _ => } From 66ae245afdc25b441cb067e14dcc3793aa39963d Mon Sep 17 00:00:00 2001 From: hushan Date: Mon, 16 Nov 2015 11:58:10 +0800 Subject: [PATCH 3/4] Return immutableSet --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2111f7acb55f0..3ccffb25ecbda 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -23,7 +23,8 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map -import scala.collection.mutable.{HashMap, HashSet, Set, Stack} +import scala.collection.immutable.Set +import scala.collection.mutable.{HashMap, HashSet, Stack} import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -427,7 +428,7 @@ class DAGScheduler( while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } - parents + parents.toSet } private def getMissingParentStages(stage: Stage): List[Stage] = { From 513d0d898b3334cf38f54c9d5a813b542e56f46b Mon Sep 17 00:00:00 2001 From: hushan Date: Tue, 26 Apr 2016 15:36:25 +0800 Subject: [PATCH 4/4] Refine --- .../apache/spark/scheduler/DAGScheduler.scala | 12 ++++++----- .../spark/scheduler/DAGSchedulerSuite.scala | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3ccffb25ecbda..3f5f2d87c95b0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -284,7 +284,9 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => - shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + if (!shuffleToMapStage.contains(dep.shuffleId)) { + shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) + } } // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) @@ -402,8 +404,8 @@ class DAGScheduler( } /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ - private def getAncestorShuffleDependencies(rdd: RDD[_]): Set[ShuffleDependency[_, _, _]] = { - val parents = new HashSet[ShuffleDependency[_, _, _]] + private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { + val parents = new Stack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting @@ -415,7 +417,7 @@ class DAGScheduler( dep match { case shufDep: ShuffleDependency[_, _, _] => if (!shuffleToMapStage.contains(shufDep.shuffleId)) { - parents += shufDep + parents.push(shufDep) } case _ => } @@ -428,7 +430,7 @@ class DAGScheduler( while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } - parents.toSet + parents } private def getMissingParentStages(stage: Stage): List[Stage] = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 653d41fc053c9..3a9bf28b372d5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1901,6 +1901,26 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + test("Eliminate creating duplicate stage") { + val rdd2 = new MyRDD(sc, 2, Nil) + val rdd1 = new MyRDD(sc, 2, Nil) + val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(1)) + val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(1)) + val rdd3 = new MyRDD(sc, 1, List(dep1, dep2), tracker = mapOutputTracker) + val dep3 = new ShuffleDependency(rdd3, new HashPartitioner(2)) + val dep4 = new ShuffleDependency(rdd3, new HashPartitioner(2)) + val rdd4 = new MyRDD(sc, 2, List(dep3), tracker = mapOutputTracker) + val rdd5 = new MyRDD(sc, 2, List(dep4), tracker = mapOutputTracker) + val dep5 = new ShuffleDependency(rdd4, new HashPartitioner(1)) + val dep6 = new ShuffleDependency(rdd5, new HashPartitioner(1)) + val rdd6 = new MyRDD(sc, 1, List(dep5, dep6), tracker = mapOutputTracker) + val dep7 = new ShuffleDependency(rdd6, new HashPartitioner(2)) + val rdd7 = new MyRDD(sc, 2, List(dep7), tracker = mapOutputTracker) + submit(rdd7, Array(0, 1)) + assert(scheduler.stageIdToStage.size == 8) + assert(scheduler.shuffleToMapStage.size == 7) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID.