[SPARK-33114][CORE] Add metadata in MapStatus to support custom shuffle manager #30004
Sync to latest spark master branch
Sync with latest spark master branch
|
ok to test |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #129962 has finished for PR 30004 at commit
|
|
@boy-uber I feel that #28618 and the earlier related work seem to be doing what you want here. |
Yes, mccheah was working on #28618 to extend map status metadata, but he has stopped working on it. I am taking a slightly different approach, trying to find the minimum changes we need to support remote shuffle. The code in this PR is much smaller than in that PR and can support remote shuffle as well. May we review this one? |
|
cc @Ngone51 |
|
Hmm, I still see some updates from @attilapiros and @mccheah in Oct on #28618. |
|
Yes, I would like to help finish #28618. Last time I checked, only a few issues were outstanding; now it might have some more merge conflicts... |
|
Hi @attilapiros, glad to see you would like to finish #28618. This PR is a very simplified version of that one (#28618). My concern is that #28618 may take a very long time to get agreement from everyone (it has already been open for 6 months). Could we switch to this PR if #28618 gets stuck again?
|
@attilapiros sounds good, thanks for working on that and keeping us updated! |
|
@attilapiros are you still working on this? |
|
@hiboyang The problem is that it is not getting enough reviews, and without them I cannot push this forward. But I do not want to block you further. |
|
Cool, thanks @attilapiros for the update! |
|
|
||
| override def getAllMapOutputStatuses(shuffleId: Int): Array[MapStatus] = { | ||
| logDebug(s"Fetching all output statuses for shuffle $shuffleId") | ||
| val statuses = getStatuses(shuffleId, conf) |
Please clear the mapStatuses in case of MetadataFetchFailedException!
Reasoning:
Before this PR, the getStatuses method was only used in getMapSizesByExecutorId, where the MetadataFetchFailedException (thrown when a missing output location is detected) is handled by clearing the mapStatuses cache, as it is probably outdated.
The clearing could not be missed if it were done at the point where that exception is thrown.
Could you please check whether it can be moved there?
Now I see why you cannot move the clearing there!
Still, the clearing itself needs to be done.
Yes, we only need to clear mapStatuses in MapOutputTrackerWorker; will add that.
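For readers following the thread, here is a minimal sketch of the requested behaviour, not the code from this PR. `WorkerTrackerSketch`, `MissingOutputException`, and the `fetch` parameter are hypothetical stand-ins for MapOutputTrackerWorker, MetadataFetchFailedException, and the remote fetch; statuses are modelled as plain strings, and a null entry stands for a missing output location.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for Spark's MetadataFetchFailedException.
final class MissingOutputException(msg: String) extends RuntimeException(msg)

// Hypothetical worker-side tracker that caches fetched statuses per shuffle id.
final class WorkerTrackerSketch(fetch: Int => Array[String]) {
  val cache = TrieMap.empty[Int, Array[String]]

  def getAllStatuses(shuffleId: Int): Array[String] = {
    val statuses = cache.getOrElseUpdate(shuffleId, fetch(shuffleId))
    try {
      // A null entry models a missing map output location.
      statuses.indices.foreach { i =>
        if (statuses(i) == null) {
          throw new MissingOutputException(s"Missing output for map $i of shuffle $shuffleId")
        }
      }
      statuses
    } catch {
      case e: MissingOutputException =>
        // The cached statuses are probably outdated, so drop them before rethrowing.
        cache.clear()
        throw e
    }
  }
}
```

The point of the sketch is only that every caller of the shared lookup gets the same cleanup, so a later fetch starts from an empty cache.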
| case Some(shuffleStatus) => | ||
| shuffleStatus.withMapStatuses { statuses => | ||
| MapOutputTracker.checkMapStatuses(statuses, shuffleId) | ||
| statuses.clone |
I see your intention in calling clone here, but I do not think it is enough.
MapStatus is a trait, not a case class, and its implementations are mutable, with a lot of var fields.
The clone on the Array is not a deep copy.
Yes, got your point. How about changing this method to getAllMapOutputStatusMetadata so that it only returns the metadata?
That might not be enough if the metadata can mutate. But as I see it, we could easily solve all the problems with immutable metadata. So, to be on the safe side, please document that we require the metadata to be immutable and introduce an updateMetadata(meta: Option[Serializable]) method in MapStatus. Then we will be safe and all the use cases are covered.
(And you can use a case class for the Uber RSS's MapTaskRssInfo)
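To illustrate the two points in this thread, here is a small sketch under stated assumptions. `MutableStatus`, `RssInfo`, and `StatusWithMetadata` are hypothetical names, not Spark classes; only `updateMetadata(meta: Option[Serializable])` mirrors the signature suggested above. The first part shows that cloning an array copies references only; the second shows metadata that is replaced as a whole rather than mutated in place.

```scala
// Hypothetical mutable status, standing in for a MapStatus implementation with var fields.
final class MutableStatus(var location: String)

// Hypothetical immutable metadata, in the spirit of the suggested case class.
final case class RssInfo(server: String, port: Int)

// Hypothetical status whose metadata is swapped as a whole, never edited in place.
final class StatusWithMetadata(private var _metadata: Option[java.io.Serializable]) {
  def metadata: Option[java.io.Serializable] = _metadata
  def updateMetadata(meta: Option[java.io.Serializable]): Unit = { _metadata = meta }
}

object CloneSketch {
  // True when a change made through the cloned array is visible through the original array.
  def cloneSharesElements(): Boolean = {
    val original = Array(new MutableStatus("hostA"))
    val copy = original.clone // new array, same element references
    copy(0).location = "hostB"
    original(0).location == "hostB"
  }
}
```

Because a case class with only val fields cannot be changed after construction, handing out the metadata reference is safe even though the enclosing array is only shallowly copied.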
| out.writeInt(compressedSizes.length) | ||
| out.write(compressedSizes) | ||
| out.writeLong(_mapTaskId) | ||
| if (_metadata.isEmpty) { |
Nit:
out.writeBoolean(_metadata.isDefined)
_metadata.foreach(out.writeObject)
nice suggestion!
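A self-contained sketch of the pattern in the nit, with a matching read side. `OptionalMetadataIo` and its method names are hypothetical; the read side is an assumption about how the flag would be consumed and is not taken from this PR.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object OptionalMetadataIo {
  // Write a presence flag first, then the payload only when it exists.
  def write(out: ObjectOutputStream, metadata: Option[java.io.Serializable]): Unit = {
    out.writeBoolean(metadata.isDefined)
    metadata.foreach(out.writeObject)
  }

  // Read the flag back and only then attempt to read the payload.
  def read(in: ObjectInputStream): Option[java.io.Serializable] =
    if (in.readBoolean()) Some(in.readObject().asInstanceOf[java.io.Serializable]) else None

  // Helper for trying the pair out in memory.
  def roundTrip(metadata: Option[java.io.Serializable]): Option[java.io.Serializable] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    write(out, metadata)
    out.close()
    read(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

Compared with an explicit if/else on `isEmpty`, the two-line form keeps the flag and the payload write next to each other, which makes it harder for the write and read sides to drift apart.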
| out.writeByte(kv._2) | ||
| } | ||
| out.writeLong(_mapTaskId) | ||
| if (_metadata.isEmpty) { |
Nit:
out.writeBoolean(_metadata.isDefined)
_metadata.foreach(out.writeObject)
Nice suggestion!
| def checkMapStatuses(statuses: Array[MapStatus], shuffleId: Int): Unit = { | ||
| assert (statuses != null) | ||
| for (status <- statuses) { | ||
| if (status == null) { |
I think you can extract this if into a new method and reuse the method in convertMapStatuses.
yes, good suggestion!
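One possible shape for that extraction, as a sketch only. `StatusChecks`, `validateStatus`, and `locations` are hypothetical names, statuses are modelled as strings, and `IllegalStateException` stands in for whatever exception the real code throws.

```scala
object StatusChecks {
  // Hypothetical helper: the single place that decides what a missing status means.
  def validateStatus[T <: AnyRef](status: T, shuffleId: Int, mapIndex: Int): Unit =
    if (status == null) {
      throw new IllegalStateException(
        s"Missing an output location for shuffle $shuffleId, map $mapIndex")
    }

  // First caller: validate every entry without transforming anything.
  def checkAll[T <: AnyRef](statuses: Array[T], shuffleId: Int): Unit = {
    require(statuses != null, "statuses must not be null")
    statuses.indices.foreach(i => validateStatus(statuses(i), shuffleId, i))
  }

  // Second caller: validate while converting, reusing the same helper.
  def locations(statuses: Array[String], shuffleId: Int): Seq[String] =
    statuses.indices.map { i =>
      validateStatus(statuses(i), shuffleId, i)
      statuses(i)
    }
}
```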
| (BlockManagerId("b", "hostB", 1000), | ||
| ArrayBuffer((ShuffleBlockId(10, 6, 0), size10000, 1)))).toSet) | ||
| val allStatuses = tracker.getAllMapOutputStatuses(10) | ||
| assert(allStatuses.toSet === Set(mapStatus1, mapStatus2)) |
With toSet you lose the ordering, and the ordering can be important (it is the mapIndex), so it should be tested.
good suggestion, will use Array to check the sequence as well
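A tiny sketch of why the set-based assertion is weaker. The strings are hypothetical placeholders for the two registered statuses. Note that `==` on two Scala arrays compares references, so the positional check below goes through `toSeq`.

```scala
object OrderingCheckSketch {
  // Index in the array plays the role of the mapIndex.
  val registered: Array[String] = Array("status-for-map-0", "status-for-map-1")

  // Models a buggy result that returns the right elements in the wrong positions.
  val swapped: Array[String] = registered.reverse

  // Set comparison ignores position, so the swapped result still passes.
  val setCheckPasses: Boolean = swapped.toSet == registered.toSet

  // Sequence comparison is positional, so the swapped result is caught.
  val seqCheckPasses: Boolean = swapped.toSeq == registered.toSeq
}
```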
| tracker.registerMapOutput(10, 0, mapStatus1) | ||
| tracker.registerMapOutput(10, 1, mapStatus2) | ||
| val allStatuses = tracker.getAllMapOutputStatuses(10) | ||
| assert(allStatuses.toSet === Set(mapStatus1, mapStatus2)) |
ditto
| assert(mapWorkerTracker.getMapSizesByExecutorId(10, 0).toSeq === | ||
| Seq((BlockManagerId("a", "hostA", 1000), | ||
| ArrayBuffer((ShuffleBlockId(10, 5, 0), size1000, 0))))) | ||
| assert(0 == masterTracker.getNumCachedSerializedBroadcast) |
Please put the assert back:
assert(0 == masterTracker.getNumCachedSerializedBroadcast)
| assert(0 == masterTracker.getNumCachedSerializedBroadcast) | ||
| val allMapOutputStatuses = mapWorkerTracker.getAllMapOutputStatuses(10) | ||
| assert(allMapOutputStatuses.length === 1) | ||
| assert(allMapOutputStatuses(0).location === mapStatus.location) |
I would prefer to have an equals method on MapStatus and use it here.
That way, when MapStatus is extended with a new field, this test will validate the serialization / deserialization of the new field too.
In response to one of the previous comments, I am suggesting returning only the metadata instead of the whole map status object. Will revisit this after that discussion.
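To make the argument for equals concrete, here is a sketch with a hypothetical, heavily simplified `StatusSketch` class (not Spark's MapStatus). It models a round trip that silently drops the metadata field and shows which style of assertion notices.

```scala
// Hypothetical, simplified status with structural equality over all fields.
final class StatusSketch(
    val location: String,
    val mapTaskId: Long,
    val metadata: Option[java.io.Serializable]) {

  override def equals(other: Any): Boolean = other match {
    case that: StatusSketch =>
      location == that.location && mapTaskId == that.mapTaskId && metadata == that.metadata
    case _ => false
  }

  override def hashCode(): Int = (location, mapTaskId, metadata).hashCode()
}

object EqualsCheckSketch {
  val sent = new StatusSketch("hostA", 5L, Some("rss-server-1"))

  // Models a buggy round trip that loses the metadata field.
  val received = new StatusSketch("hostA", 5L, None)

  // A field-by-field test that only looks at location does not notice the loss.
  val locationOnlyCheckPasses: Boolean = sent.location == received.location

  // A whole-object comparison does notice it.
  val equalsCheckPasses: Boolean = sent == received
}
```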
| (ShuffleBlockId(10, 6, 2), size1000, 1))) | ||
| ) | ||
| ) | ||
| assert(tracker.getAllMapOutputStatuses(10).toSet === Set(mapStatus1, mapStatus2)) |
order check
good catch!
| mapWorkerRpcEnv.shutdown() | ||
| } | ||
|
|
||
| test("remote get all map output statuses with metadata") { |
I am thinking about extending the remote fetch test (the one before this) with an extra registered map output that has metadata; then this test could be deleted.
That way you would test the case where one map status has metadata and one does not.
WDYT?
The previous test has masterTracker.unregisterMapOutput and some verification for that, so I want to avoid adding too much to that test. Also, this test specifically covers a non-null metadata object, so keeping it as a separate test follows "separation of concerns".
fine for me
|
Sorry for the many separate single comments: at first my plan was just to take a quick peek. |
No worries, your comments are great and very helpful :) Thanks a lot for taking the time to review! |
|
Oops, I messed up my git a little bit. I deleted and recreated my previous Spark fork, and it seems I cannot update this PR anymore. @attilapiros I created another PR, #31763, with changes addressing your comments. Sorry for the inconvenience of switching to that new PR. For the "The clone on the Array is not a deep copy" comment, I want to wait and see how you like the idea of changing the method to "getAllMapOutputStatusMetadata". |
|
You can fix this, please try the following: |
|
You should be able to set the HEAD with |
Previously I deleted my Spark fork. Then I forked again and re-created mapStatusMetadata2 branch under my new Spark fork (https://github.com/hiboyang/spark/tree/mapStatusMetadata2), but this PR does not update after I commit new changes in mapStatusMetadata2 branch. |
The commit fe693eb is gone, and I could not git reset to it. That commit was created in my previous Spark fork, which I deleted. I no longer have that original fork/commit in my local git repo. |
|
I assume branches are identified just by name, so if you are on the branch used for the other PR (#31763, let's say it is named branch2), create a branch with the same name as the one used for this PR (call it branch1), and force push it, then the PR updates, so |
I tried this approach and it did not work. The branch I previously used for this PR is mapStatusMetadata2. I re-created a branch with the same name and pushed it to GitHub, but this PR does not refresh with my new change. |
|
I see, let's continue at the other PR and close this one. |
|
This PR is closed. Please review the other PR #31763. Thanks! |
What changes were proposed in this pull request?
Add generic metadata in MapStatus class to support custom shuffle manager. Also add a new method to retrieve all map output statuses and their metadata. See Jira: https://issues.apache.org/jira/projects/SPARK/issues/SPARK-33114
Why are the changes needed?
The current MapStatus class is tightly bound to local (sort-merge) shuffle, which uses BlockManagerId to store the shuffle data location. It cannot support other custom shuffle manager implementations.
For example, when we implement Remote Shuffle Service, we want to put remote shuffle server information into MapStatus so that reducers can fetch that information and figure out where to fetch data from. The added MapStatus.metadata field can store such information.
People who implement other shuffle managers can also store their own information in this metadata field.
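As a rough illustration of the use case described above, the following sketch shows how a reducer might read server locations out of the metadata field. All names here (`RemoteShuffleServerInfo`, `StatusWithMeta`, `ReducerSketch`) are hypothetical and do not come from this PR or from Spark; the status type is reduced to the two fields needed for the example.

```scala
// Hypothetical metadata describing the remote shuffle server holding a map task's output.
final case class RemoteShuffleServerInfo(host: String, port: Int)

// Hypothetical, simplified status: only the fields needed for the illustration.
final case class StatusWithMeta(mapIndex: Int, metadata: Option[java.io.Serializable])

object ReducerSketch {
  // Collect the distinct remote servers a reducer would contact,
  // skipping statuses without metadata or with metadata of another kind.
  def serversToFetchFrom(statuses: Seq[StatusWithMeta]): Seq[RemoteShuffleServerInfo] =
    statuses
      .flatMap(_.metadata)
      .collect { case info: RemoteShuffleServerInfo => info }
      .distinct
}
```

Keeping the field typed as a generic serializable value means the core status class does not need to know about any particular shuffle manager; each implementation pattern-matches on its own metadata type.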
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit test