[HUDI-5857] Insert overwrite into bucket table would generate new file group id #8072
Conversation
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala (outdated; resolved review thread)
Thanks @beyond1920, overall looks good.
KnightChess
left a comment
How about removing tagLocation when using the insert overwrite op, like #8073? I think that can also solve this question. And in the insert overwrite scenario, I think there is no need to tagLocation, right?
    protected Partitioner getPartitioner(WorkloadProfile profile) {
      return table.getStorageLayout().layoutPartitionerClass()
          .map(c -> getLayoutPartitioner(profile, c))
          .map(c -> c.equals(HoodieLayoutConfig.SIMPLE_BUCKET_LAYOUT_PARTITIONER_CLASS_NAME)
Won't consistentBucketIndex cause the same problem?
No, consistentBucketIndex works correctly; it generates different file ids.
@KnightChess Thanks for your advice.
Removing tagLocation could also fix this problem. However, I prefer to fix it by generating new file ids because:
- Removing tag location would change stats, for example, missing the updated count
- It's better to keep the same behavior for all index types instead of only removing tag location in insert overwrite for bucket index tables.
But removing tag location is a good improvement to speed up insert overwrite. I will create a new JIRA to track it. Maybe use bulk insert to do insert overwrite for all index types. WDYT?
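For illustration, here is a minimal sketch of the "generate new file ids" approach (hypothetical helper names, not Hudi's actual API): the 8-digit bucket prefix that the simple bucket index hashes records to stays stable, while the UUID suffix is regenerated per insert overwrite so the new slice lands in a fresh file group:

```java
import java.util.UUID;

public class OverwriteFileIdSketch {
    // Hypothetical: simple-bucket file group ids look like "<8-digit bucket>-<uuid>";
    // the prefix is derived from the record key hash, the suffix identifies the file group.
    static String bucketPrefix(int bucketId) {
        return String.format("%08d", bucketId);
    }

    // On insert overwrite, keep the bucket prefix (so hashing still routes records
    // to the right bucket) but mint a fresh suffix so a new file group is created
    // instead of writing into the one the replacecommit marks as replaced.
    static String newOverwriteFileId(int bucketId) {
        return bucketPrefix(bucketId) + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String first = newOverwriteFileId(1);
        String second = newOverwriteFileId(1);
        // Same bucket, different file group ids.
        System.out.println(first.startsWith("00000001-") && !first.equals(second));
    }
}
```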
@beyond1920 I read the consistentBucketIndex implementation and found it must tag incoming records to allocate the file group id, so #8073 will cause some issues.
> No, consistentBucketIndex works correctly, it would generate different file ids.

consistentBucketIndex cannot work correctly; change the UT case.
Emm, sorry for the hurried response before.
Thank you for pointing it out.
I need to spend more time getting familiar with ConsistentBucketIndex. I will respond ASAP.
    }

    test("Test Insert Overwrite") {
    test("Test Insert Overwrite for bucket ") {
Add a test for consistentBucketIndex.
ConsistentBucketIndex works correctly; it generates different file ids.
However, I added test cases for ConsistentBucketIndex too.
    // Insert overwrite static partition
    spark.sql(
      s"""
         | insert overwrite table $tableName partition(dt = '2021-01-05')
This will create a new parquet file with the same prefix as the log file, but with something different in the fgId suffix. As the picture shows, creating a new parquet file appends -0 after the fgId (xxx-0-0_xxx), so it can be read if you insert overwrite only once; but if you insert overwrite again, it will use the same fgId (xxx-0-0), and the result is nothing.

XuQianJin-Stars
left a comment
LGTM
...di-client-common/src/main/java/org/apache/hudi/exception/HoodieInsertOverwriteException.java (outdated; resolved review thread)
.../main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java (outdated; resolved review thread)
    case CONSISTENT_HASHING:
      return new SparkInsertOverwriteConsistentBucketIndexPartitioner(profile, context, table, config);
    default:
      throw new HoodieNotSupportedException("Unknown bucket index engine type: " + config.getBucketIndexEngineType());
Can we inline all the different handling for getBucketInfo into SparkInsertOverwritePartitioner? Let's make the code cleaner.
I moved the part related to ConsistentBucketIndex to SparkInsertOverwritePartitioner, and left the part related to SimpleBucketIndex in SparkBucketIndexInsertOverwritePartitioner, because SimpleBucketIndex and ConsistentBucketIndex differ in how they create a new BucketInfo.
      return handleInsert(binfo.fileIdPrefix, recordItr);
    } else if (btype.equals(BucketType.UPDATE)) {
      throw new HoodieInsertOverwriteException(
          "Insert overwrite should always use INSERT bucketType, please correct the logical of " + partitioner.getClass().getName());
In which case can we hit the code path for BucketType.UPDATE?
This is protective code to prevent hitting this bug again when a new partitioner class is introduced in the future.
...lient/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java (resolved review thread)
    @Override
    public BucketInfo getBucketInfo(int bucketNumber) {
      String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
In HoodieWriteConfig, we can fetch the operation and then decide whether it is INSERT_OVERWRITE; then the logic can be moved into SparkBucketIndexPartitioner.
danny0405
left a comment
+1, we are good to land once the CI is green
@hudi-bot run azure
Change Logs
Snapshot query results are wrong after applying insert overwrite to an existing table with a simple bucket index.
See details in HUDI-5857.
The root cause is that the write handler reuses the existing bucket file group id for insert overwrite. Besides, insert overwrite uses a replace commit and marks all existing bucket file group ids as replaced, so the snapshot query result is wrong.
This PR fixes the bug by generating a new file id per bucket when insert overwriting into a bucket index table.
Impact
NA
Risk level (write none, low medium or high below)
NA
Documentation Update
NA
Contributor's checklist