[HUDI-1104] Adding support for UserDefinedPartitioners and SortModes to BulkInsert with Rows #2049
Conversation
Force-pushed from d0f7b62 to 01fba00
Force-pushed from 01fba00 to 11512f9
Force-pushed from 11512f9 to 9b68e0e
Codecov Report
@@             Coverage Diff              @@
##             master    #2049      +/-   ##
============================================
+ Coverage     53.62%   53.68%   +0.06%
- Complexity     2848     2850       +2
============================================
  Files           359      359
  Lines         16553    16574      +21
  Branches       1780     1784       +4
============================================
+ Hits           8876     8898      +22
+ Misses         6918     6917       -1
  Partials        759      759
Flags with carried forward coverage won't be shown.
Looks like the coalesce action is not based on the sorting result.
Here is an example: I set outputSparkPartitions to 2, and the partition column is event_type:
// Assumes a SparkSession named `spark` (e.g., in spark-shell).
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.spark_partition_id
import spark.implicits._

val df = Seq(
  (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
  (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  (108, "event_name_18", "2015-01-01T11:51:33.340396Z", "type1"),
  (109, "event_name_19", "2014-01-01T11:51:33.340396Z", "type3"),
  (110, "event_name_20", "2014-02-01T11:51:33.340396Z", "type3"),
  (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")
(Here I added a new column partitionID for better understanding.) Based on the current logic, after sorting and coalescing, the df becomes:
val df2 = df.sort(functions.col("event_type"), functions.col("event_id")).coalesce(2)
df2.withColumn("partitionID", spark_partition_id()).show(false)
+--------+--------------+---------------------------+----------+-----------+
|event_id|event_name |event_ts |event_type|partitionID|
+--------+--------------+---------------------------+----------+-----------+
|100 |event_name_16 |2015-01-01T13:51:39.340396Z|type1 |0 |
|108 |event_name_18 |2015-01-01T11:51:33.340396Z|type1 |0 |
|105 |event_name_678|2015-01-01T13:51:42.248818Z|type2 |0 |
|110 |event_name_20 |2014-02-01T11:51:33.340396Z|type3 |0 |
|104 |event_name_123|2015-01-01T12:15:00.512679Z|type1 |1 |
|101 |event_name_546|2015-01-01T12:14:58.597216Z|type2 |1 |
|109 |event_name_19 |2014-01-01T11:51:33.340396Z|type3 |1 |
+--------+--------------+---------------------------+----------+-----------+
You can see the coalescing result does not actually depend on the sorting result: each Spark partition contains rows from all 3 Hudi partitions (event_type values).
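For comparison, here is a minimal sketch of a repartition-based alternative (not what this PR implements): shuffling on event_type before sorting within partitions keeps each Spark partition limited to the event_type values that hash to it, at the cost of the shuffle that coalesce avoids.

// Sketch only: trade the shuffle that coalesce avoids for partition locality.
// Rows with the same event_type hash to the same Spark partition, and each
// partition is then sorted locally by event_type and event_id.
val df3 = df
  .repartition(2, functions.col("event_type"))
  .sortWithinPartitions(functions.col("event_type"), functions.col("event_id"))
df3.withColumn("partitionID", spark_partition_id()).show(false)

With hash partitioning, each event_type lands entirely in one Spark partition, though two types can still share a partition when their hashes collide modulo the partition count.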
I see, thanks for bringing it up. I wanted to avoid the shuffle and hence thought we would rely on coalesce. Let me see if there is something we could do.
I have not introduced a new config for this, but am reusing the one used for BulkInsertPartitioner.
Open to adding a new config if required.
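For illustration, a minimal sketch of reusing that existing key from the datasource path; the class name is hypothetical, and the option keys are assumed to be Hudi's standard write/bulk-insert configs rather than anything introduced by this PR.

// Sketch only: reuse the existing user-defined partitioner config for the
// row-writer bulk insert path. com.example.MyRowPartitioner is hypothetical.
df.write.format("hudi")
  .option("hoodie.table.name", "events")
  .option("hoodie.datasource.write.recordkey.field", "event_id")
  .option("hoodie.datasource.write.partitionpath.field", "event_type")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  .option("hoodie.datasource.write.row.writer.enable", "true")
  .option("hoodie.bulkinsert.user.defined.partitioner.class",
    "com.example.MyRowPartitioner")
  .mode("overwrite")
  .save("/tmp/hudi/events")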
Force-pushed from a40300a to 3692d9b
nsivabalan left a comment:
left a note.
Codecov Report
@@              Coverage Diff              @@
##             master    #2049       +/-   ##
=============================================
- Coverage     45.79%   28.30%   -17.49%
+ Complexity     5270     1233     -4037
=============================================
  Files           909      372      -537
  Lines         39390    13988    -25402
  Branches       4244     1426     -2818
=============================================
- Hits          18039     3960    -14079
+ Misses        19508     9741     -9767
+ Partials       1843      287     -1556
Flags with carried forward coverage won't be shown.
Force-pushed from 3692d9b to 7f80674
Changes in the last commit are not yet ready for review. Pushed it to access it from a different place.
Closing this in favor of #3149
What is the purpose of the pull request
Brief change log
Verify this pull request
This change added tests and can be verified as follows:
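As a rough manual check (a sketch only, not from the PR itself: the table path and field names are illustrative, and the option keys are assumed to be Hudi's standard datasource and bulk-insert configs), a row-writer bulk insert with an explicit sort mode could be issued against an input DataFrame df like this:

// Sketch only: bulk insert through the row-writer path with a chosen sort mode.
df.write.format("hudi")
  .option("hoodie.table.name", "events")
  .option("hoodie.datasource.write.recordkey.field", "event_id")
  .option("hoodie.datasource.write.partitionpath.field", "event_type")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  .option("hoodie.datasource.write.row.writer.enable", "true")
  .option("hoodie.bulkinsert.sort.mode", "PARTITION_SORT")
  .mode("overwrite")
  .save("/tmp/hudi/events")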
Committer checklist
[x] Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.