@@ -56,6 +56,7 @@
 import scala.Tuple2;

 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;

 /**
  * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).

@@ -170,7 +171,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
      * may result in OOM by making spark underestimate the actual input record sizes.
      */
     long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
-        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
+        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)).filterCompletedInstants(), config);
Contributor:
Could we be more strict here in the calculation of average bytes per record? Delta commits may contain log files, and the number of bytes written to log files cannot be applied to writing parquet base files. Let's only consider parquet file sizes here for record size estimation.

Contributor:
There is a transformation ratio between log and parquet files, but it may not be very accurate.
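
For illustration, a hedged sketch of how such a ratio could be folded into the record size estimate. The helper name and the ratio handling are assumptions for this sketch, not code from this PR; Hudi's hoodie.logfile.to.parquet.compression.ratio config is related but is not used here.

// Hypothetical sketch only: convert log-file bytes into a parquet-equivalent
// figure using an assumed log-to-parquet size ratio, then compute bytes per
// record. Neither this helper nor its parameters come from the PR.
static long estimateAvgRecordSize(long parquetBytesWritten, long logBytesWritten,
                                  long recordsWritten, double logToParquetRatio) {
  long parquetEquivalentBytes = parquetBytesWritten + (long) (logBytesWritten * logToParquetRatio);
  return (long) Math.ceil((1.0 * parquetEquivalentBytes) / Math.max(1, recordsWritten));
}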

Author:
Would you advise how I can distinguish/separate them?

Contributor:
I think your change is fine now; we just need to add some tests for the method that you might abstract out.

Author:
As I commented previously, adding a UT for the assignInserts function is a little hard...
Any suggestions from your side?

Contributor:
We can add a UT for averageBytesPerRecord, I guess?

Author:
averageBytesPerRecord is not changed or impacted, as it just walks through the commits in reverse order to find the first eligible one for calculating the size. This explains why #6864 didn't modify it either.

Here are the existing UTs for the function. I know the value of UTs, but it is not easy to add one with the current code structure:
https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java#L168-L190
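
For readers following along, a simplified paraphrase of the estimation logic being discussed, reconstructed from the description above; exact signatures and helpers in UpsertPartitioner may differ across Hudi versions.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;

// Simplified paraphrase: walk the completed commits newest-first and take
// bytes/record from the first commit whose written bytes exceed a size
// threshold; otherwise fall back to the configured estimate.
static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig config) throws IOException {
  long avgSize = config.getCopyOnWriteRecordSizeEstimate();
  long threshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
  Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
  while (instants.hasNext()) {
    HoodieInstant instant = instants.next();
    HoodieCommitMetadata metadata = HoodieCommitMetadata
        .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
    long totalBytes = metadata.fetchTotalBytesWritten();
    long totalRecords = metadata.fetchTotalRecordsWritten();
    if (totalBytes > threshold && totalRecords > 0) {
      avgSize = (long) Math.ceil((1.0 * totalBytes) / totalRecords);
      break; // the most recent eligible commit wins
    }
  }
  return avgSize;
}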

Contributor:
We can add a UT for averageBytesPerRecord to validate that both commit and delta_commit are included in the algorithm.
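
A rough sketch of what such a UT might look like, modeled loosely on the existing TestUpsertPartitioner tests linked above. The mocked timeline, the metadataBytes helper, and the exact builder/constructor signatures are assumptions that may need adjusting to the real test harness (the test must also live in the same package to reach the protected method).

import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.stream.Stream;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;

public class TestAverageBytesPerRecord {

  // Hypothetical helper: serialized commit metadata carrying one write stat
  // with the given bytes/records written.
  private static Option<byte[]> metadataBytes(long bytes, long records) throws Exception {
    HoodieWriteStat stat = new HoodieWriteStat();
    stat.setTotalWriteBytes(bytes);
    stat.setNumWrites(records);
    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
    metadata.addWriteStat("2023/01/01", stat);
    return Option.of(metadata.toJsonString().getBytes());
  }

  @Test
  public void averageBytesPerRecordConsidersDeltaCommits() throws Exception {
    HoodieTimeline timeline = mock(HoodieTimeline.class);
    HoodieInstant deltaCommit = new HoodieInstant(false, DELTA_COMMIT_ACTION, "002");
    HoodieInstant commit = new HoodieInstant(false, COMMIT_ACTION, "001");
    when(timeline.empty()).thenReturn(false);
    // Newest instant first, mirroring getReverseOrderedInstants() semantics.
    when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(deltaCommit, commit));
    when(timeline.getInstantDetails(deltaCommit)).thenReturn(metadataBytes(2000L, 10L));
    when(timeline.getInstantDetails(commit)).thenReturn(metadataBytes(5000L, 10L));

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie-test")
        // Keep the small-file limit low so both mocked instants clear the
        // record-size-estimation threshold.
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000).build())
        .build();

    // If delta commits are included, the newest (delta) commit drives the
    // estimate: 2000 bytes / 10 records = 200.
    assertEquals(200, UpsertPartitioner.averageBytesPerRecord(timeline, config));
  }
}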

     LOG.info("AvgRecordSize => " + averageRecordSize);

     Map<String, List<SmallFile>> partitionSmallFilesMap =