[SPARK-24820][SPARK-24821][Core] Fail fast when submitted job contains a barrier stage with unsupported RDD chain pattern #21927
Conversation
@jiangxb1987, thanks! I am a bot who has found some folks who might be able to help with the review: @squito, @mateiz and @rxin

Test build #93820 has finished for PR 21927 at commit

retest this please

Test build #93827 has finished for PR 21927 at commit
mengxr left a comment:
Made one pass.
```scala
}

/**
 * Traverse all the parent RDDs within the same stage with the given RDD, check whether all the
```
It also checks the RDD itself. "Traverses the given RDD and its ancestors within the same stage and checks whether all of the RDDs satisfy a given predicate."
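mengxr's rewording describes a bounded DAG walk. A minimal standalone sketch of that traversal, using a toy `Node` class in place of Spark's `RDD`/`Dependency` types (every name here is an illustrative assumption, not Spark's actual API):

```scala
// Toy stand-in for an RDD lineage graph; `crossesShuffle` marks a parent
// reached through a shuffle dependency, i.e. a stage boundary.
class Node(val crossesShuffle: Boolean, val parents: Seq[Node])

// Returns true iff the node itself and every ancestor reachable without
// crossing a shuffle boundary satisfy the predicate.
def traverseWithinStage(root: Node, predicate: Node => Boolean): Boolean = {
  val visited = scala.collection.mutable.Set.empty[Node]
  def visit(n: Node): Boolean = {
    if (!visited.add(n)) true // already checked via another path
    else predicate(n) && n.parents.forall { p =>
      p.crossesShuffle || visit(p) // stop at the stage boundary
    }
  }
  visit(root)
}
```

The predicate sees the root first, which is exactly the behavior the suggested doc comment calls out ("It also checks the RDD itself").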
```scala
)

val error = intercept[SparkException] {
  ThreadUtils.awaitResult(futureAction, 1 seconds)
```
I would make the timeout slightly larger like 5 seconds to buffer unexpected pause/slow down.
```scala
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("test")
sc = new SparkContext(conf)
```
LocalSparkContext already provides an active context.
```scala
}
visited += toVisit
toVisit.dependencies.foreach {
  case shuffleDep: ShuffleDependency[_, _, _] =>
```
minor: shuffleDep is not used. You can use _.
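The point about the unused binding: in Scala, a typed pattern that never references its binder can use `_` instead of a name. A small self-contained illustration (the `Dep` hierarchy is made up for this example, not Spark's):

```scala
sealed trait Dep
final case class ShuffleDep(id: Int) extends Dep
final case class NarrowDep(id: Int) extends Dep

// `case _: ShuffleDep` instead of `case shuffleDep: ShuffleDep`
// when the matched value itself is never used in the case body.
def countShuffleDeps(deps: Seq[Dep]): Int = deps.count {
  case _: ShuffleDep => true
  case _             => false
}
```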
```scala
class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext {

  private def testSubmitJob(sc: SparkContext, rdd: RDD[Int], message: String): Unit = {
    val futureAction = sc.submitJob(
```
Is it the same as the following?

```scala
private def testSubmitJob(rdd: RDD[Int], message: String): Unit = {
  val err = intercept[SparkException] {
    rdd.count()
  }.getCause.getMessage
  assert(err.contains(message))
}
```

Okay to keep the current version if we do want to ensure this is from submitJob().
```scala
private def checkBarrierStageWithPartitionPruningRDD(rdd: RDD[_]): Unit = {
  if (rdd.isBarrier() &&
      !traverseParentRDDsWithinStage(rdd, (r => !r.isInstanceOf[PartitionPruningRDD[_]]))) {
    throw new SparkException("Don't support run a barrier stage that contains " +
```
- Since the error message is used in the test, it would be nice to make it a package private constant and use it in the test.
- "Barrier execution mode does not support partition pruning (PartitionPruningRDD)." should be sufficient.
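The suggested refactor, a shared message constant used by both the check and the test, might look like this sketch (the object name and public visibility are illustrative assumptions; real Spark code would use `private[spark]` and `SparkException`):

```scala
object BarrierStageErrors {
  // In Spark this would be package-private: private[spark] val ...
  val UNSUPPORTED_PARTITION_PRUNING: String =
    "Barrier execution mode does not support partition pruning (PartitionPruningRDD)."
}

// Production code and the test suite both reference the same constant,
// so the assertion cannot drift out of sync with the thrown message.
def checkBarrierStage(isBarrier: Boolean, hasPruning: Boolean): Unit =
  if (isBarrier && hasPruning) {
    throw new IllegalArgumentException(BarrierStageErrors.UNSUPPORTED_PARTITION_PRUNING)
  }
```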
```scala
  .barrier()
  .mapPartitions((iter, context) => iter)
testSubmitJob(sc, rdd,
  "Don't support run a barrier stage that contains PartitionPruningRDD")
```
Ditto. Define the message as a constant.
@jiangxb1987 Second thought: Btw, we should provide more info to users in the error message. For example, a user might use first() without understanding "partition pruning". cc: @gatorsmile

yeah, that's a good point, but what about
```scala
/**
 * Check to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The
 * following patterns are not supported:
 * 1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg.
 *    union()/coalesce()/first()/PartitionPruningRDD);
```
but coalesce should be OK, right? Is it just too fragile to allow coalesce while excluding the others?
coalesce() is not safe when shuffle is false because it may cause the number of tasks not to match the number of partitions of the RDD that uses barrier mode.
OK, I see that it'll be a different number of partitions, but conceptually it should be OK, right? The user just wants all tasks launched together, even if it's a different number of tasks than the number of partitions in the original barrier RDD.
but anyway, I guess it's also fine to not support this case, I was just trying to understand it myself.
Test build #93880 has finished for PR 21927 at commit
mengxr left a comment:
LGTM except some minor inline comments.
```scala
 * union()/coalesce()/first()/PartitionPruningRDD);
 * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
 */
private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numPartitions: Int): Unit = {
```
It would be nice to rename numPartitions to numTasksInStage (or a better name).
```scala
val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
  "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " +
  "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " +
  "partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD);\n" +
```
Please also list take(). It would be nice to provide a workaround for first() and take(): barrierRdd.collect().head (scala), barrierRdd.collect()[0] (python)
collect() is expensive though?
Test build #93963 has finished for PR 21927 at commit

retest this please

Test build #93970 has finished for PR 21927 at commit

LGTM. Merged into master. Thanks!
What changes were proposed in this pull request?
Check on job submit to make sure we don't launch a barrier stage with unsupported RDD chain pattern. The following patterns are not supported:
1. Ancestor RDDs that have different number of partitions from the resulting RDD (eg. union()/coalesce()/first()/PartitionPruningRDD);
2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
How was this patch tested?
Add test cases in BarrierStageOnSubmittedSuite.