Skip to content

Conversation

@jiangxb1987
Copy link
Contributor

@jiangxb1987 jiangxb1987 commented Jul 2, 2018

What changes were proposed in this pull request?

The RDD repartition uses a round-robin way to distribute data, thus there may be data correctness issue if only a sub-set of partitions are recomputed on fetch failure and the input data sequence is not deterministic.

The RDD data type may be not sortable, so we cannot resolve the whole issue by insert a local sort before shuffle (while we shall still provide that solution as a optional choice for those RDDs with sortable data type). The approach proposed in this PR is to always recompute all the partitions before shuffle on fetch failure, thus we don't rely on certain input data sequence.

Please note that with the feather on you may observe a higher risk of job falling due to reach max consequence stage failure limit, esp. for large jobs running on a big cluster.

How was this patch tested?

TBD

  • Add unit tests.
  • Integration test on my own cluster.
  • Benchmark to ensure no performance regression when fetch failure doesn't happen.

@jiangxb1987 jiangxb1987 changed the title [SPARK-23243] Fix RDD.repartition() data correctness issue [SPARK-23243][Core] Fix RDD.repartition() data correctness issue Jul 2, 2018
@SparkQA
Copy link

SparkQA commented Jul 2, 2018

Test build #92526 has finished for PR 21698 at commit b14cd4b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 2, 2018

Test build #92527 has finished for PR 21698 at commit b14cd4b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndex(distributePartition, recomputeOnFailure),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid changing the existing mapPartitionsWithIndex, we can create MapPartitionsRDD directly here.


// include a shuffle step so that our upstream tasks are still distributed
val recomputeOnFailure =
conf.getBoolean("spark.shuffle.recomputeAllPartitionsOnRepartitionFailure", true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put it in object config.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without sorting, it doesn't make sense to have this config: disabling it means users will get wrong result.

shuffleStatus.removeOutputsByFilter(x => true)
incrementEpoch()
case None =>
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unregisterMapOutput -> unregisterAllMapOutput.

@jiangxb1987
Copy link
Contributor Author

Thanks @cloud-fan @viirya comments addressed :)

@SparkQA
Copy link

SparkQA commented Jul 4, 2018

Test build #92616 has finished for PR 21698 at commit 52e76e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {

if (mapStage.rdd.recomputeAllPartitionsOnFailure()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, what if we have a map after repartition? then the root RDD will return false on recomputeAllPartitionsOnFailure

* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
val fetchFailedAttemptIds = new HashSet[Int]
val failedAttemptIds = new HashSet[Int]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why rename it? we only increase it on fetch failure, don't we?

@mridulm
Copy link
Contributor

mridulm commented Jul 7, 2018

I did not go over the PR itself in detail, but the proposal sounds very expensive - particularly given the cascading costs involved.

Also, I am not sure why we are special case'ing only coalasce/repartition here : any closure which is depending on ordering of tuples is bound to fail - for example, RDD.zip* variants, sampling in ML etc will suffer from same issue.

Either we fix shuffle itself to become deterministic (which I am not sure if we can do efficiently), or we could simply document this issue with coalasce/other relevant api - so that users do a sort when applicable : when they deem the additional cost is required to be borne.
Note that in a lot of cases, this is not an issue - for example when reading from external data stores, checkpointed data, persisted data, etc : which typically are reasons why coalasce gets used a lot (to minimize number of partitions).

@jiangxb1987
Copy link
Contributor Author

Thank you for your comments @mridulm !
We focus on resolving the RDD.repartition() correctness issue here in this PR, because it is most commonly used, and that we can still address the RDD.zip* issue using the similar approach. At first I was worried that the changes may be huge and trying to address the correctness issue for multiple operations may make it difficult to backport the PR. But now it turns out that the PR didn't change that much code, so maybe I can consider include the RDD.zip* fix in this PR too.

Since you are also deeply involved in the related discussion on the correctness issue caused by non-deterministic input for shuffle, you may also agree that there is actually no easy way to both guarantee correctness and don't cause serious performance drop-off. I have to insist that correctness always goes beyond performance concerns, and that we shall not expect users to always remember they may hit a known correctness bug in case of some use patterns.

As for the proposed solution, there are actually two ways to follow: Either you insert a local sort before a shuffle repartition (that's how we fixed the DataFrame.repartition()), or you always retry the whole stage with repartition on FetchFailure. The problem with the local-sort solution is that, it can't fix all the problems for RDD (since the data type of an RDD can be not sortable, and it's hard to construct a sorting for a generic type), also it can make the time consumption of repartition() increases by 3X ~ 5X. By applying the approach proposed in this PR, the performance shall keep the same in case no FetchFailure happens, and it shall works well for DataFrames as well as for RDDs.

I have to admit that if you have a big query running on a huge cluster, and the tasks can easily hit FetchFailure issues, then you may see the job takes more time to finish (or even fall due to reach max consequence stage failure limit). But again, your big query may be producing wrong result without a patch, and I have to say that is even more unacceptable :( . As for the cascading cost, you are right, it makes things worse, and I don't have good advice for that issue.

@mridulm
Copy link
Contributor

mridulm commented Jul 8, 2018

@jiangxb1987 Any closure sensitive to iteration order [1] is effected by this - under the set of circumstances.
If we cannot solve it in a principled manner (make shuffle repeatable which I believe you have investigated and found to be difficult ?) - next best thing until we have a performant solution, would be to expose it to user's and have them deal with it (which is what I did, for example) - with hints on how to accomplish it.

The proposed solution will cause cascading failures for non trivial applications (chain of shuffles) - and also introduce high cost - and can unfortunately cause application failures and unpredictable SLA's.

Having said that, if this is an attempt to explore solutions, I am all for it ! I assumed this was a proposal to get it merged.

[1] I gave example of zip* and sampling, but really - any user defined closure is affected; and we cannot special case for all of them.

@cloud-fan
Copy link
Contributor

cloud-fan commented Jul 10, 2018

IMO RDD as a distributed data set, it should not guarantee any record order unless you sort it. So user functions and Spark internal functions should not expect a specific record order.

However, the round robin partitione(following with a shuffle) violates it. If the record order changes during retry, we may get wrong result. That's why we should fix repartition but not something else.

I agree with @mridulm that this may introduce a big perf penalty. But when a repartition task fails, we should pay the cost to get the corrected result, instead of producing wrong result and asking users to deal with it themselves.

I feel this is a better solution than the sort one. We should only pay the cost when we really need to, i.e. when the repartition task fails and it's not the final stage.

@mridulm
Copy link
Contributor

mridulm commented Jul 11, 2018

@cloud-fan The difference would be between a (user) defined record order (global sort or local sort) and expectation of repeatable record order on recomputation.
It might also be a good idea to explore how other frameworks handle this.

However, the round robin partitione(following with a shuffle) violates it.

This is is not limited to repartition : any closure which depends on input order has the same effect - repartition/coalesce is one instance of this issue - I gave a few examples from spark itself; and I am sure there are other examples from spark and user code.

It is possible this issue was initially identified via repartition - but modeling the solution only for one manifestation of the issue ignores all others and leaves them unfixed.

@cloud-fan
Copy link
Contributor

For zip, it's hard to define what result is "corrected", given the fact that RDD is unordered. I think sample should be similar.

repartition is special because we may change the number of outputs if this bug is triggered, which is obviously wrong result.

@mridulm
Copy link
Contributor

mridulm commented Jul 11, 2018

@cloud-fan We should not look at a particular stage in isolation, but rather what happens when there are failures in the middle of a job with multiple shuffle stages - and zip is one of the internal stages.
A synthetic example:
rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2))).groupByKey().map().save()

If relative ordering of rdd1 or rdd2 changes, the computed key would change - and we end up with data loss if some of the tasks in save have already completed.

@jiangxb1987
Copy link
Contributor Author

A synthetic example:
rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2))).groupByKey().map().save()

The above example may create some different output when retrying a subset of all the tasks. But I may not call it a data loss or data correctness issue. Let's image you run the query twice, each with different ordering of rdd1 and rdd2, each run shall produce different outputs (even different # of output rows). The result produced by retrying a subset of all the tasks is still valid, it actually correspond to another input data representation, though not the same as the initial input.

Now I tend to believe there will not be data loss or data correctness issue, as long as you don't spread input data across partitions in a round robin way (or, in a way that is not related to the data itself), because on task retry you are guaranteed that all input data are covered (each row get recomputed exactly once, though maybe in different order).

@mridulm
Copy link
Contributor

mridulm commented Jul 11, 2018

@jiangxb1987 Different number of output rows is due to data loss - it is not another valid run.
A complete re-execution of the job in this case could result in a different ordering, but consistent output characterstics (number of rows for example).

@cloud-fan
Copy link
Contributor

@mridulm you provided a good example to show the indeterminacy of zip.

rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2))).groupByKey().map().save()

If rdd1 or rdd2 is unordered, then any result can be treated as a corrected result of this query. If rdd1 and rdd2 is ordered, we don't have a problem.

On the other hand, rdd.repartition is very clear about what is the corrected result. No matter rdd is ordered or not, repartition will not add/remove/update the existing records.

Basically user builds an RDD DAG and Spark should produce a result to meet user's expectation. For zip the user's expectation is: the # of output records of each partition is min(# of record of the corresponding partition in rdd1, # of record of the corresponding partition in rdd2). For map, the expectation can be very vague, zip(...).map(...) can produce any result. For repartition, the expectation is clear: not add/remove/update the existing records.

That's why we should fix repartition because it violates the user's expectation.

@mridulm
Copy link
Contributor

mridulm commented Jul 12, 2018

@cloud-fan There is no ambiguity in output of map - one record in, one record out.
In case of zip, as you said, number of output records is min of both.
Given this, there is no ambiguity in cardinality of zip().map() - I think @jiangxb1987's point was that which two tuples from rdd1 and rdd2 get zip'ed together can be arbitrary : and I agree about that.

Note that the problem I surfaced above will cause data loss even after the proposed fix in this PR by @jiangxb1987 btw.

@cloud-fan
Copy link
Contributor

Given this, there is no ambiguity in cardinality of zip().map() ... which two tuples from rdd1 and rdd2 get zip'ed together can be arbitrary : and I agree about that.

yes, but the following .groupByKey().map() has ambiguity in cardinality because the tulples get zipped can be arbitrary, isn't it?

@jiangxb1987
Copy link
Contributor Author

IIUC the output produced by rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2))) shall always have the same cardinality, no matter how many tasks are retried, so where is the data loss issue?

@mridulm
Copy link
Contributor

mridulm commented Jul 13, 2018

(Editing my previous response - not well thought out given it is late night :) )
@cloud-fan I used groupByKey.map for illustration purposes - it could be a groupByKey.flatMap instead of map on the values - if we strictly want to check cardinality.
Having said that, this again depends on what computeKey does and what the input distribution is - and even for map, there can be expectation in cardinality which get violated : developers using zip have some expectation from distribution and/or computation anyway.

@mridulm
Copy link
Contributor

mridulm commented Jul 13, 2018

@jiangxb1987 data loss comes because a re-execution of zip might generate a key for which corresponding reducer has already finished.
Hence re-execution of (zip) stage will not result in subsequent child stage's reducer partition getting re-executed : resulting in data loss.

@cloud-fan
Copy link
Contributor

OK we can treat it as a data loss. However, it's not caused by spark but by the user himself. If a user calls zip and then using a custom function to compute keys from the zipped pairs, and finally call groupByKey, there is nothing Spark can guarantee if the RDDs are unsorted. I think in this case the user should fix his business logic, Spark does nothing wrong on this. Even if the tasks never fail, the users can still get different result/cardinality if he runs his query multiple times.

repartition is different because the user's business logic is nothing wrong: he just wants to repartition the data, Spark should not add/remove/update the existing records.

Anyway if we do want to "fix" the zip problem, I think this should be a different topic: we would need to write all the input data to somewhere and make sure the retired task can get exactly same input, which is very expensive and very different from this approach.

@mridulm
Copy link
Contributor

mridulm commented Jul 14, 2018

Taking a step back and analyzing the solution for the problem at hand.
There are three main issues with the proposal:

  • It does not solve the problem in a general manner.
    • I gave example of zip, sample - it applies to any order sensitive closure.
  • It does not fix the issue when a child stage has one or more completed tasks.
    • Even if we assume it is a specific fix for repartition/coalasce - even there it does not solve the problem and can cause data loss.
  • It causes performance regression to existing workaround.
    • The common workaround for this issue is to checkpoint + action or do a local/global sort (I believe sql does the latter now ?).
    • The proposal causes performance regression for these existing workarounds.

The corner case where the proposal works is if :
a) order sensitive stage has finished and
b) no task in child stage has finished fetching its shuffle input before we invalidate parent and cause child stage(s) to fail on fetch failure to refetch.

This is a fairly narrow subset, and why I dont believe the current approach helps.
Having said that, if it is possible to enhance the approach, that would be great !
This is a fairly nasty issue which hurts users, and typically people who are aware of the problem tend to always pay a performance cost to avoid the corner case.

@tgravescs
Copy link
Contributor

tgravescs commented Aug 13, 2018

if you are looking at recomputing how are you going to handle if some tasks have already written output? This was brought up by @cloud-fan above and I didn't see a response. Some output formats have a task commit and then a job commit so it may work for those, but others might not have that.

Unfortunately we are not able to deliver them on 2.4, but I'm optimistic we may include them in 3.0 and of course backport them to all the active branches.

I really disagree with this. We need to fix this in some way before 2.4 release. If the sort way is a fix but performance regression we should do that as its at least fixed by default. We have the config for people who are ok with possible corruption and just want the performance. I wouldn't think its any worse then what is there for dataframes based on what you have said.
I totally understand if you won't have time to work on this but perhaps others will.
I plan on working on this now but would be nice for us to come up with a overall approach.

did anyone run benchmarks on the fix for dataframes? I'm really curious what the real performance implications are.

Note that Apache PIG also had a similar issue withe the round robin partitioner and they removed it and used a hash value partitioner. Spark is obviously different but the underlying issue is the same. I would actually prefer to see us just use the hash partitioner if we can't find a better solution. Our official docs I don't think says it repartitions evenly (http://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.rdd.RDD), but our programming guide does:
http://spark.apache.org/docs/latest/rdd-programming-guide.html

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

So I could see the argument that says we can't change that behavior.

@markhamstra
Copy link
Contributor

I really disagree with this.

I really agree with Tom. At this point, I think the working assumption should be that any 2.4.0 release candidate that doesn't deliver some fix for this issue will receive multiple -1 votes from PMC members.

@tgravescs
Copy link
Contributor

@jiangxb1987 can you clarify if you are working on this still or if you won't have time for a bit?

@mridulm @zsxwing @cloud-fan thoughts on @squito approach to "you can at least sort the serialized bytes of T"

@zsxwing
Copy link
Member

zsxwing commented Aug 13, 2018

"you can at least sort the serialized bytes of T"

I think this should work.

@jiangxb1987
Copy link
Contributor Author

@tgravescs I'm still working on this but I would be glad if you can also work on the "sort the serialized bytes of T" approach, actually the retry-all-tasks approach seems more complex than I expected when it involves commit protocol (currently a task can be only committed once, so if you already have some tasks committed and then hit a ExecutorLost then retry-all-tasks won't work), so I hope we can have other approaches like "sort the serialized bytes of T" get merged into 2.4 release.

I'll post the benchmark result of DF.repartition() fix later.

@squito
Copy link
Contributor

squito commented Aug 14, 2018

I also think @tgravescs solution of using the HashPartitioner is an acceptable one, though as you've noted it doesn't deal w/ skew (which may be a lot of the existing use of repartition()). I think we'd probably see a bunch of users complain that their jobs started crashing on upgrading 2.4 if thats the best we can offer, but IMO crash is way better than silent data loss.

@mridulm
Copy link
Contributor

mridulm commented Aug 14, 2018

@tgravescs I vaguely remember someone at y! labs telling me (more than a decade back) about MR always doing a sort as part of its shuffle to avoid a variant of this problem by design.
Essentially it boils down to Imran's suggestion even for arbitrary byte writable's [1], [2] ...

[1] https://hadoop.apache.org/docs/r0.23.11/api/src-html/org/apache/hadoop/io/BytesWritable.html
[2] https://hadoop.apache.org/docs/r0.23.11/api/src-html/org/apache/hadoop/io/WritableComparator.html#line.154

@mridulm
Copy link
Contributor

mridulm commented Aug 14, 2018

@squito @tgravescs I am probably missing something about why hash partitioner helps, can you please clarify ?
IIRC the partitioner for CoalescedRDD when shuffle is enabled is HashPartitioner ... the issue is the distributePartition before the shuffle which is order sensitive but is not deterministic since its input is not deterministic if it is derived from one or more shuffle output's.

Btw, when shuffle = false, it does not suffer from the problem - mentally I had assumed that had an issue too - on a recheck now, I find it interesting that it does not (I never used that, so had never checked in detail !)

Also, as I mentioned a few times above ... repartition/coalesce is only one of the public api's in spark which suffers from this; a host of others have same issue as well.
I like @squito's suggestion - since it is a general solution for all variants of the problem (barring bad user code).

@cloud-fan
Copy link
Contributor

cloud-fan commented Aug 14, 2018

I took a quick look at the shuffle writer and feel it will be hard to insert a sort there.

I have a simpler proposal for the fix. To trigger this bug, there must be a shuffle before the repartition, queries like sc.textFile(...).repartition has no problem.

We can add a flag (named fromCoelesce) in the ShuffleRDD to indicate if it's produced by RDD#coalesce. In DAGScheduler, if we hit a FetchFailure, fail the job if the shuffle is from RDD#coalesce and the previous stage is also a shuffle map stage. We can provide a config to turn off this check, or add an RDD#repartitionBy which uses hash partitioner instead of round-robin. In the error message we should mention these 2 workarounds.

In the next release, we can implement the sort or the retry approach as a better fix.

After more thoughts, the problem can be generalized as

  1. RDD#mapPartitions and its friends can take arbitrary user functions, which may produce random result
  2. The shuffle map tasks will consume the inputs and distribute them to different reducers.
  3. When a reduce task throws FetchFailed, Spark scheduler will rerun the map tasks to rewrite the missing shuffle blocks and retry the reduce task. The already finished reduce tasks will not be rerun.

Step 3 is problematic: assuming we have 5 map tasks and 5 reduce tasks, and the input data is random. Let's say reduce task 1,2,3,4 are finished, reduce task 5 failed with FetchFailed, and Spark rerun map task 3 and 4. Map task 3 and 4 reprocess the random data and create diffrent shuffle blocks for reduce task 3, 4, 5. So not only reduce task 5 needs to rerun, reduce task 3, 4, 5 all need to rerun, because their input data changed.

That said, I think we are too optimistic when handling fetch failure. We should keep in mind that RDD output can be random. So when we see fetch failure and rerun map tasks, we should track which reducers have its shuffle blocks being rewritten, and rerun them.

