
Conversation

@yangwwei
Contributor

@yangwwei yangwwei commented Nov 19, 2021

What changes were proposed in this pull request?

Propose to skip registering with ESS if a customized shuffle manager (Remote Shuffle Service) is configured. Otherwise, when dynamic allocation is enabled without an external shuffle service in place, the Spark executor still tries to connect to the external shuffle service, which results in a connection refused exception.

Why are the changes needed?

To make dynamic allocation work with a 3rd party remote shuffle service.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tested locally on K8s with docker-desktop: dynamic allocation enabled, no external shuffle service, running Uber's RSS locally.

With the default settings, when I run a Spark job it fails with the following error:

21/11/30 07:35:24 INFO BlockManager: Registering executor with local external shuffle service.
21/11/30 07:35:24 ERROR BlockManager: Failed to connect to external shuffle server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to /10.1.2.201:7337

Then apply this patch to Spark, rebuild Spark, and make the following change in the ShuffleManager implementation, i.e. RssShuffleManager.scala:

diff --git a/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala b/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala
index 4b6e825..aeffd7d 100644
--- a/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala
+++ b/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala
@@ -442,5 +442,8 @@ class RssShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
       }
     new ServerConnectionCacheUpdateRefresher(serverConnectionResolver, MultiServerHeartbeatClient.getServerCache)
   }
+
+  override def supportExternalShuffleService: Boolean = false
+

Restart RSS and rerun the Spark job; the same job now completes successfully.

@github-actions github-actions bot added the CORE label Nov 19, 2021
@yangwwei yangwwei changed the title [SPARK-37394] Skip registering to ESS if a customized shuffle manager is configured [SPARK-37394] Skip registering with ESS if a customized shuffle manager is configured Nov 19, 2021
@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon HyukjinKwon changed the title [SPARK-37394] Skip registering with ESS if a customized shuffle manager is configured [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured Nov 22, 2021
@HyukjinKwon
Member

cc @mridulm and @Ngone51

@mridulm
Contributor

Seems reasonable to me given the immediate need to make progress - though I don't like hardcoding it this way.
+CC @tgravescs, @Ngone51

@attilapiros
Contributor

In this case there will be a problem when somebody would like to use the external shuffle service with a customized shuffle manager (like some minor variation of the SortShuffleManager).

I do not know how valid this use case is, but we can get around it in the following way:
What about extending the ShuffleManager trait with a new method indicating whether this shuffle manager implementation works with the external shuffle service or not? It can have a default implementation returning true, and it only needs to be overridden when the external shuffle service is not supported.
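Concretely, the suggestion could look like the following sketch. These are standalone stand-ins, not the actual Spark internals: the method name and the `*Like` class names are illustrative only.

```scala
// Stand-in for Spark's private[spark] ShuffleManager trait (illustrative).
trait ShuffleManager {
  // Proposed addition: defaults to true, so existing implementations that
  // rely on the external shuffle service keep working without changes.
  def supportsExternalShuffleService: Boolean = true
}

// Built-in managers inherit the default and still register with the ESS.
class SortShuffleManagerLike extends ShuffleManager

// A remote shuffle service plugin opts out, so the executor can skip the
// ESS registration that currently fails with "connection refused".
class RemoteShuffleManagerLike extends ShuffleManager {
  override def supportsExternalShuffleService: Boolean = false
}

// The decision the BlockManager could make before registering:
def shouldRegisterWithEss(shuffleServiceEnabled: Boolean, mgr: ShuffleManager): Boolean =
  shuffleServiceEnabled && mgr.supportsExternalShuffleService
```

Because the default returns true, no existing shuffle manager would change behavior; only plugins that explicitly opt out would skip the registration.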

@tgravescs
Contributor

So my first reaction is: you have a 3rd party shuffle manager that is an external shuffle service because it supports dynamic allocation, so why is it failing... is it because you didn't override something, or because you couldn't override something?
In this case it's creating an ExternalBlockStoreClient, which I think isn't set up to be overridden. I think it comes down to the fact that we just haven't really added support to allow this. I definitely agree that we should have this, and that 3rd party shuffle services should be able to use dynamic allocation, but I would like to see it done more officially than this.
Either something like @attilapiros mentioned - I definitely like the idea of asking the shuffle manager what it supports - or the user overrides this class somehow.

I need to go refresh my memory of all the various shuffle stuff that went in. We had some other shuffle work in progress that seems to have stalled as well.
Maybe I'm missing why this needs to go in quickly?

@yangwwei
Contributor Author

Thank you @HyukjinKwon , @attilapiros , @tgravescs

What about extending ShuffleManager trait with a new method indicating whether this shuffle manager implementation works with the external shuffle manager or not. It can have a default implementation giving back true and only needed to be overridden when the external shuffle manager is not supported.

I really like this idea, thank you @attilapiros. How about adding a new method: supportExternalShuffleService(). This method gives each shuffle manager implementation a way to tell whether the external shuffle service is needed for this shuffle manager to work. By default it returns true, and the block manager will register with the external shuffle server; otherwise, that registration can be skipped.

So my first reaction is: you have a 3rd party shuffle manager that is an external shuffle service because it supports dynamic allocation, then why is it failing... is it because you didn't override something, or because you couldn't override something? In this case it's creating a ExternalBlockStoreClient, which I think isn't setup to be overridden. I think it comes down to we just haven't really added support to allow this.

We actually found this issue while using Uber's Remote Shuffle Service with DA enabled. It is due to this part of the code being hardcoded to register with the external shuffle service even when a 3rd party shuffle service is used. We will need a more general way to handle this. Please let me know your thoughts, thanks!

@yangwwei
Contributor Author

yangwwei commented Nov 30, 2021

@mridulm , @attilapiros , @tgravescs could you please help review the changes again?
Per @attilapiros's suggestion, I have added a method to the ShuffleManager trait that can be overridden when needed. The default returns true, so there is no behavior change. I have also updated "How was this patch tested" with more details about the tests I've done locally.

Note, this is still an "incompatible" change for 3rd party shuffle service implementations. Adding a method with a default implementation to a trait will require a recompile of the RSS server/client library.

Thanks!

@attilapiros
Contributor

LGTM

@tgravescs
Contributor

So I'm a bit on the fence about this. I hesitate to change this API and cause folks to recompile without more investigation into properly making shuffle pluggable. The ShuffleManager here is private[spark], so adding a function for 3rd party libraries doesn't really match well with that by itself. Ideally we would make the APIs pluggable and the classes proper developer APIs for 3rd parties, but I also realize that figuring out how to do that properly is a much larger task. I just hate to change the API multiple times if we are planning on doing more work there. At the same time, I think other attempts at that have stalled, so I'm open to thoughts from others.

@yangwwei
Contributor Author

So I'm a bit on the fence about this. I hesitate to change this API and cause folks to recompile without more investigation into properly making shuffle pluggable. The ShuffleManager here is private[spark], so adding a function for 3rd party libraries doesn't really match well with that by itself. Ideally we would make the APIs pluggable and the classes proper developer APIs for 3rd parties, but I also realize that figuring out how to do that properly is a much larger task. I just hate to change the API multiple times if we are planning on doing more work there. At the same time, I think other attempts at that have stalled, so I'm open to thoughts from others.

Thanks for sharing your thoughts @tgravescs . So far, there are several 3rd party shuffle service implementations out there, and it seems the existing APIs are pluggable enough. Well, I may be missing some facts, but at least we are pretty happy running Uber's RSS with minimal configs. This is the only issue we found that we cannot bypass; we need some changes in Spark in order not to "always" register with the local shuffle service. This approach is the simplest and safest solution I can think of. I would love to hear thoughts from others as well.

@tgravescs
Contributor

Yeah, I understand there are a bunch of people that override it, including myself, but it's not really a public pluggable API, which it should be so that users don't have to hack it by being in the spark package. Ideally, in making it public, we would think about issues like this one, to allow users to configure things differently depending on what the implementation supports. It should also come with some API guarantees for versioning, like not breaking it unless absolutely necessary, as we are doing here.

@attilapiros
Contributor

@tgravescs I agree with you and I would be happy to work on the ideal solution. This is why I tried to push SPARK-31801 by copying Matthew Cheah's PR as #30763. But it got sidetracked and became stale.
Based on this experience I am afraid the ideal solution is a bit far away in the future.

But right now we have this class at hand, which can be used by those experimental shuffle solutions.
And I would not block them without having a timeline for the ideal solution.

I am sure all those solutions are useful for Spark as well, as they are a possible source of good feedback (especially for the ideal API), if we just let them grow.

@mridulm
Contributor

mridulm commented Dec 1, 2021

Agree with @tgravescs.
We are increasingly seeing external use/extensions of traits/classes explicitly marked private to Spark, and changes to accommodate this pattern. This makes it more difficult to evolve the code - there is no explicit contract which we can check against while changing the code (MapOutputTracker is another example of this).

Pushing on @attilapiros's work to explicitly open up subsystems with well-defined contracts should help with this - instead of continuing to rely on internals.

@yangwwei
Contributor Author

yangwwei commented Dec 2, 2021

hi @tgravescs , @mridulm, I understand your concerns, but we need a solution to make this work for today's RSS. How about adding a config like:

  private[spark] val SHUFFLE_REGISTRATION_ENABLED =
    ConfigBuilder("spark.shuffle.registration.enabled")
      .doc("Enable the executors to register with the local external shuffle service. When " +
        "`spark.shuffle.service.enabled` is true and a local external shuffle service is used, " +
        "it must be set to true; if the local shuffle service is not used, set this value to " +
        "false to skip the registration.")
      .version("3.3.0")
      .booleanConf
      .createWithDefault(true)

This way at least we have a config property to set for RSS, in order to skip the registration step that is currently hardcoded.
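For illustration, the call site guarded by such a config could look like this sketch. `ConfLike` is a standalone stand-in for SparkConf lookup, not the actual BlockManager code; the config keys are the ones discussed above.

```scala
// Minimal stand-in for a SparkConf-style boolean lookup (illustrative).
final case class ConfLike(settings: Map[String, String]) {
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

// The executor would only attempt ESS registration when the shuffle service
// is enabled AND registration has not been explicitly switched off.
def registersWithEss(conf: ConfLike): Boolean =
  conf.getBoolean("spark.shuffle.service.enabled", false) &&
    conf.getBoolean("spark.shuffle.registration.enabled", true)
```

With the default of true, behavior is unchanged; an RSS deployment would set `spark.shuffle.registration.enabled=false` to skip the step.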

@hiboyang

hi @tgravescs , @mridulm, I understand your concerns, but we need a solution to make this work for today's RSS. How about adding a config like:

  private[spark] val SHUFFLE_REGISTRATION_ENABLED =
    ConfigBuilder("spark.shuffle.registration.enabled")
      .doc("Enable the executors to register with the local external shuffle service. When " +
        "`spark.shuffle.service.enabled` is true and a local external shuffle service is used, " +
        "it must be set to true; if the local shuffle service is not used, set this value to " +
        "false to skip the registration.")
      .version("3.3.0")
      .booleanConf
      .createWithDefault(true)

This way at least we have a config property to set for RSS, in order to skip the registration step that is currently hardcoded.

+1 for adding this config! This is a simple change, and it will help unblock various third party shuffle implementations/experiments.

@tgravescs
Contributor

@attilapiros is there any active work on it now? I get that it is a lot more work, but at the same time, if we keep putting it off for temporary hacks it won't get done.

If you look at it from just a Spark perspective, the API is not public; we would be adding an interface that is not used internally in Spark, or a config that would not be used internally and applies to a private API. Neither of those is a great API choice in my mind. I do get that shuffle is overridden by people, though, and that full features take time, which is why I'm a bit torn on this.

There is logic right above this check for the external shuffle service as well that sets the shuffleServerID; what are you using for that port? It's used in MapStatus and register; I guess it doesn't matter in this case for map status. I'm not familiar with the details of Uber RSS - can it just fake up a remote shuffle port and ignore what is sent? Have you asked for their version to support dynamic allocation?

@hiboyang

hiboyang commented Jan 7, 2022

@tgravescs I am the author of Uber RSS. In terms of your question "I'm not familiar with the details of Uber RSS, can it just fake up a remote shuffle port and ignore what is sent? have you requested their version to support dynamic allocation?"... Uber RSS can support dynamic allocation on Kubernetes via some workarounds, e.g. enabling the shuffle tracking feature and setting the shuffle timeout to zero. But it is a hack. The code change here will make that hack unnecessary.
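For reference, the workaround mentioned above roughly corresponds to settings like these (spark-defaults.conf style; exact values depend on the deployment, so treat this as a sketch rather than Uber RSS's documented configuration):

```
spark.dynamicAllocation.enabled                  true
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.shuffleTracking.timeout  0s
spark.shuffle.service.enabled                    false
```

Shuffle tracking lets dynamic allocation work without an external shuffle service, and a zero tracking timeout means executors are not kept alive for shuffle data that actually lives in the remote shuffle service.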

In terms of adding a config like "spark.shuffle.registration.enabled", any concerns?

@tgravescs
Contributor

tgravescs commented Jan 7, 2022

Yes, my concern is that it's a config for something that isn't public. It doesn't make sense to me to have a public config for a non-public API, and by itself, without this 3rd party lib, the config would not apply.

I don't think my question was answered (can't you just open a port on one of your remote shuffle services and ignore the messages), and it sounds like it does work now with some hack (is that hack just setup and configs)? If so, then I would much rather prefer a real solution like the one already mentioned above, because you can make it work now.

@yangwwei
Contributor Author

yangwwei commented Jan 7, 2022

hi @tgravescs , thanks for getting back on this one :).

I think @attilapiros already mentioned

But it got sidetracked and became stale.
Based on this experience I am afraid the ideal solution is a bit far away in the future.

@attilapiros please share your thoughts.

I understand this may not be the best/most elegant solution, but it is simple and it works. I do not think hacking more things in on the remote shuffle service side is better; this is a common problem that should be handled on the Spark side. If there is a better, actionable solution, I would love to explore it more.

@tgravescs
Contributor

If there is a better, actionable solution, I would love to explore more.

The ideal solution is what I've been saying: Spark actually makes a public pluggable interface for this. Your entire plugin could be considered a "hack" because it overrides a private Spark API. The shuffle manager config was initially created to switch between internal Spark shuffle manager implementations, not for external 3rd party implementations.

I assume Spark is creating an ExternalBlockStoreClient for this if you have the external shuffle service enabled. That class assumes that you have implemented a certain API, and registration is part of it. I guess you probably override the shuffle reader, so maybe it's not really used for much else. But then why are we creating an ExternalBlockStoreClient? (For this case, should there be some API change that doesn't create it at all?) Again, this goes back to making a proper interface for this.

I do understand that is a much bigger chunk of work, which is why I said I was on the fence about it. If it has enough impact and there is no other way around it, perhaps we let it in short term. But at the same time, if we keep hacking on this private interface to allow this or that, with temporary fixes, it will never get done properly. If it's something you can do on the remote shuffle side already, then it doesn't require yet another hack in Spark itself.

How are you currently hacking it to work now?

this is a common problem

Where else is this a problem?

@attilapiros
Contributor

Sorry, I was sick last week.
Currently the PR I mentioned is closed (as it became stale) and I would like to look into this problem a bit more: whether there is an easier way to go. Let me play with the code a bit.

@mridulm
Contributor

mridulm commented Jan 10, 2022

Thanks for picking it up again @attilapiros.
That is the right path forward here, instead of depending on Spark internals - agree with @tgravescs.

@yangwwei
Contributor Author

Thank you all, @tgravescs , @attilapiros , @hiboyang , @mridulm
I had a conversation with @attilapiros; I will work with him and see if there is a way to solve this in a better fashion. We will come back and update this thread once we find out more. Thank you all for the feedback and suggestions.

@attilapiros
Contributor

Actually I need help from all of you.

I checked #30763 and the current code. As I see it now, it is even harder to create an API interface because of some extra features integrated into master (like push-based shuffle).

So I thought about the problem (how we can get rid of the extra methods polluting generic interfaces) and came up with something. When the feature is controlled by a feature flag, we are safe at runtime (these methods are not used).
My intention is to make minimal changes (in risk and size) regarding the Spark internal features (plugins in this sense are secondary). So my solution is to cast to the specific implementations where this extra functionality is needed.

I can show, via an example, how I cleaned up ShuffleBlockResolver: #35180

If this is fine, we should continue with MapOutputTracker, which I think should be part of the Shuffle API with a meaningful base implementation. My plan is to progress with baby steps to make the reviewers' lives easier.
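The "cast where the extra functionality is needed" idea can be sketched like this. These are standalone stand-ins; the `*Like` names echo the classes discussed (ShuffleBlockResolver, IndexShuffleBlockResolver) but are illustrative, not the actual Spark code.

```scala
// The generic interface stays free of merge-specific methods.
trait BlockResolverLike

// Only the built-in resolver carries the push-based-shuffle extras.
class IndexResolverLike extends BlockResolverLike {
  def getMergedBlockData(shuffleId: Int): Seq[String] = Seq(s"merged-$shuffleId")
}

// A plugin resolver compiles without implementing any merge methods.
class PluginResolverLike extends BlockResolverLike

// Call site: the feature flag guards the downcast, so when push-based
// shuffle is off (or the resolver is a plugin) the extras are never touched.
def mergedBlocks(resolver: BlockResolverLike,
                 pushBasedShuffleEnabled: Boolean,
                 shuffleId: Int): Seq[String] =
  resolver match {
    case idx: IndexResolverLike if pushBasedShuffleEnabled =>
      idx.getMergedBlockData(shuffleId)
    case _ => Seq.empty
  }
```

The trade-off is that the cast couples the call site to one concrete implementation, which is exactly the concern raised below about third-party shuffle managers having no way to tie into merging.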

@tgravescs, @mridulm, @yangwwei, @hiboyang: WDYT?

@yangwwei
Contributor Author

I checked two open-source projects, Uber Zeus and Tencent Firestorm; neither implements ShuffleBlockResolver - see Firestorm's RssShuffleManager#shuffleBlockResolver() and Zeus's RssShuffleBlockResolver. They have both implemented ShuffleManager and their own shuffle reader/writer. So if we start the refactoring from this level, that might not help. What do you think, @attilapiros ?

@attilapiros
Contributor

ShuffleBlockResolver is part of the ShuffleManager, and Zeus does/would implement it, but because of the extra methods (getMergedBlockData, getMergedBlockMeta) it won't even compile.

We should clean up all the interfaces available from ShuffleManager and open them up.

@tgravescs
Copy link
Contributor

It does feel a bit odd that ShuffleBlockResolver doesn't really have any real APIs, but I get what you are going for here. They aren't necessarily needed for other implementations. It's weird now, though, because ShuffleWriteProcessor would access the shuffle block resolver, but it's behind a feature flag and there are explicit checks for the specific IndexShuffleBlockResolver impl. Which really isn't ideal, in the sense that if someone wanted to implement a shuffle manager that supported shuffle merging, they have no way to tie in there.
I definitely think if we have basically 4 implementations of the shuffle, we should make an API that fits and get something done, rather than getting caught up trying to cover all future cases.

So, looking at other things: the ExternalBlockStoreClient is not part of the shuffle manager; would the intention there be to keep something like the API introduced in this PR? Just trying to go through what all changes may be required.

@attilapiros
Contributor

I definitely think if we have basically 4 implementations of the shuffle, we should make an API that fits and get something done, rather than getting caught up trying to cover all future cases.

In this case the most important part is MapStatus, where filling out the BlockManagerId has some consequences and there is no place for arbitrary metadata.

So, looking at other things: the ExternalBlockStoreClient is not part of the shuffle manager; would the intention there be to keep something like the API introduced in this PR?

I would keep it as it is.

As we already have some implementations of ShuffleManager, I would keep the final opened API as close to them as possible.
This way we can easily use those projects to test our solutions.

@hiboyang

hiboyang commented Jan 13, 2022

Thanks folks for keeping up the work on this! The current ShuffleManager API is actually pretty good in Spark; I agree we should make it a public API (maybe with small modifications, as discussed here).

ShuffleBlockResolver is tightly coupled with the current open source Spark shuffle implementation to get shuffle blocks; other shuffle implementations may not need it, e.g. Uber's RSS (Zeus) does not. It would be nice to make ShuffleBlockResolver optional in ShuffleManager, or even remove it from the public API.

@attilapiros
Contributor

attilapiros commented Jan 21, 2022

Regarding storing and retrieving of MapOutputMetadata, here is where I am right now: attilapiros#4.

I am interested in your thoughts about this (before going further).

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-37394][CORE] Skip registering with ESS if a customized shuffle manager is configured [SPARK-37394][CORE] Skip registering with external shuffle server if a customized shuffle manager is configured Mar 9, 2022
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 29, 2022
@github-actions github-actions bot closed this Sep 30, 2022