Skip to content

Commit 38e4699

Browse files
jiangxb1987mengxr
authored and committed
[SPARK-24820][SPARK-24821][CORE] Fail fast when submitted job contains a barrier stage with unsupported RDD chain pattern
## What changes were proposed in this pull request? Check on job submit to make sure we don't launch a barrier stage with an unsupported RDD chain pattern. The following patterns are not supported: - Ancestor RDDs that have a different number of partitions from the resulting RDD (e.g. union()/coalesce()/first()/PartitionPruningRDD); - An RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)). ## How was this patch tested? Added test cases in `BarrierStageOnSubmittedSuite`. Author: Xingbo Jiang <[email protected]> Closes #21927 from jiangxb1987/SPARK-24820.
1 parent ad2e636 commit 38e4699

File tree

2 files changed

+207
-1
lines changed

2 files changed

+207
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
3939
import org.apache.spark.internal.config
4040
import org.apache.spark.network.util.JavaUtils
4141
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
42-
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
42+
import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
4343
import org.apache.spark.rpc.RpcTimeout
4444
import org.apache.spark.storage._
4545
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -340,6 +340,22 @@ class DAGScheduler(
340340
}
341341
}
342342

343+
/**
 * Check to make sure we don't launch a barrier stage with an unsupported RDD chain pattern. The
 * following patterns are not supported:
 * 1. Ancestor RDDs that have a different number of partitions from the resulting RDD (e.g.
 *    union()/coalesce()/first()/take()/PartitionPruningRDD);
 * 2. An RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)).
 *
 * The check is only performed when `rdd` itself reports `isBarrier()`; otherwise this method
 * does nothing.
 *
 * @param rdd the final RDD of the stage that is about to be created
 * @param numTasksInStage the number of tasks the stage will launch; every RDD visited by
 *                        `traverseParentRDDsWithinStage` must have exactly this many partitions
 * @throws SparkException with
 *                        `DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN`
 *                        when any visited RDD violates one of the two rules above
 */
private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numTasksInStage: Int): Unit = {
  // An RDD is acceptable when its partition count matches the stage's task count and at most
  // one of its direct dependencies is a barrier RDD. `count` avoids materializing the filtered
  // dependency list just to measure its size.
  val predicate: RDD[_] => Boolean = r =>
    r.getNumPartitions == numTasksInStage && r.dependencies.count(_.rdd.isBarrier()) <= 1
  if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
    throw new SparkException(
      DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
  }
}
358+
343359
/**
344360
* Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
345361
* previously run stage generated the same shuffle data, this function will copy the output
@@ -348,6 +364,7 @@ class DAGScheduler(
348364
*/
349365
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
350366
val rdd = shuffleDep.rdd
367+
checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
351368
val numTasks = rdd.partitions.length
352369
val parents = getOrCreateParentStages(rdd, jobId)
353370
val id = nextStageId.getAndIncrement()
@@ -376,6 +393,7 @@ class DAGScheduler(
376393
partitions: Array[Int],
377394
jobId: Int,
378395
callSite: CallSite): ResultStage = {
396+
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
379397
val parents = getOrCreateParentStages(rdd, jobId)
380398
val id = nextStageId.getAndIncrement()
381399
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
@@ -451,6 +469,32 @@ class DAGScheduler(
451469
parents
452470
}
453471

472+
/**
 * Walks the given RDD together with every ancestor that can be reached without crossing a
 * shuffle dependency, and reports whether `predicate` holds for all of them.
 *
 * The walk stops as soon as one RDD fails the predicate.
 *
 * @param rdd the RDD to start from
 * @param predicate the condition each visited RDD has to satisfy
 * @return true if every visited RDD satisfies `predicate`, false otherwise
 */
private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = {
  val seen = new HashSet[RDD[_]]
  val pending = new ArrayStack[RDD[_]]
  pending.push(rdd)
  var allSatisfied = true
  while (allSatisfied && pending.nonEmpty) {
    val current = pending.pop()
    if (!seen.contains(current)) {
      if (predicate(current)) {
        seen += current
        current.dependencies.foreach {
          case _: ShuffleDependency[_, _, _] =>
            // The parent lives in a different stage than the current rdd; do not follow it.
          case inStage =>
            pending.push(inStage.rdd)
        }
      } else {
        // One failing RDD is enough to decide the result; this ends the loop.
        allSatisfied = false
      }
    }
  }
  allSatisfied
}
497+
454498
private def getMissingParentStages(stage: Stage): List[Stage] = {
455499
val missing = new HashSet[Stage]
456500
val visited = new HashSet[RDD[_]]
@@ -1948,4 +1992,13 @@ private[spark] object DAGScheduler {
19481992

19491993
// Number of consecutive stage attempts allowed before a stage is aborted
19501994
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
1995+
1996+
// Error message when running a barrier stage that has an unsupported RDD chain pattern.
1997+
val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
1998+
"[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
1999+
"RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
2000+
"partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" +
2001+
"PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " +
2002+
"(scala) or barrierRdd.collect()[0] (python).\n" +
2003+
"2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))."
19512004
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark
19+
20+
import scala.concurrent.duration._
21+
import scala.language.postfixOps
22+
23+
import org.scalatest.BeforeAndAfterEach
24+
25+
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
26+
import org.apache.spark.scheduler.DAGScheduler
27+
import org.apache.spark.util.ThreadUtils
28+
29+
/**
 * This test suite covers all the cases that shall fail fast on job submitted that contains one
 * or more barrier stages.
 */
class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach
    with LocalSparkContext {

  // Every test gets a fresh local SparkContext with 4 worker threads.
  override def beforeEach(): Unit = {
    super.beforeEach()

    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("test")
    sc = new SparkContext(conf)
  }

  /**
   * Submits a job over `rdd` and asserts that waiting for it fails with a [[SparkException]]
   * whose cause carries a message containing `message`.
   *
   * @param sc the context used to submit the job
   * @param rdd the RDD to run the job on
   * @param partitions the partitions to compute; all partitions of `rdd` when `None`
   * @param message the text expected inside the failure cause's message
   */
  private def testSubmitJob(
      sc: SparkContext,
      rdd: RDD[Int],
      partitions: Option[Seq[Int]] = None,
      message: String): Unit = {
    // The result handler and the result function are deliberately no-ops: the job is expected
    // to be rejected at submission. They return unit instead of using a non-local `return`,
    // which would throw a control-flow exception inside the scheduler callback if a job ever
    // did succeed.
    val futureAction = sc.submitJob(
      rdd,
      (iter: Iterator[Int]) => iter.toArray,
      partitions.getOrElse(0 until rdd.partitions.length),
      (_: Int, _: Array[Int]) => (),
      ()
    )

    val error = intercept[SparkException] {
      ThreadUtils.awaitResult(futureAction, 5.seconds)
    }.getCause.getMessage
    assert(error.contains(message))
  }

  test("submit a barrier ResultStage that contains PartitionPruningRDD") {
    val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
    val rdd = prunedRdd
      .barrier()
      .mapPartitions((iter, context) => iter)
    testSubmitJob(sc, rdd,
      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
  }

  test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
    val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
    val rdd = prunedRdd
      .barrier()
      .mapPartitions((iter, context) => iter)
      .repartition(2)
      .map(x => x + 1)
    testSubmitJob(sc, rdd,
      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
  }

  test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
    val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1)
    // The repartition() sits between the pruned RDD and the barrier() call.
    val rdd = prunedRdd
      .repartition(2)
      .barrier()
      .mapPartitions((iter, context) => iter)
    // Should be able to submit job and run successfully.
    val result = rdd.collect().sorted
    assert(result === Seq(6, 7, 8, 9, 10))
  }

  test("submit a barrier stage with partial partitions") {
    val rdd = sc.parallelize(1 to 10, 4)
      .barrier()
      .mapPartitions((iter, context) => iter)
    // Only 2 of the 4 partitions are requested for this job.
    testSubmitJob(sc, rdd, Some(Seq(1, 3)),
      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
  }

  test("submit a barrier stage with union()") {
    val rdd1 = sc.parallelize(1 to 10, 2)
      .barrier()
      .mapPartitions((iter, context) => iter)
    val rdd2 = sc.parallelize(1 to 20, 2)
    val rdd3 = rdd1
      .union(rdd2)
      .map(x => x * 2)
    // Fail the job on submit because the barrier RDD (rdd1) may not be assigned Task 0.
    testSubmitJob(sc, rdd3,
      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
  }

  test("submit a barrier stage with coalesce()") {
    val rdd = sc.parallelize(1 to 10, 4)
      .barrier()
      .mapPartitions((iter, context) => iter)
      .coalesce(1)
    // Fail the job on submit because the barrier RDD requires to run on 4 tasks, but the stage
    // only launches 1 task.
    testSubmitJob(sc, rdd,
      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
  }

  test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") {
    val rdd1 = sc.parallelize(1 to 10, 4)
      .barrier()
      .mapPartitions((iter, context) => iter)
    val rdd2 = sc.parallelize(11 to 20, 4)
      .barrier()
      .mapPartitions((iter, context) => iter)
    val rdd3 = rdd1
      .zip(rdd2)
      .map(x => x._1 + x._2)
    testSubmitJob(sc, rdd3,
      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
  }

  test("submit a barrier stage with zip()") {
    val rdd1 = sc.parallelize(1 to 10, 4)
      .barrier()
      .mapPartitions((iter, context) => iter)
    // rdd2 is not a barrier RDD.
    val rdd2 = sc.parallelize(11 to 20, 4)
    val rdd3 = rdd1
      .zip(rdd2)
      .map(x => x._1 + x._2)
    // Should be able to submit job and run successfully.
    val result = rdd3.collect().sorted
    assert(result === Seq(12, 14, 16, 18, 20, 22, 24, 26, 28, 30))
  }
}

0 commit comments

Comments
 (0)