Simply inserting a sort before shuffle doesn't help. The fix for dataframe is adding a sort before round-robin, to make it deterministic. If we add the sort after round-robin and before shuffle, the problem still exists.

I think the correct fix is: be more conservative when handling fetch failure and rerun more reduce tasks. We can provide an internal API to tag a RDD as deterministic (very common in Spark SQL) and then we can safely be optimistic when handling fetch failure.

@jiangxb1987
Copy link
Contributor Author

Thanks @cloud-fan your summary above is super useful, and I think it's clear enough.

So when we see fetch failure and rerun map tasks, we should track which reducers have its shuffle blocks being rewritten, and rerun them.

IIUC, patterns like rdd.map(...).groupBy() shall always be under risk if we can generate non-determine output in map() right?

Simply inserting a sort before shuffle doesn't help. The fix for dataframe is adding a sort before round-robin, to make it deterministic. If we add the sort after round-robin and before shuffle, the problem still exists.

Does this means, if we can generate non-determine output, then we can still loss some data even add a local sort before shuffle, because the reduce tasks may have already finished (or even have committed)?

be more conservative when handling fetch failure and rerun more reduce tasks. We can provide an internal API to tag a RDD as deterministic (very common in Spark SQL) and then we can safely be optimistic when handling fetch failure.

This is somehow like what I proposed yesterday, one issue we can't resolve is that some ResultTasks may have committed, in that case it seems the best effort we can make is just fail the job.

@tgravescs
Copy link
Contributor

@squito @tgravescs I am probably missing something about why hash partitioner helps, can you please clarify ?
IIRC the partitioner for CoalescedRDD when shuffle is enabled is HashPartitioner ... the issue is the distributePartition before the shuffle which is order sensitive but is not deterministic since its input is not deterministic if it is derived from one or more shuffle output's.

@mridulm sorry I wasn't real clear, I guess on the RDD side its not called RoundRobinPartitioner (like it is on dataframe side), but the distributePartition is essentially doing that would need to change to just use a normal hash or something that is deterministic. Basically any operation that does the shuffle has to have deterministic output for choosing which reducers it goes to. The idea at least is what the first PR for this jira was: #20414 (disclaimer, I haven't looked at that in detail). I'll spend more time going through the code to see all the specifics. But like we've discussed the downside is it will not be evenly distributed. So the question is if we would want that?

After more thoughts, the problem can be generalized as

  1. RDD#mapPartitions and its friends can take arbitrary user functions, which may produce random result
  2. The shuffle map tasks will consume the inputs and distribute them to different reducers.
  3. When a reduce task throws FetchFailed, Spark scheduler will rerun the map tasks to rewrite the missing shuffle blocks and retry the reduce task. The already finished reduce tasks will not be rerun.

Thanks @cloud-fan for the write up. This is exactly why I brought up HashPartitioner (ie stop using round robin whether in partitioner or distributePartition function) and exactly why Pig stopped using it for its Union operation. Its not just us internally doing it, the user code could do anything that the output is not in the same order on rerun of the Map Task.

Like you said, the sort has to be done before the round robin and it has to be done on the entire record (not just on key for instance if you had key,value), and I do see this as being a possibly very expensive operation. But if for instance we can't change repartition to not be evenly distributed, it seems like one of the only options. I haven't looked at the details about inserting it here either so I need to do that to understand how complicated it would be.

If we can't come up with another solution, I would actually be ok with failing short term, its better then corruption. Or perhaps we can actually allow user to choose the behavior, have a config for it would fail, one config for they don't care because they know they checkpointed or something, and one for doing the sort.

I think the correct fix is: be more conservative when handling fetch failure and rerun more reduce > tasks. We can provide an internal API to tag a RDD as deterministic (very common in Spark SQL) > and then we can safely be optimistic when handling fetch failure.

Like @jiangxb1987 said and we discussed above I don't think this will work when you have result tasks that could have committed output. You can't undo a task commit.

Now we could do a combination of things where as long as we aren't a ResultTask we fail all reducers and maps to rerun. If we are running ResultTask you either fail entire job or fall back to the expensive Sort.

Just to kind of summarize, the only solutions that I've thought of or mentioned by others:

  • don't use round robin (which for repartition could break the api since it wouldn't be evenly distributed)
  • Sort all records before doing the round robin assignment ( has to be deterministic) - probably very expensive and need to investigate how hard code would be
  • Fail all reducers and rerun more if the we haven't started any ResultTasks, if ResultTask decide another solution
  • Just fail entire job on this type of failure

@mridulm
Copy link
Contributor

mridulm commented Aug 14, 2018

@cloud-fan I think we have to be clear on the boundaries of the solution we can provide in spark.

RDD#mapPartitions and its friends can take arbitrary user functions, which may produce random result

As stated above, this is something we do not support in spark.
Spark, MR, etc assume that computation is idempotent - we do not support non determinism in computation : but computation could be sensitive to input order. For a given input partition (iterator of tuples), the closure must always generate the same output partition (iterator of tuples).

Which is why 'randomness' (or rather pseudo-randomness) is seeded based using invariants like partition id which result in same output partition on task re-execution.

The problem we have here is : even if user code satisfies this constraint, due to non determinism in input order, the output changes when closure is order sensitive.

Given this, analyzing the statement below :

Step 3 is problematic: assuming we have 5 map tasks and 5 reduce tasks, and the input data is random. Let's say reduce task 1,2,3,4 are finished, reduce task 5 failed with FetchFailed, and Spark rerun map task 3 and 4. Map task 3 and 4 reprocess the random data and create diffrent shuffle blocks for reduce task 3, 4, 5. So not only reduce task 5 needs to rerun, reduce task 3, 4, 5 all need to rerun, because their input data changed.

Here - map task 3 and 4 will always produce the same output partition for supported closures - if the input partition remains same.
When reading off checkpoint's, immutable hdfs files, etc - this invariant is satisfied.
With @squito's suggestion implemented, this invariant will be satisfied for shuffle input as well.

With deterministic input partition - we can see that output of map task 3 and 4 will always be the same - and reduce task input's for 3/4/5 will be the same. So only reduce task 5 will need to be rerun and none of the other input's will change.

@mridulm
Copy link
Contributor

mridulm commented Aug 14, 2018

I guess on the RDD side its not called RoundRobinPartitioner

Thanks for clarifying @tgravescs ! I was looking at RangePartitioner and variants and was wondering what I was missing - did not make the obvious connection with sql :-)

If we can't come up with another solution, I would actually be ok with failing short term, its better then corruption

If I understand correctly, the proposal is

  • In ShuffledRDD, add a flag orderSensitiveReducer (?) - to track specific patterns identified which is order sensitive (repartition with shuffle = true, zip, etc).
  • If a task is getting re-executed as part of stage re-execution, if the flag is true, fail job.
    • Task re-execution as part of same stage, speculative execution, etc should not be an issue - since only one task completes.
    • ResultStage should not be affected.
    • I am unsure about how cache'ing data interacts here - might need some investigation.

This looks like a reasonable stop gap until we fix the issue.

It also allows for users to make progress by inserting a checkpoint before the order sensitive closure to unblock them.

@cloud-fan
Copy link
Contributor

... assume that computation is idempotent - we do not support non determinism in computation

Ah this is a reasonable restriction, we should document it in the RDD classdoc. How about the source (root RDD or shuffle)? The output of reduce task is non-deterministic because Spark fetches multiple shuffle blocks at the same time and it's random which shuffle blocks can finish fetching first. External sorter has the same problem: the output order can change if spilling happens.

Generally I think there are 3 directions:

  1. assume computing functions are idempotent, and also make Spark internal operations idempotent(reducer, external sorter, maybe more). I think this is hard to do, but should be the clearest semantic.
  2. assume computing functions are idempotent and are insensitive to the input data order. Then Spark internal operations can have different output orders. An example is adding sort before round-robin, which makes this computing functions insensitive to the input data order. But I don't think it's reasonable to apply this restriction to all computing functions.
  3. assume computing functions are random. This is not friendly to the scheduler, as it needs to be able to revert a finished task. We need to think about if it's possible to revert a result task.

@tgravescs
Copy link
Contributor

so I think the assumption is that task results are idempotent but not ordered. Sorry if that contradictory. The data itself has to be the same on rerun but the order of things in there doesn't. That was my general assumption. I think zip doesn't follow that though when the inputs aren't ordered. Not sure if there are others spark supports, need to go through the list I guess, unless someone already has?

I think we just need to document these operations and say the results can be inconsistent if not sorted or perhaps give them an option to also sort. Either that or we have to say we don't support unordered output at all in Spark. Thoughts on just documenting zip or others with unordered input?

I don't think mapreduce and pig have this issue because they don't internally support an operation like zip, everything is on key/values and joins, groupby on the keys. User code there could generate it as well but I would claim its the users fault there.

@cloud-fan
Copy link
Contributor

I tried a prototype to fix the handling of fetch failure, seems not that hard: #22112

@mridulm
Copy link
Contributor

mridulm commented Aug 23, 2018

@jiangxb1987 I am guessing we should close this PR ?

@jiangxb1987
Copy link
Contributor Author

Thanks everyone! I closed this in favor of #22112

cloud-fan added a commit to cloud-fan/spark that referenced this pull request Sep 6, 2018
An alternative fix for apache#21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

a new test case

Closes apache#22112 from cloud-fan/repartition.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
asfgit pushed a commit that referenced this pull request Sep 7, 2018
backport #22112 to 2.3

-------

An alternative fix for #21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

a new test case

Closes #22354 from cloud-fan/repartition.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
bersprockets pushed a commit to bersprockets/spark that referenced this pull request Sep 7, 2018
backport apache#22112 to 2.2

-------

An alternative fix for apache#21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

a new test case

Closes apache#22354 from cloud-fan/repartition.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
asfgit pushed a commit that referenced this pull request Sep 11, 2018
…ectness issue

## What changes were proposed in this pull request?

Back port of #22354 and #17955 to 2.2 (#22354 depends on methods introduced by #17955).

-------

An alternative fix for #21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

## How was this patch tested?

a new test case

Closes #22382 from bersprockets/SPARK-23243-2.2.

Lead-authored-by: Bruce Robbins <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 25, 2019
…ectness issue

## What changes were proposed in this pull request?

Back port of apache#22354 and apache#17955 to 2.2 (apache#22354 depends on methods introduced by apache#17955).

-------

An alternative fix for apache#21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

## How was this patch tested?

a new test case

Closes apache#22382 from bersprockets/SPARK-23243-2.2.

Lead-authored-by: Bruce Robbins <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 26, 2019
…ectness issue

## What changes were proposed in this pull request?

Back port of apache#22354 and apache#17955 to 2.2 (apache#22354 depends on methods introduced by apache#17955).

-------

An alternative fix for apache#21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

## How was this patch tested?

a new test case

Closes apache#22382 from bersprockets/SPARK-23243-2.2.

Lead-authored-by: Bruce Robbins <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 27, 2019
…ectness issue

## What changes were proposed in this pull request?

Back port of apache#22354 and apache#17955 to 2.2 (apache#22354 depends on methods introduced by apache#17955).

-------

An alternative fix for apache#21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

## How was this patch tested?

a new test case

Closes apache#22382 from bersprockets/SPARK-23243-2.2.

Lead-authored-by: Bruce Robbins <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants