[HUDI-6437] Refine avg record size by considering both commit and deltacommit #9013
Conversation
#6864 introduced an optimization, but it is a restrictive change. Actually, we need to consider both commit and deltacommit actions.
Is there any possibility we could write some test cases then?
Thanks @danny0405 for replying; sure thing, I will try to add a unit test later in the PR.
Hi @danny0405, a possible way is to consolidate the change into the util function. What do you think?
I'm wondering whether we can add some tests for the changed logic.
Hey @danny0405, I understood your ask in general, but it is not that easy. Currently, both PRs only change the input parameters of averageBytesPerRecord rather than the function itself. I think locating the change within averageBytesPerRecord would consolidate the logic; further, leveraging that function's existing UTs would make it easier to test.
+1, let's move the change into the util function.
```diff
   */
  long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
-     .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
+     .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)).filterCompletedInstants(), config);
```
Could we be more strict here on the calculation of average bytes per record? Deltacommits may contain log files, and the number of bytes written to log files does not carry over to writing parquet base files. Let's only consider the parquet file size here for record size estimation.
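For illustration, a "parquet-only" accumulation could look roughly like the sketch below. It reads the per-file write stats from HoodieCommitMetadata; the path-suffix filter and the helper shape are illustrative assumptions, not the actual Hudi implementation.

```java
import java.util.List;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;

public class ParquetOnlyRecordSize {
  // Sketch: accumulate bytes/records only from parquet base files, skipping log files,
  // so log-file byte counts never skew the base-file record size estimate.
  static long averageParquetBytesPerRecord(HoodieCommitMetadata metadata) {
    long parquetBytes = 0;
    long parquetRecords = 0;
    for (List<HoodieWriteStat> stats : metadata.getPartitionToWriteStats().values()) {
      for (HoodieWriteStat stat : stats) {
        String path = stat.getPath();
        if (path != null && path.endsWith(".parquet")) { // illustrative filter, not the real check
          parquetBytes += stat.getTotalWriteBytes();
          parquetRecords += stat.getNumWrites();
        }
      }
    }
    return parquetRecords > 0 ? (long) Math.ceil((1.0 * parquetBytes) / parquetRecords) : 0L;
  }
}
```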
There is a transformation ratio between log and parquet sizes, but it may not be very accurate.
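If log bytes were folded in anyway, the idea would be a discount factor along these lines. The 0.35 ratio and the formula are purely illustrative, since the comment above only notes that such a ratio exists and may be inaccurate.

```java
public class LogToParquetRatioSketch {
  // Hypothetical: discount log-file bytes by an assumed log->parquet size ratio before
  // averaging. Neither the 0.35 factor nor this formula comes from the actual Hudi code.
  static long averageBytesWithLogDiscount(long parquetBytes, long logBytes, long totalRecords) {
    double logToParquetRatio = 0.35; // illustrative constant
    long effectiveBytes = parquetBytes + (long) (logBytes * logToParquetRatio);
    return totalRecords > 0 ? (long) Math.ceil((1.0 * effectiveBytes) / totalRecords) : 0L;
  }
}
```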
Would you advise how I can distinguish/separate them?
I think your change is fine now; we just need to add some tests for the method that you might abstract out.
As I commented previously, adding a UT for the assignInserts function is a little hard...
Any suggestions from your side?
We can add a UT for averageBytesPerRecord, I guess?
averageBytesPerRecord is not changed or impacted, as it just walks the commits in reverse order to find the first eligible one for calculating the size. This explains why #6864 didn't modify it either.
Here are the existing UTs for the function. I know the value of UTs, but it is not easy to add one with the current code structure.
https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java#L168-L190
We can add a UT for averageBytesPerRecord to validate that both commit and delta_commit are included in the algorithm.
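A possible shape for that UT, loosely following the existing TestUpsertPartitioner pattern linked above. This is a sketch, not the final test: serializedMetadata is a hypothetical helper that would serialize a HoodieCommitMetadata with the given byte/record totals, and it assumes the test lives alongside TestUpsertPartitioner so the package-visible averageBytesPerRecord is reachable.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.stream.Stream;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;

public class TestAverageBytesPerRecordSketch {

  @Test
  public void averageBytesPerRecordConsidersDeltaCommits() throws Exception {
    // One completed commit and one completed deltacommit, newest (deltacommit) first.
    HoodieInstant commit = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "001");
    HoodieInstant deltaCommit = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");

    HoodieTimeline timeline = mock(HoodieTimeline.class);
    when(timeline.empty()).thenReturn(false);
    when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(deltaCommit, commit));
    // serializedMetadata(bytes, records) is a hypothetical helper that returns the
    // byte[] of a HoodieCommitMetadata whose write stats sum to the given totals.
    when(timeline.getInstantDetails(deltaCommit)).thenReturn(Option.of(serializedMetadata(2048L, 4L)));
    when(timeline.getInstantDetails(commit)).thenReturn(Option.of(serializedMetadata(2048L, 8L)));

    // Keep the small-file threshold below 2048 bytes so both instants are eligible.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie-ut")
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000L).build())
        .build();

    // The newest eligible instant is the deltacommit, so it should drive the
    // estimate: ceil(2048 / 4) = 512.
    assertEquals(512L, UpsertPartitioner.averageBytesPerRecord(timeline, config));
  }
}
```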
```diff
- if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+ if (totalBytesWritten > fileSizeThreshold && totalBytesWritten <= fileSizeUpper && totalRecordsWritten > 0) {
    avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
    break;
```
What's the purpose of this limitation?
To achieve a similar goal as #6864.
This consolidated implementation avoids fragmenting the logic and is also UT-friendly.
But totalBytesWritten is the size accumulated across all the written files; what is the meaning of comparing it with a single-file size threshold?
You are correct; I misunderstood totalBytesWritten.
Then there might be two options:
- consider the extra FilesInsert/FileUpdated stats
- filter out REPLACE commits within the averageBytesPerRecord function

Which one do you prefer?
I just updated the code to reflect option 2.
```diff
  while (instants.hasNext()) {
    HoodieInstant instant = instants.next();
+   if (instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
+     continue;
```
I would prefer CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION). One thing to note is that the avro log files use Java reflection to calculate the in-memory size of a record, while the parquet file size is the actual on-disk size, so it is more accurate. I'm not sure whether we should include the logs when there are parquet files; that is, the parquet file size should have higher priority than the logs.
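Concretely, that preference matches the call-site shape in the diff at the top of this thread; a sketch, with table and config assumed to be the surrounding UpsertPartitioner context:

```java
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;

import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;

// Restrict the timeline to commit + deltacommit actions up front, rather than
// iterating every instant and skipping REPLACE commits inside the loop.
HoodieTimeline eligible = table.getMetaClient().getActiveTimeline()
    .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION))
    .filterCompletedInstants();
long averageRecordSize = averageBytesPerRecord(eligible, config);
```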
Hi @danny0405, I just reverted the code to the very first version.
Whether we include the logs or not, the current one would be better than the released one, at least for MOR tables.
Agree, it would be great if you can add a test for the method.
Closing this one as it is resolved in #10763.
Change Logs
Refine the average record size calculation by considering both commit and deltacommit actions.
Contributor's checklist