
Conversation

@hiboyang commented Mar 3, 2021

What changes were proposed in this pull request?

Add a Spark config, "spark.shuffle.markFileLostOnExecutorLost", so that shuffle files are not marked as lost on an executor-lost event.

JIRA: https://issues.apache.org/jira/browse/SPARK-34601

Why are the changes needed?

There are multiple efforts underway on disaggregated/remote shuffle services (e.g. LinkedIn's, Facebook's, and Uber's shuffle services). Such a remote shuffle service is not the Spark External Shuffle Service; it can be a third-party shuffle solution that users enable by setting spark.shuffle.manager. In those systems, shuffle data is stored on servers other than the executors, so Spark should not mark shuffle data as lost when an executor is lost. We can add a Spark configuration to control this behavior. By default, Spark still marks shuffle files as lost; for a disaggregated/remote shuffle service, people can set the config to not mark shuffle files as lost.
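For illustration, a user of such a third-party solution would set something like the following (a minimal sketch; the shuffle manager class name here is hypothetical):

  import org.apache.spark.SparkConf

  // Hypothetical third-party remote shuffle manager class; any implementation
  // selected via spark.shuffle.manager would be configured the same way.
  val conf = new SparkConf()
    .set("spark.shuffle.manager", "com.example.shuffle.RemoteShuffleManager")
    // Proposed config: shuffle data lives on remote servers, so do not mark
    // it as lost when an executor is lost.
    .set("spark.shuffle.markFileLostOnExecutorLost", "false")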

Does this PR introduce any user-facing change?

No

How was this patch tested?

Ran Spark unit tests.

github-actions bot added the CORE label Mar 3, 2021
@dbtsai (Member) commented Mar 3, 2021

Jenkins, add to whitelist.

@dbtsai (Member) commented Mar 3, 2021

cc @dongjoon-hyun @holdenk @viirya for reviews.

@dbtsai (Member) commented Mar 3, 2021

cc @attilapiros @HyukjinKwon as well.

@attilapiros (Contributor) commented Mar 3, 2021

I do not think this flag (indicating whether to unregister all the shuffle blocks created by the failed executor) should be set at the application level (the way this PR does it).

I think this should be a property kept for the replicated shuffle block, i.e. somewhere close to MapStatus.

Imagine a mixed solution where the disaggregated storage is just a fallback and locally stored blocks can still be used. Then it makes sense to unregister the shuffle blocks for the failed executors and leave access to the blocks in disaggregated storage intact.

@hiboyang (Author) commented Mar 3, 2021

I do not think this flag (indicating whether to unregister all the shuffle blocks created by the failed executor) should be set at the application level (the way this PR does it).

I think this should be a property kept for the replicated shuffle block, i.e. somewhere close to MapStatus.

Imagine a mixed solution where the disaggregated storage is just a fallback and locally stored blocks can still be used. Then it makes sense to unregister the shuffle blocks for the failed executors and leave access to the blocks in disaggregated storage intact.

Yeah, if the user uses a mixed solution where the disaggregated storage is just a fallback, it is better to have MapStatus track the availability of shuffle blocks. This PR does not block that scenario.

There are also non-mixed solutions where people rely entirely on a remote shuffle service (like Facebook's/Uber's). In that case, people only need to set this flag at the application level. This PR helps that scenario.

@SparkQA commented Mar 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40302/

@SparkQA commented Mar 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40302/

Comment on lines 2123 to 2124
.doc("Mark shuffle file lost when executor is lost. People could set this to false when " +
"using remote shuffle services because the shuffle file is not stored on the executor.")
@holdenk (Contributor) commented:

nit: What if we reworded this to something like:
"Mark shuffle files as lost when an executor is lost. If you are using a remote shuffle service such that shuffle files are not stored on the executor consider setting this to false."

@hiboyang (Author) commented:

This is a good suggestion; will do the rewording, thanks!

@hiboyang (Author) commented:

@holdenk I updated the wording just now.

@dongjoon-hyun (Member) commented:

Thank you for pinging me, @dbtsai.

@SparkQA commented Mar 3, 2021

Test build #135720 has finished for PR 31715 at commit ddb6d8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  private[spark] val MARK_FILE_LOST_ON_EXECUTOR_LOST =
    ConfigBuilder("spark.shuffle.markFileLostOnExecutorLost")
      .doc("Mark shuffle files as lost when an executor is lost. If you are using a remote " +
        "shuffle service such that shuffle files are not stored on the executor, consider " +
A Member commented:

Well, in this context, the remote shuffle service is not the external shuffle service in Apache Spark, because we are ignoring env.blockManager.externalShuffleServiceEnabled here. We had better be clear on this.

@dongjoon-hyun (Member) left a comment:

Thank you for your contribution, @hiboyang.

  1. Could you update the documentation in this PR as well, because this configuration can be misleading relative to the other config sets?
  2. Could you provide a unit test for this new feature, please? Without test coverage, your new feature is not protected at all in future releases.

@SparkQA commented Mar 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40305/

@viirya (Member) commented Mar 4, 2021

I tend to agree with @attilapiros on this point. It seems improper and error-prone to let users control this behavior. I think a more ideal approach is to let Spark decide whether the shuffle output should be unregistered when the executor is going to be removed.

@hiboyang (Author) commented Mar 4, 2021

Thank you for your contribution, @hiboyang.

  1. Could you update the documentation in this PR as well, because this configuration can be misleading relative to the other config sets?
  2. Could you provide a unit test for this new feature, please? Without test coverage, your new feature is not protected at all in future releases.

Good suggestions, will do!

@hiboyang (Author) commented Mar 4, 2021

I tend to agree with @attilapiros on this point. It seems improper and error-prone to let users control this behavior. I think a more ideal approach is to let Spark decide whether the shuffle output should be unregistered when the executor is going to be removed.

The usage here is that the user decides to use a remote shuffle service (e.g. Facebook's/Uber's) and sets spark.shuffle.manager to a customized class that supports that remote shuffle service. They then set spark.shuffle.markFileLostOnExecutorLost to avoid marking shuffle files as lost when an executor is lost. In other scenarios, users will not set spark.shuffle.markFileLostOnExecutorLost. Would you clarify the "error-prone" concern here?

I am going to update the comment for this config to something like the following, to help users understand better:
Mark shuffle files as lost when an executor is lost. If you set spark.shuffle.manager to a customized class that uses a different shuffle implementation, such as storing shuffle files on remote storage/servers or using a third-party remote shuffle service, and you are sure the shuffle files are not stored on the executor, consider setting this to false.
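Concretely, the config entry would then read roughly as follows (a sketch; the booleanConf/createWithDefault(true) tail is my assumption, matching the default-true behavior described in the PR description):

  private[spark] val MARK_FILE_LOST_ON_EXECUTOR_LOST =
    ConfigBuilder("spark.shuffle.markFileLostOnExecutorLost")
      .doc("Mark shuffle files as lost when an executor is lost. If you set " +
        "spark.shuffle.manager to a customized class that stores shuffle files on " +
        "remote storage or uses a third-party remote shuffle service, and you are " +
        "sure the shuffle files are not stored on the executor, consider setting " +
        "this to false.")
      .booleanConf
      .createWithDefault(true)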

@SparkQA commented Mar 4, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40305/

@viirya (Member) commented Mar 4, 2021

Users may need to set this application config differently across different solutions, e.g. external shuffle service, built-in shuffle service, remote shuffle service, mixed solution, etc. This is somewhat low-level Spark behavior, and I doubt it is good to expose it to end users and let them decide the config. It seems easy to set an improper value.

Can Spark decide to unregister shuffle output automatically, e.g. based on which shuffle manager is used for the shuffle output? Or, as in @attilapiros's idea, have a property somewhere close to MapStatus?

@hiboyang (Author) commented Mar 4, 2021

Users may need to set this application config differently across different solutions, e.g. external shuffle service, built-in shuffle service, remote shuffle service, mixed solution, etc. This is somewhat low-level Spark behavior, and I doubt it is good to expose it to end users and let them decide the config. It seems easy to set an improper value.

Can Spark decide to unregister shuffle output automatically, e.g. based on which shuffle manager is used for the shuffle output? Or, as in @attilapiros's idea, have a property somewhere close to MapStatus?

If Spark decides to unregister shuffle output based on which shuffle manager is used, that requires Spark to have knowledge about the different shuffle manager implementations. That is hard to implement, because users can set any shuffle manager implementation via spark.shuffle.manager.

In terms of "Users may need to set this application config differently across different solutions": yes, that is the purpose. There are many shuffle solutions, as you listed. The current Spark design is pretty good, allowing users to set spark.shuffle.manager to a customized class to choose a different solution. However, it assumes shuffle files are always lost when an executor is lost, and this assumption conflicts with the customizable shuffle manager design. The new config spark.shuffle.markFileLostOnExecutorLost keeps that assumption by default, but gives users the option to choose a different behavior when needed.

@hiboyang (Author) commented Mar 4, 2021

Thank you for your contribution, @hiboyang.

  1. Could you update the documentation in this PR as well, because this configuration can be misleading relative to the other config sets?
  2. Could you provide a unit test for this new feature, please? Without test coverage, your new feature is not protected at all in future releases.

Hi @dongjoon-hyun, thanks for the suggestion! I updated the documentation and added a unit test :)

@SparkQA commented Mar 4, 2021

Test build #135723 has finished for PR 31715 at commit de2d2b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    .createWithDefaultString("3m")

  private[spark] val MARK_FILE_LOST_ON_EXECUTOR_LOST =
    ConfigBuilder("spark.shuffle.markFileLostOnExecutorLost")
@c21 (Contributor) commented:

I have a similar concern to @viirya and @attilapiros. I think we should not make this a user-facing config. If we would like to introduce a config for this anyway, it had better start as an internal() config, not a user-facing one.

Though spark.shuffle.manager can be an arbitrary class, in practice there are just a handful of implementations in one company's environment, and ideally the infra developers should control which implementation to use, instead of users. Similarly, whether to mark shuffle files lost should be controlled by the developer team, not by users.

To share some context: at FB, we (Spark developers) control these kinds of behavior transparently and make them invisible to Spark users. This also helps us migrate to newer implementations more easily, without worrying about users setting a wrong config. The mixed case (a query uses both the customized shuffle service and the default shuffle service) happens quite a bit in production, as we have rate limits for traffic on the customized service and need a fallback.

@hiboyang (Author) commented Mar 4, 2021:

Hi @c21, thanks for the comment, and nice to know you from FB! We previously discussed Cosco with people from FB, but not this specific issue. Just curious: do you guys modify your internal Spark distribution so that an executor-lost event does not trigger the driver to delete map output?

@c21 (Contributor) commented:

do you guys modify your internal Spark distribution to make executor lost event not trigger driver to delete map output?

Yeah, I think so; I can double-check with the Cosco folks tomorrow as well. BTW, nice to know you as well!

@hiboyang (Author) commented:

Cool, that will be interesting to know, thanks!

@attilapiros (Contributor) commented:

@c21 May I ask what is the status of open sourcing Cosco?
I just think it might help us to make better decisions about the Shuffle plugin API of Spark if we could look into that one too.

@c21 (Contributor) commented:

@attilapiros - unfortunately there's no specific ETA now, but the team is working on it.

    // from a Standalone cluster, where the shuffle service lives in the Worker.)
  - val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
  + val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
  +   env.blockManager.markFileLostOnExecutorLost
A Contributor commented:

Shouldn't this config have no effect if spark.shuffle.manager is sort (the default one)?

@SparkQA commented Mar 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40308/

@SparkQA commented Mar 4, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40308/

@viirya (Member) commented Mar 4, 2021

Then how about a property somewhere close to MapStatus? I guess this config will become a sort of mysterious config; users also might not have the knowledge to set it. Spark should know where the shuffle output is kept, so ideally Spark should know whether the shuffle output should be unregistered or not. I just don't know if Spark currently provides the necessary pieces for it.

Under a mixed solution, this config is also hard to set properly. Should it be set to true or false if the shuffle output could be kept either in fallback storage or on the executor?

@SparkQA commented Mar 4, 2021

Test build #135726 has finished for PR 31715 at commit 1418e8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hiboyang (Author) commented Mar 4, 2021

Then how about a property somewhere close to MapStatus? I guess this config will become a sort of mysterious config; users also might not have the knowledge to set it. Spark should know where the shuffle output is kept, so ideally Spark should know whether the shuffle output should be unregistered or not. I just don't know if Spark currently provides the necessary pieces for it.

Under a mixed solution, this config is also hard to set properly. Should it be set to true or false if the shuffle output could be kept either in fallback storage or on the executor?

Yeah, I did some further looking, following your suggestion. It looks like Spark already checks and matches the executor id when it tries to remove map output, as in the following code, so it can already work well with a customized shuffle manager and a third-party remote shuffle service.

  def removeOutputsOnExecutor(execId: String): Unit = withWriteLock {
    logDebug(s"Removing outputs for execId ${execId}")
    removeOutputsByFilter(x => x.executorId == execId)
  }
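For example, a remote shuffle manager that registers its map outputs under a synthetic location is unaffected by that filter (a minimal sketch; the ids and port are hypothetical, and BlockManagerId is a Spark-internal class):

  import org.apache.spark.storage.BlockManagerId

  // A remote shuffle implementation can register its MapStatus entries under
  // a synthetic executor id that never matches a real executor.
  val rssLocation = BlockManagerId("rss-dummy-executor", "rss-server-host", 19190)

  // removeOutputsByFilter(x => x.executorId == execId) only removes statuses
  // whose executorId equals the lost executor's id, so this one survives.
  assert(rssLocation.executorId == "rss-dummy-executor")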

It turns out I do not need to add this markFileLostOnExecutorLost config any more :) Thanks again for your comments!

I will close this PR (after checking other comments).

@dongjoon-hyun (Member) commented:

Thank you for your decision, @hiboyang.

@attilapiros (Contributor) commented:

Sorry, I was away from the computer, but I agree we can close this.

@attilapiros (Contributor) commented:

I just checked Uber RSS (as I know it is open source, and I once read its source at some level); there the location is a dummy block manager and the topology info holds the RSS server list:

https://github.com/uber/RemoteShuffleService/blob/master/src/main/scala/org/apache/spark/shuffle/rss/RssUtils.scala#L56

But I know you know this already ;)
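For readers who do not, the pattern is roughly the following (a sketch with approximate names and encoding; see RssUtils.scala linked above for the real code):

  import org.apache.spark.storage.BlockManagerId

  // Sketch of the Uber RSS trick: the MapStatus location is a dummy block
  // manager id, and topologyInfo carries the RSS server list instead of a
  // real topology string. Names here are illustrative, not from RssUtils.
  def dummyRssLocation(rssServers: Seq[String]): BlockManagerId =
    BlockManagerId("rss-dummy", "dummy-host", 9999, Some(rssServers.mkString(",")))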

@dongjoon-hyun (Member) commented:

Thank you all! I'm closing this now.
We can reopen this if there is a new decision.

@viirya (Member) commented Mar 5, 2021

Thanks @hiboyang @dongjoon-hyun. Yea, we can reopen this if we find it is necessary.

@hiboyang (Author) commented Mar 6, 2021

I just checked Uber RSS (as I know it is open source, and I once read its source at some level); there the location is a dummy block manager and the topology info holds the RSS server list:

https://github.com/uber/RemoteShuffleService/blob/master/src/main/scala/org/apache/spark/shuffle/rss/RssUtils.scala#L56

But I know you know this already ;)

Yes, I added that :) It was kind of a workaround to make it work with open-source Spark. I am still waiting for another PR, "Adding metadata to MapStatus", to get reviewed. That could provide a better solution for embedding the RSS server list inside MapStatus.

@attilapiros (Contributor) commented:

I am still waiting for another PR, "Adding metadata to MapStatus", to get reviewed.

Ok, I can help on that :)
