Skip to content

Conversation

@xuanyuanking
Copy link
Member

What changes were proposed in this pull request?

Add indeterminate stage rerun support in shuffle writer API, the usage of newly added shuffleGenerationId param is in #24892.

How was this patch tested?

Existing UT.

Add indeterminate stage rerun support in shuffle writer api

(cherry picked from commit 99c2b4a)
Signed-off-by: Yuanjian Li <[email protected]>
@xuanyuanking xuanyuanking changed the title [SPARK-28625][Core] [SPARK-28625][Core] Indeterminate shuffle support in Shuffle Writer API Aug 5, 2019
@SparkQA
Copy link

SparkQA commented Aug 5, 2019

Test build #108662 has finished for PR 25361 at commit dc9759b.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 5, 2019

Test build #108663 has finished for PR 25361 at commit 6e9bab2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* partitioned bytes written by that map task.
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to,
Copy link
Contributor

@mccheah mccheah Aug 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just use the mapTaskAttemptId? (In fact I wonder if we can just remove shuffleId and mapId and just use mapTaskAttemptId as a global identifier, but that might be a bit ambitious.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically taking a look at the linked PR for indeterminate retries - I'd expect that on a rolled back map stage, the implementation of this plugin will be given a different mapTaskAttemptId anyways since that's going to be updated on the resubmit. So, we'll end up opening a new writer regardless, but, we could have gotten the same behavior just by using the mapTaskAttemptId.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually let's move discussion over to #24892

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, thanks for your faster review, I describe the requirement in #24892 (comment)

ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, mapTaskAttemptId, numPartitions);
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
shuffleId, -1, mapId, mapTaskAttemptId, numPartitions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we always passing in -1 here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR created for quick reviewing of API changes, you can see the real scenario of shuffleGeneraionId here :)

@mccheah
Copy link
Contributor

mccheah commented Aug 5, 2019

@squito @vanzin also

@xuanyuanking
Copy link
Member Author

Close this preview PR, the API changed in #25620, which uses map task attempt Id as the map id during writer creation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants