[SPARK-33114][CORE] Add metadata in MapStatus to support custom shuffle manager #31763
Test build #135825 has finished for PR 31763 at commit
Could you close the previous PR and update the PR title correctly?
Yes, I updated this PR’s title. I will close the previous PR once people start reviewing here.
    case Some(shuffleStatus) =>
      shuffleStatus.withMapStatuses { statuses =>
        MapOutputTracker.checkMapStatuses(statuses, shuffleId)
        statuses.clone
As we discussed in #30004 (comment), changing it to getAllMapOutputStatusMetadata and returning only the metadata could be a solution, combined with the restriction that only immutable metadata is allowed.
So, to be on the safe side, please document that we require the metadata to be immutable, and introduce an updateMetadata(meta: Option[Serializable]) method in MapStatus. Then we will be safe and all the use cases are covered.
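A minimal sketch of what this suggestion could look like, assuming a simplified stand-in for MapStatus. The names MapStatusSketch, SimpleMapStatus and ShuffleServerInfo are hypothetical and are not part of Spark; only the updateMetadata(meta: Option[Serializable]) signature comes from the comment above.

```scala
import java.io.Serializable

// Hypothetical stand-in for Spark's MapStatus; only the metadata parts are modelled.
trait MapStatusSketch {
  /** Metadata is required to be immutable so it can be shared without copying. */
  def metadata: Option[Serializable]

  /** Replaces the metadata reference instead of mutating the existing object. */
  def updateMetadata(meta: Option[Serializable]): Unit
}

// Hypothetical immutable metadata: a case class with only vals.
final case class ShuffleServerInfo(host: String, port: Int)

final class SimpleMapStatus(initial: Option[Serializable]) extends MapStatusSketch {
  // @volatile so that readers on other threads see the latest reference.
  @volatile private var current: Option[Serializable] = initial

  override def metadata: Option[Serializable] = current

  override def updateMetadata(meta: Option[Serializable]): Unit = {
    current = meta
  }
}
```

Because the metadata object itself never changes, a caller that already holds the old value keeps a consistent snapshot even after updateMetadata swaps in a new one.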
Cool, I will change it to getAllMapOutputStatusMetadata and update this PR.
    assert(mapWorkerTracker.getMapSizesByExecutorId(10, 0).toSeq ===
      Seq((BlockManagerId("a", "hostA", 1000),
        ArrayBuffer((ShuffleBlockId(10, 5, 0), size1000, 0)))))
    assert(0 == masterTracker.getNumCachedSerializedBroadcast)
Please do not remove this assert:
assert(0 == masterTracker.getNumCachedSerializedBroadcast)
It seems to have been caused by a merge issue; I will add it back.
     * to store information they need. For example, a Remote Shuffle Service ShuffleManager could
     * store shuffle server information and let reducer task know where to fetch shuffle data.
     */
    def metadata: Option[Serializable]
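To illustrate the use case in the doc comment above, here is a hedged sketch of how a reducer-side helper might interpret the metadata. RemoteShuffleServer and FetchTarget are hypothetical names invented for this example; only the Option[Serializable] type comes from the diff.

```scala
import java.io.Serializable

// Hypothetical metadata that a Remote Shuffle Service ShuffleManager might store.
final case class RemoteShuffleServer(host: String, port: Int)

object FetchTarget {
  /**
   * Decides where a reducer should fetch shuffle data from.
   * Falls back to the default location when no usable metadata is present.
   */
  def resolve(metadata: Option[Serializable], defaultLocation: String): String =
    metadata match {
      case Some(RemoteShuffleServer(host, port)) => s"$host:$port"
      case _ => defaultLocation
    }
}
```

Keeping the field as an opaque Option[Serializable] means each shuffle manager has to pattern match on its own metadata type, as done here.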
Hmm... what's the relationship between SPARK-33114 and SPARK-25299? According to the JIRA description, SPARK-33114 seems to enhance the support for custom shuffle managers, while SPARK-25299 only customizes the storage with the default SortShuffleManager.
So if we are only talking about SPARK-33114, adding metadata may be a good choice for its own scenario. But if we bring in SPARK-25299 as well (IIUC, what this PR is doing would also benefit SPARK-25299), I personally think we need a more general design here. For example, I'd prefer to redesign the location of MapStatus so that it can support the different scenarios mentioned in SPARK-25299 (e.g., Spark BlockManager, Spark external shuffle service, custom remote storage, etc.). That way, the different scenarios would be able to reuse existing features, e.g., decommission (which may update the MapStatus location at runtime), and reuse existing code paths, e.g., we wouldn't need the extra getAllMapOutputStatuses and everything would work the same as it already does during shuffle reading.
WDYT?
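One way to picture the more general location design described above is a sealed hierarchy of location types. This is only an illustrative sketch: every name below is hypothetical, and the actual design was still under discussion in #30763.

```scala
import java.io.Serializable

// Hypothetical location hierarchy; not Spark's actual API.
sealed trait MapOutputLocation extends Serializable

// Data served by an executor's BlockManager.
final case class BlockManagerLocation(executorId: String, host: String, port: Int)
  extends MapOutputLocation

// Data served by the external shuffle service on a host.
final case class ExternalShuffleServiceLocation(host: String, port: Int)
  extends MapOutputLocation

// Data kept in custom remote storage, identified by a URI.
final case class RemoteStorageLocation(uri: String) extends MapOutputLocation

object MapOutputLocation {
  /** Returns the address a reader would use, whatever the backing storage is. */
  def fetchAddress(location: MapOutputLocation): String = location match {
    case BlockManagerLocation(_, host, port) => s"$host:$port"
    case ExternalShuffleServiceLocation(host, port) => s"$host:$port"
    case RemoteStorageLocation(uri) => uri
  }
}
```

With a single location type, a feature that updates locations at runtime could replace one MapOutputLocation with another without a separate metadata path.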
@Ngone51 I agree with you that finishing the design laid out in SPARK-25299 would be much better.
This is why I opened #30763 as a copy of Matthew Cheah's original PR for SPARK-31801 (because he is busy with other projects) and have brought it up to date with master several times.
But it hasn't received enough reviews, and I wouldn't want to block @hiboyang any further (see #30004 (comment)).
I am sure that with your help we can complete SPARK-31801 and be on the road to SPARK-25299.
So next week I will do the conflict resolution and ping you when the PR is ready for review. Is this okay?
Cool, thanks @attilapiros for continuing to work on SPARK-25299 while unblocking this PR. @Ngone51 SPARK-33114 is a small change to support remote shuffle service/storage by adding a metadata object to MapStatus. It could be viewed as a subset of SPARK-25299's work.
> So next week I will do the conflict resolution and ping you when the PR is ready for review. Is this okay?
Sure, please. @attilapiros
@hiboyang Thanks for your explanation. I agree that #30763 is too big to review. But I think we can discuss there first to make sure we are heading in the same direction before we dive into the details. Once we're on the same page, we can split the big PR into smaller pieces and start working together. Does that sound good to you?
SPARK-31801 is very big and may take a very long time to finish (it has already been open for 10 months). Could we merge this PR first?
If SPARK-31801 finds a better way to support this and breaks getAllMapOutputStatusMetadata, that is actually a good thing :) We could have multiple iterations. This PR is the first iteration, with a very small change. SPARK-31801 is the iteration after that. The latter does not need to block the former.
I don't think we should develop this way... As you mentioned above, SPARK-33114 can be considered a subtask of SPARK-25299. So how can we consider this PR a first iteration when SPARK-25299 is still under discussion and development, especially when people haven't reached an agreement on the solution and there is a possible alternative solution at the same time? Also, I think custom shuffle managers aren't officially supported by Spark, because the ShuffleManager interface is private. So it doesn't make sense for Spark to add an internal API for unofficial use cases without a strong reason.
SPARK-31801 is surely big. But as I mentioned earlier, we can split it. Once the solution is finalized, we can start by refactoring MapStatus. I think that would be a much smaller task and would be enough for your case. Then we would start the remaining work (e.g., using the new MapStatus wherever it is referenced), which you would not need to worry about.
I understand you have put a lot of effort into this work, and I'm sorry we cannot get it in quickly. Unfortunately, I don't have permission to merge. You could persuade the committers to merge the PR if you insist on it.
Yeah, it would also be a good idea to split SPARK-31801 and start by refactoring MapStatus. Do you or the community have ideas about how to split SPARK-31801?
We're still discussing the solution in #30763. So I can't tell you the concrete split plan. But, I think, we'd be able to start with refactoring MapStatus either way.
I see, I missed the latest discussion in #30763, will check there as well, thanks!
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #135846 has finished for PR 31763 at commit
Test build #137478 has finished for PR 31763 at commit
Test build #140641 has finished for PR 31763 at commit
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
This PR is copied from #30004, with extra changes addressing the review comments. My git environment was messed up and I could not update the previous PR 30004, so I created this new PR to replace it.
What changes were proposed in this pull request?
Add generic metadata to the MapStatus class to support custom shuffle managers. Also add a new method to retrieve all map output statuses and their metadata. See Jira: https://issues.apache.org/jira/projects/SPARK/issues/SPARK-33114
Why are the changes needed?
The current MapStatus class is tightly bound to the local (sort merge) shuffle, which uses BlockManagerId to store the shuffle data location. It cannot support other custom shuffle manager implementations.
For example, when we implement a Remote Shuffle Service, we want to put remote shuffle server information into MapStatus so that the reducer can fetch that information and figure out where to fetch data from. The added MapStatus.metadata field can store such information.
If people implement other shuffle managers, they can also store their related information in this metadata field.
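The following sketch shows, under simplifying assumptions, how a reducer might use the collected metadata to work out which remote shuffle servers to contact. MapStatusStub, RssServer and MetadataLookup are hypothetical names; allMetadata only mirrors the idea behind the getAllMapOutputStatusMetadata method discussed in the review and is not its real signature.

```scala
import java.io.Serializable

// Hypothetical, stripped-down view of a map status: just an id and its metadata.
final case class MapStatusStub(mapId: Long, metadata: Option[Serializable])

// Hypothetical metadata written by a Remote Shuffle Service implementation.
final case class RssServer(host: String, port: Int)

object MetadataLookup {
  /** Returns only the metadata of each map output, in the order given. */
  def allMetadata(statuses: Seq[MapStatusStub]): Seq[Option[Serializable]] =
    statuses.map(_.metadata)

  /** Distinct remote shuffle servers that a reducer would need to contact. */
  def serversToContact(statuses: Seq[MapStatusStub]): Set[RssServer] =
    statuses.flatMap(_.metadata).collect { case server: RssServer => server }.toSet
}
```

Map outputs without metadata, or with metadata of another type, are simply skipped when collecting the servers.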
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit test