
Conversation

@attilapiros
Contributor

This is a copy of #28618, merged with the current master and with all merge conflicts resolved.
All the credit goes to @mccheah; I just would like to help out here and avoid his progress being lost.

What changes were proposed in this pull request?

Adds a ShuffleOutputTracker API that can be used for managing shuffle metadata on the driver. The tracker accepts the map output metadata returned by the map output writers.
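To give a feel for the shape of the API, here is a rough sketch of such a driver-side tracker. The trait and method names below are illustrative only, not the exact interface in this diff:

// Illustrative sketch only; not the actual interface from this patch.
// Plugin-defined, opaque metadata describing where/how a map output was stored.
trait MapOutputMetadata extends Serializable

trait ShuffleOutputTracker {
  // Called on the driver when a shuffle is registered.
  def registerShuffle(shuffleId: Int): Unit

  // Called with the metadata a map output writer returned, so the tracker
  // can record where the output for this map task ended up.
  def registerMapOutput(shuffleId: Int, mapIndex: Int, metadata: MapOutputMetadata): Unit

  // Called when the shuffle is unregistered, so external state can be cleaned up.
  def unregisterShuffle(shuffleId: Int): Unit
}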

Requires #28616.

Why are the changes needed?

Part of the design as discussed in this document, and part of the wider effort of SPARK-25299.

Does this PR introduce any user-facing change?

Enables additional APIs for the shuffle storage plugin tree. Usage will become more apparent when the read side of the shuffle plugin tree is introduced.

How was this patch tested?

We've added a mock implementation of the shuffle plugin tree to prove that a Spark job using a different implementation of the plugin can use all of the plugin points for an alternative shuffle data storage solution. We don't include it in this patch, in order to minimize the diff and the code to review; see #28902.

@SparkQA

SparkQA commented Dec 14, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37373/

@SparkQA

SparkQA commented Dec 14, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37373/

@SparkQA

SparkQA commented Dec 14, 2020

Test build #132771 has finished for PR 30763 at commit 5cb3e9d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39154/

@SparkQA

SparkQA commented Jan 27, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39154/

@SparkQA

SparkQA commented Jan 27, 2021

Test build #134568 has finished for PR 30763 at commit b350258.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40438/

@SparkQA

SparkQA commented Mar 8, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40438/

@SparkQA

SparkQA commented Mar 8, 2021

Test build #135856 has finished for PR 30763 at commit abcf8f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor Author

The failure is totally unrelated:

  • org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceWithAdminSuite.subscribing topic by pattern from latest offsets (failOnDataLoss: false)

@attilapiros
Contributor Author

Let me move the MiMa excludes from 3.1.x to 3.2.x.

@attilapiros
Contributor Author

@Ngone51

Regarding cutting this into smaller pieces, I can identify two potential sub-PRs:

  • introduction of MapTaskResult
  • introduction of ShuffleOutputTracker

I can do this cut if you think it is really needed and if you agree with the content of the sub-PRs.

@SparkQA

SparkQA commented Mar 8, 2021

Test build #135864 has finished for PR 30763 at commit 8e54b41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 left a comment


Hi, @attilapiros @hiboyang I'd like to discuss more about the way to register the metadata before we consider how to split this PR.

I actually have a different idea about this. As mentioned briefly in @hiboyang's PR, I'd rather redesign the location of MapStatus:

private[spark] sealed trait MapStatus {
  /** Location where this task output is. */
  def location: BlockManagerId
  ...
}

If we want to introduce custom storage, I think the location should be abstracted to represent different storages; e.g., we can introduce a Location class. The Location would have some common attributes, e.g., a type, and we could add metadata as an interface on Location to provide arbitrary info. Then BlockManagerId would be the native implementation for Spark, and users could implement Location to support custom storage.
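For illustration, a rough sketch of what I mean (the names and fields here are just for discussion, not a concrete proposal):

// Sketch for discussion only; names and fields are illustrative.
trait Location {
  // Common attribute identifying the kind of storage, e.g. "block-manager" or "s3".
  def storageType: String
  // Arbitrary storage-specific information attached to this location.
  def metadata: Map[String, String]
}

// BlockManagerId (heavily simplified here) stays the native Spark implementation.
class BlockManagerId(val executorId: String, val host: String, val port: Int)
    extends Location {
  override def storageType: String = "block-manager"
  override def metadata: Map[String, String] = Map.empty
}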

Also, this way wouldn't change the existing framework of shuffle read/write. It'd allow us to reuse the existing features and code paths without extra effort.

WDYT?

@attilapiros
Contributor Author

attilapiros commented Mar 8, 2021

Then BlockManagerId would be the native implementation for Spark, and users could implement Location to support custom storage.

To test the idea I will try to come up with hard situations, but this does not mean I am against it.

So if I understand correctly, BlockManagerId would extend the Location class, right?
And here MapStatus#location would be a generic Location?

In this case we should check the references of MapStatus#location and, based on that, decide where it is safe to cast Location to BlockManagerId and where we would pass the location further as a generic Location (or at least what else the generic location should contain to keep the existing things working...).

As the current reader uses MapOutputTracker#getMapSizesByExecutorId, would you like to keep that method and throw an exception at runtime when it's called and the location is not a BlockManagerId? This is a central method to get blocksByAddress for fetching in the Spark shuffle.

For example, as I see it, MapOutputTracker is tailored to the current shuffle solution. This should be checked against the idea.

On the other hand, the write side might be easier, as there MapStatus is filled with the id of the current block manager, so a new writer implementation just uses its own location.

But for the read side my worry is having runtime checks/asserts/guards to enforce what is allowed to be used where.

@attilapiros
Contributor Author

I still think the location abstraction is a good idea. I just have my doubts about the amount of effort it needs:

Also, this way wouldn't change the existing framework of shuffle read/write. It'd allow us to be able to reuse the features and code paths without extra effort.

@Ngone51
Member

Ngone51 commented Mar 8, 2021

So if I understand correctly BlockManagerId would extend the Location class, right?
And here MapStatus#location would be a generic Location?

Yes

As the current reader uses MapOutputTracker#getMapSizesByExecutorId, would you like to keep that method and throw an exception at runtime when it's called and the location is not a BlockManagerId?

We don't. Actually, MapOutputTracker should be refactored to work with the Location instead of the specific BlockManagerId if Location is introduced. Accordingly, blocksByAddress would be refactored to store the unique "address" generated by Location. That also means we'd always keep the generic Location inside ShuffleBlockFetcherIterator instead of a specific Location, so we don't need casting.

I think it also answers this question:

In this case we should check the references of MapStatus#location and, based on that, decide where it is safe to cast Location to BlockManagerId and where we would pass the location further as a generic Location (or at least what else the generic location should contain to keep the existing things working...).

Actually, I only find one reference that needs a cast:

val execId = status.location.executorId

And yes, the custom reader should care more about casting. It should definitely cast the generic Location to its implemented one to get the specific information. But the casting should always succeed because Spark would only use one type of storage at a time.
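For example, reusing the Location trait sketched above and a hypothetical S3Location, the custom reader's cast could look like:

// Hypothetical custom location for an S3-backed shuffle store.
case class S3Location(bucket: String, prefix: String) extends Location {
  override def storageType: String = "s3"
  override def metadata: Map[String, String] = Map("bucket" -> bucket, "prefix" -> prefix)
}

// The custom reader downcasts the generic Location to its own implementation.
// The cast is expected to succeed because only one storage type is active at a time.
def asS3(location: Location): S3Location = location match {
  case s3: S3Location => s3
  case other => throw new IllegalStateException(s"Unexpected location type: $other")
}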

@Ngone51
Member

Ngone51 commented Mar 8, 2021

I just have my doubts about the amount of effort it needs

As far as I see,

  1. In @hiboyang's PR, he added getAllMapOutputStatusMetadata in MapOutputTracker. IIUC, this must need a corresponding change to handle the metadata at the reader side, which would be a new code path.

  2. In this PR, I see we added ShuffleOutputTracker, which is very similar to MapOutputTracker. Also, MapOutputTracker has recently added a new interface, updateMapOutput, to support node decommissioning, but ShuffleOutputTracker doesn't have it. Do we want to support decommissioning for custom storages too, or only for the BlockManager? With ShuffleOutputTracker, I think we'd need extra effort to support it in custom storages. However, if we have the generic Location, we can reuse MapOutputTracker directly.

@Ngone51
Member

Ngone51 commented Mar 8, 2021

BTW, I'm thinking we still need to think carefully about the Location solution. I worry we'd be overengineering if the use cases don't require such flexibility, because I can imagine how widely Location would refactor the code base, especially the central parts. So I hope we'd discuss more use cases if you think the idea is generally good.

@hiboyang

Just saw the discussion here. The location abstraction is a good idea. Different shuffle solutions could have different location implementations: e.g., Spark's default sort shuffle has BlockManagerId as the location, a remote shuffle service has shuffle servers as the location, and disaggregated shuffle storage (e.g., S3) has an S3 bucket/path as the location.

MapOutputTracker#getMapSizesByExecutorId may not need to throw an exception? It could return a list of Locations and sizes.
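E.g., a purely illustrative sketch, with the signature simplified (the real getMapSizesByExecutorId takes more parameters and groups the blocks by BlockManagerId):

// Illustrative only: group map output blocks by the generic Location instead
// of BlockManagerId, so no runtime exception is needed for custom storages.
// BlockId is Spark's existing org.apache.spark.storage.BlockId; the Long is
// the block size and the Int is the map index, as in the current API.
def getMapSizesByLocation(
    shuffleId: Int,
    startPartition: Int,
    endPartition: Int): Iterator[(Location, Seq[(BlockId, Long, Int)])]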

@hiboyang

@Ngone51 @attilapiros do we want to proceed with the location idea?

@Ngone51
Member

Ngone51 commented Mar 17, 2021

I'm waiting for @attilapiros's feedback.

@attilapiros
Contributor Author

We all agree that more abstraction here is really a good idea, and reading #30763 (comment) gives me the impression we both worry about the impact of the change, but as I see it you have solutions for all the concerns: #30763 (comment).

@Ngone51 I am fine if you proceed, and when it is ready we can see the real price of this change.

@hiboyang

We all agree that more abstraction here is really a good idea, and reading #30763 (comment) gives me the impression we both worry about the impact of the change, but as I see it you have solutions for all the concerns: #30763 (comment).

@Ngone51 I am fine if you proceed, and when it is ready we can see the real price of this change.

+1

@Ngone51
Member

Ngone51 commented Mar 17, 2021

Sure, I'll give it a try these days.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
