diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index c2f5a43066d36..134cfd8d2c0b5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -36,6 +37,7 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -54,6 +56,8 @@
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+
 /**
  * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
  */
@@ -158,13 +162,17 @@ private List<SmallFile> filterSmallFilesInClustering(final Set<String> pendingCl
   private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
     // for new inserts, compute buckets depending on how many records we have for each partition
     Set<String> partitionPaths = profile.getPartitionPaths();
-    long averageRecordSize =
-        averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-            config);
+    /*
+     * NOTE: we only use commit instants to calculate average record size because replacecommit can be
+     * created by clustering, which has smaller average record size, which affects assigning inserts and
+     * may result in OOM by making spark underestimate the actual input record sizes.
+     */
+    long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
     LOG.info("AvgRecordSize => " + averageRecordSize);
 
     Map<String, List<SmallFile>> partitionSmallFilesMap =
-        getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
+        getSmallFilesForPartitions(new ArrayList<>(partitionPaths), context);
 
     Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId =
         getPartitionPathToPendingClusteringFileGroupsId();
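
For context on why the patch matters, the sketch below is a minimal, self-contained illustration and not the actual Hudi implementation: the class, field, and constant names in it are invented for this example, and the action filter is folded into the helper rather than applied on the timeline the way the patch does. It shows the estimation idea from the NOTE above: average record size is derived only from regular commit instants, because replacecommit instants produced by clustering rewrite existing data and report a smaller bytes-per-record ratio, which would drag the estimate down and over-pack the insert buckets.

// Hypothetical sketch only; names (CommitStats, action, bytesWritten, recordsWritten,
// DEFAULT_RECORD_SIZE_ESTIMATE) are assumptions made for this illustration.
import java.util.List;

class AverageRecordSizeSketch {

  // Fallback when no usable commit exists yet (assumed value for the example).
  static final long DEFAULT_RECORD_SIZE_ESTIMATE = 1024L;

  // Minimal stand-in for the per-instant write stats a timeline would expose.
  static class CommitStats {
    final String action;        // e.g. "commit" or "replacecommit"
    final long bytesWritten;
    final long recordsWritten;

    CommitStats(String action, long bytesWritten, long recordsWritten) {
      this.action = action;
      this.bytesWritten = bytesWritten;
      this.recordsWritten = recordsWritten;
    }
  }

  // Average bytes per record computed only from "commit" instants; clustering's
  // replacecommit instants are skipped so they cannot shrink the estimate.
  static long averageBytesPerRecord(List<CommitStats> completedInstants) {
    long totalBytes = 0;
    long totalRecords = 0;
    for (CommitStats stats : completedInstants) {
      if (!"commit".equals(stats.action) || stats.recordsWritten == 0) {
        continue; // skip replacecommit and empty commits
      }
      totalBytes += stats.bytesWritten;
      totalRecords += stats.recordsWritten;
    }
    return totalRecords > 0 ? totalBytes / totalRecords : DEFAULT_RECORD_SIZE_ESTIMATE;
  }

  public static void main(String[] args) {
    List<CommitStats> instants = List.of(
        new CommitStats("commit", 400_000_000L, 1_000_000L),         // ~400 bytes/record
        new CommitStats("replacecommit", 100_000_000L, 1_000_000L)); // ~100 bytes/record after clustering
    // Prints 400: the clustering instant is ignored, so insert buckets are sized realistically.
    System.out.println(averageBytesPerRecord(instants));
  }
}

In the patch itself this filtering happens up front, via getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)) on the active timeline, so averageBytesPerRecord only ever sees completed plain commit instants.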