-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-1984] Support independent flink hudi compaction function #3046
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## master #3046 +/- ##
=============================================
+ Coverage 8.43% 52.60% +44.16%
- Complexity 62 406 +344
=============================================
Files 70 70
Lines 2880 2880
Branches 359 359
=============================================
+ Hits 243 1515 +1272
+ Misses 2616 1220 -1396
- Partials 21 145 +124
Flags with carried forward coverage won't be shown. Click here to find out more.
|
garyli1019
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@swuferhong thanks for your contribution! left some comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing apache license.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will modify it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we usually don't add personal info to the codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will modify it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this keyBy will unevenly distribute the task cause the total events number are relatively small. Maybe we could find a better partitioning strategy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a bottleneck, what partitioning strategy do you suggest ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
like we list all the filegroup for pending compaction and assign a taskID from (0 to taskNums - 1) to each of them, and partitionCustom by taskID % parallelism
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true, we can infer the parallelism actually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi, @garyli1019, it is not easy to set a logic similar to reblance here, so we only optimized the number of parallelism, which determine by the size of operations instead of passing in conf through the client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add more descriptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thank you for your suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we move these validation checks to a commonplace? Looks like this was being used elsewhere too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a bug in BaseScheduleCompactionActionExecutor$execute as the PR #3025 want to fix, this bug will let independence hudi compaction can not run. But this PR need to discuss, so we choose to modify the subclass FlinkScheduleCompactionActionExecutor to finish compaction.
0054e2c to
56d6dd6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether to execute compaction asynchronously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synchronous way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
() -> doCompaction(instantTime, compactionOperation, collector)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompactionCommitSource => CompactionPlanSourceFunction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/**
* Flink hudi compaction source function.
*
* <p>This function read the compaction plan as {@link CompactionOperation}s then assign the compaction task
* event {@link CompactionPlanEvent} to downstream operators.
*
* <p>The compaction instant time is specified explicitly with strategies:
*
* <ul>
* <li>If the timeline has no inflight instants,
* use {@link HoodieActiveTimeline#createNewInstantTime()} as the instant time;</li>
* <li>If the timeline has inflight instants,
* use the {earliest inflight instant time - 1ms} as the instant time.</li>
* </ul>
*/There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get => Gets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/**
* Sets up the avro schema string into the give configuration {@code conf}
* through reading from the hoodie table metadata.
*/There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it and use FlinkClientUtil.getHadoopConf() directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invalid import.
80953b2 to
9bddbd1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add parameter HoodieTableMetaClient metaClient and move out the table name set up as a separate method:
setUpTableName
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check condition never hits, remove all these checks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method should not have return value.
efb8f15 to
7d3a329
Compare
danny0405
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
garyli1019
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Tips
What is the purpose of the pull request
Support independent flink hudi compaction function, which can do hudi compaction by a independent flink task.
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.