[SPARK-33779][SQL] DataSource V2: API to request distribution and ordering on write #30706
Conversation
cc81501 to bbf7a00 (force-pushed)
This is the only interface I marked as Evolving. All other interfaces are marked as Experimental.
@sunchao, I thought about defaulting these implementations as build().toBatch(), but that would introduce a circular dependency between these methods. Data sources may only implement buildForBatch, for example. Right now, calling buildForStreaming produces an exception for them. If we default it that way, it would result in a stack overflow instead.
I think you'll also need to change WriteBuilder.build to not override toBatch and toStreaming, such as:

default Write build() {
  return new Write() {
  };
}

With this, Spark users just need to override the build method in their DS implementations, and the old APIs buildForBatch and buildForStreaming will automatically pick up the logic in the other methods.
@sunchao, I think because of the circular call that @aokolnychyi pointed out, we can either provide a default for build or a default for buildForBatch / buildForStreaming. Anton's approach throws an exception in the old methods, while your approach throws an exception in the new methods.
I think Anton's approach is the right one because it provides backward-compatibility for sources that currently implement the old methods (buildForBatch). Those sources are still compatible without being modified. If we removed the implementation of toBatch then we would not be able to call build unless the source supported it.
Compatibility with existing code is the more important concern.
You are right. I overlooked the compatibility issue. Sorry for the wrong suggestion.
I decided to deprecate these.
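For reference, here is a rough sketch of the resulting shape: the default build() bridges to the old methods so existing sources keep working, while the deprecated methods keep throwing when a mode isn't supported. This is a reconstruction of the approach described above (exception messages and exact bodies are approximations), not the merged code.

```java
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;

// Sketch of the WriteBuilder defaults being discussed (approximate, not the merged code).
public interface WriteBuilder {

  // New entry point: the default bridges to the deprecated methods, so sources that
  // only implement buildForBatch() keep working without modification.
  default Write build() {
    return new Write() {
      @Override
      public BatchWrite toBatch() {
        return buildForBatch();
      }

      @Override
      public StreamingWrite toStreaming() {
        return buildForStreaming();
      }
    };
  }

  // Old entry points, now deprecated: they still throw for unsupported modes instead of
  // delegating back to build(), which is what avoids the circular-call problem above.
  @Deprecated
  default BatchWrite buildForBatch() {
    throw new UnsupportedOperationException(getClass().getName() + " does not support batch write");
  }

  @Deprecated
  default StreamingWrite buildForStreaming() {
    throw new UnsupportedOperationException(getClass().getName() + " does not support streaming write");
  }
}
```

With this direction of delegation, a source that only implements buildForBatch still fails fast on the streaming path instead of recursing.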
Test build #132564 has finished for PR 30706 at commit
@sunchao, I added
bbf7a00 to 0e4752c (force-pushed)
Test build #132565 has finished for PR 30706 at commit
0e4752c to cc19fd6 (force-pushed)
Test build #132569 has finished for PR 30706 at commit
Kubernetes integration test starting
Kubernetes integration test status success
None of the failed tests look related to this PR. @dongjoon-hyun, are these tests known to be flaky?
If possible, it'd be nice to have an individual JIRA issue for each PR. Otherwise it's hard to determine whether SPARK-23889 is completed or how many tasks are left. That said, SPARK-23889 could be converted out of being a sub-task now.
@HeartSaVioR, I wanted to create sub-tasks under SPARK-23889, but it is already a sub-task. Then I realized the scope of that JIRA is adding interfaces, which is what this PR does. I am fine with either converting SPARK-23889 to a standalone issue and creating sub-tasks, or keeping its scope to interfaces and adding more JIRAs for subsequent items.
Either approach would be OK. Just a general comment that it would be nice to associate each part with its own JIRA issue. Further JIRA issues don't need to be sub-tasks of SPARK-23889; it's also fine to create them under SPARK-22386.
@aokolnychyi, could you fix the issue reference? Once that's done, I think this is ready.
dongjoon-hyun left a comment
+1, LGTM. Thank you, @aokolnychyi and all.
I also support this feature as a part of Apache Spark 3.2.0.
cc @gatorsmile , @cloud-fan , @HyukjinKwon , @gengliangwang , @maryannxue
I created SPARK-33779 under SPARK-22386 and linked it to the old one. We may want to rename the old issue to something more generic, but I don't have the rights to do that.
Merged to master. Thanks for fixing the issue reference, @aokolnychyi! Thanks for the reviews, @sunchao, @HeartSaVioR, @viirya, @dongjoon-hyun!
Thank you so much, @aokolnychyi and @rdblue!
Thanks for the amazing contribution! I really look forward to the follow-up PR - I have a data source which requires hash partitioning, and due to the lack of this functionality it's still DSv1. (Both the reader and writer are DSv1 in my repo, and I've proposed the reader PR with the new DSv2 in the Spark repo.) I hope I'll be able to port the writer to the new DSv2 as well.
I didn't take a very close look, but I am happy that we're making improvements here. Thanks for working on this, @aokolnychyi.
 * @since 3.2.0
 */
@Experimental
public interface ClusteredDistribution extends Distribution {
We already have org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution, shall we remove that and always use this new one? The new one is better as it's more flexible (using Expression rather than String).
It's better if we can avoid breaking existing APIs, but I can't figure out a good way. The old ClusteredDistribution is only used in a mixin trait and marked as Evolving. Seems OK to break it.
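To make the flexibility point concrete, here is a hedged sketch of a write-side implementation built on the new interfaces. The Distributions.clustered and Expressions.identity factory calls, the class name, and the column name are assumptions for illustration, not code from this PR.

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

// Hypothetical connector write that asks Spark to cluster incoming records by an
// identity transform of a partition column before handing them to the data source.
class ClusteredLogWrite implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // Expression-based clustering: transforms (identity, bucket, days, ...) are allowed,
    // whereas the old read-side ClusteredDistribution could only name columns as strings.
    return Distributions.clustered(new Expression[] { Expressions.identity("event_date") });
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // No particular in-partition order required for this example.
    return new SortOrder[0];
  }
}
```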
cc @aokolnychyi
We had already started this discussion a bit in the comments on the original PR. Some background here and here.
Overall, there are two options: break the API now and use the new interfaces instead of the old ones in other places, or evolve the two separately for a while and replace the read side once we have a clear plan for bucketed joins.
My first idea was to break the API and migrate to the new interfaces as suggested by @cloud-fan, since that seems inevitable at some point. However, reviewers made a couple of good points on the original PR, and I am now inclined to evolve these separately for a while until we know what changes will be required to support bucketed tables. I would like to avoid breaking the read side twice if possible.
That said, I don't feel strongly here and each option has its own benefits and drawbacks. I'll be fine with either one.
also cc @viirya @HeartSaVioR @rdblue @dongjoon-hyun @sunchao who reviewed the change
I'm OK to break it later, just don't leave the old one there forever. Shall we create a blocker ticket for Spark 3.2 to remove the old ClusteredDistribution?
I created a blocker SPARK-33807 to track this.
Thanks, everyone!
Hi @aokolnychyi, do you have time to clean it up now?
Since Partitioning.satisfy still refers to the old interface, I'm wondering whether we should defer this to the 3.3.0 timeframe. For my storage-partitioned join work, I found that I need to change the Partitioning interface again in a backward-incompatible way so that it can work for joins. It may be better to break it once instead of twice. That's my 2 cents.
OK, let's retarget it to 3.3
Sorry for my late reply. I also assumed we don't have any clarity on how the Partitioning interface should look, so I think delaying the update and breaking that API only once is better.
Great work!
 * Spark will order incoming records within partitions to satisfy the required ordering
 * before passing those records to the data source table on write.
 * <p>
 * Implementations may return an empty array if they don't require any specific ordering of data
Pardon for the late comment.
The return value is an array.
It would be better to illustrate when more than one SortOrder would be returned (and how they are used).
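To illustrate (this example is mine, not from the PR): a sink that wants records within each partition laid out by date first, and then by event time within the same date, would return two entries, with array position giving sort-key precedence. The Expressions.sort, Expressions.identity, and Expressions.column helpers, as well as the column names, are assumptions about the companion factory API.

```java
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

class OrderingExample {
  // Primary sort key first; the second entry only breaks ties within equal dates.
  static SortOrder[] requiredOrdering() {
    return new SortOrder[] {
        Expressions.sort(Expressions.identity("event_date"), SortDirection.ASCENDING),
        Expressions.sort(Expressions.column("event_ts"), SortDirection.ASCENDING)
    };
  }
}
```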
…required interfaces to `Evolving`

### What changes were proposed in this pull request?
This PR aims to promote `RequiresDistributionAndOrdering` and its required interfaces to `Evolving` from `Experimental`.

### Why are the changes needed?
Since Apache Spark 3.2.0, `RequiresDistributionAndOrdering` and its required interfaces have been served stably for over 5 years, as listed below. In Apache Spark 4.2.0, we had better promote these to `Evolving` because they are no longer `Experimental`.
- #30706

| Interface | Description |
| - | - |
| org.apache.spark.sql.connector.distributions.ClusteredDistribution | Unchanged |
| org.apache.spark.sql.connector.distributions.Distribution | Unchanged |
| org.apache.spark.sql.connector.distributions.Distributions | Unchanged |
| org.apache.spark.sql.connector.distributions.OrderedDistribution | Unchanged |
| org.apache.spark.sql.connector.distributions.UnspecifiedDistribution | Unchanged |
| org.apache.spark.sql.connector.expressions.NullOrdering | No Functional Change |
| org.apache.spark.sql.connector.expressions.SortDirection | No Functional Change |
| org.apache.spark.sql.connector.expressions.SortOrder | Stably Evolving |
| org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering | Stably Evolving |

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs and manual review.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #53361 from dongjoon-hyun/SPARK-54622.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
This PR adds connector interfaces proposed in the design doc for SPARK-23889.
Note: This PR contains a subset of changes discussed in PR #29066.
Why are the changes needed?
Data sources should be able to request a specific distribution and ordering of data on write. In particular, these scenarios are considered useful:
- global sort
- cluster data and sort within partitions
- local sort within partitions
- no sort
Please see the design doc above for a more detailed explanation of requirements.
Does this PR introduce any user-facing change?
This PR introduces public changes to the DS V2 by adding a logical write abstraction as we have on the read path as well as additional interfaces to represent distribution and ordering of data (please see the doc for more info).
The existing `Distribution` interface in the `read` package is read-specific and not flexible enough, as discussed in the design doc. The current proposal is to evolve these interfaces separately until they converge.
How was this patch tested?
This patch adds only interfaces.
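As a rough illustration of how the four scenarios above could be expressed with the new interfaces (the Distributions and Expressions factory methods are assumed helper APIs and the column names are made up; treat this as a sketch rather than merged code):

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

class WriteRequirementSketches {
  private static final SortOrder[] BY_DATE = new SortOrder[] {
      Expressions.sort(Expressions.identity("event_date"), SortDirection.ASCENDING)
  };
  private static final Expression[] CLUSTER_BY_DATE =
      new Expression[] { Expressions.identity("event_date") };

  // Global sort: a total order across partitions plus the matching in-partition ordering.
  static Distribution globalSortDistribution() { return Distributions.ordered(BY_DATE); }
  static SortOrder[] globalSortOrdering() { return BY_DATE; }

  // Cluster and sort within partitions: co-locate rows by key, then sort each partition.
  static Distribution clusterDistribution() { return Distributions.clustered(CLUSTER_BY_DATE); }
  static SortOrder[] clusterOrdering() { return BY_DATE; }

  // Local sort within partitions: no distribution requirement, only an ordering.
  static Distribution localSortDistribution() { return Distributions.unspecified(); }
  static SortOrder[] localSortOrdering() { return BY_DATE; }

  // No sort: neither a distribution nor an ordering requirement.
  static Distribution noSortDistribution() { return Distributions.unspecified(); }
  static SortOrder[] noSortOrdering() { return new SortOrder[0]; }
}
```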