Merged
@@ -29,13 +29,15 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -54,6 +56,8 @@

import scala.Tuple2;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;

/**
* Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
*/
@@ -158,13 +162,17 @@ private List<SmallFile> filterSmallFilesInClustering(final Set<String> pendingCl
private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
// for new inserts, compute buckets depending on how many records we have for each partition
Set<String> partitionPaths = profile.getPartitionPaths();
long averageRecordSize =
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
config);
/*
* NOTE: we only use commit instants to calculate average record size because replacecommit can be
* created by clustering, which has smaller average record size, which affects assigning inserts and
* may result in OOM by making spark underestimate the actual input record sizes.
*/
long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
Member (reviewer) commented:
The change looks good. Is it possible to reproduce a workload for this kind of scenario?

@xushiyan (Member, Author) replied on Oct 5, 2022:
Simulated the scenario: COW inserts created small files, and then clustering created larger files.

Clustered file

4.4M Oct  5 18:33 9b3b96be-95ca-4e93-9a08-ab49640bd113-0_2-99-376_20221005183355230.parquet

// cat .hoodie/xxx.replacecommit
      "numWrites" : 39781,
      "totalWriteBytes" : 4664174,
// avg ≈ 117

Inserted file

863K Oct  5 18:33 d88007a8-f5d9-4c97-a5f1-56907b99d4a1-0_23-77-286_20221005183349960.parquet

// cat .hoodie/xxx.commit
      "numWrites" : 3588,
      "totalWriteBytes" : 818456,
// avg ≈ 228

So using replacecommit stats tends to underestimate the average record size.
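
To make the arithmetic above concrete, here is a minimal sketch (a hypothetical example class, not part of the PR) that reproduces the two per-record averages from the commit stats quoted above:

```java
// Hypothetical illustration only; the byte/record counts are copied from the
// .replacecommit and .commit excerpts in this comment.
public class AvgRecordSizeExample {
  static long avgBytesPerRecord(long totalWriteBytes, long numWrites) {
    return totalWriteBytes / numWrites;
  }

  public static void main(String[] args) {
    // replacecommit written by clustering: big file, small per-record size
    System.out.println(avgBytesPerRecord(4_664_174L, 39_781L)); // 117
    // regular commit from COW inserts: smaller file, larger per-record size
    System.out.println(avgBytesPerRecord(818_456L, 3_588L));    // 228
  }
}
```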

Member (reviewer) commented:
Cool. Thanks for sharing.

LOG.info("AvgRecordSize => " + averageRecordSize);

Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
getSmallFilesForPartitions(new ArrayList<>(partitionPaths), context);

Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId = getPartitionPathToPendingClusteringFileGroupsId();

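
For context on the estimator being tuned in this diff, the sketch below shows one plausible way an averageBytesPerRecord-style value can be derived from completed instants on the timeline that is passed in. This is an assumption-laden illustration, not Hudi's actual UpsertPartitioner.averageBytesPerRecord implementation: the HoodieCommitMetadata accessors (fromBytes, fetchTotalBytesWritten, fetchTotalRecordsWritten) and the timeline methods used here exist in Hudi's common module, but the real method's signature, fallback handling, and error handling may differ.

```java
import java.util.Iterator;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

/**
 * Rough sketch (an assumption, not the PR's code): walk completed instants on the
 * given timeline from newest to oldest and derive bytes-per-record from the first
 * instant that recorded both bytes and records written, falling back to a default.
 */
final class AverageRecordSizeSketch {

  static long averageBytesPerRecord(HoodieTimeline commitTimeline, long fallbackEstimate) {
    Iterator<HoodieInstant> instants =
        commitTimeline.filterCompletedInstants().getReverseOrderedInstants().iterator();
    while (instants.hasNext()) {
      HoodieInstant instant = instants.next();
      try {
        HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
            commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
        long totalBytes = metadata.fetchTotalBytesWritten();
        long totalRecords = metadata.fetchTotalRecordsWritten();
        if (totalBytes > 0 && totalRecords > 0) {
          return totalBytes / totalRecords;
        }
      } catch (Exception e) {
        // Skip unparsable or empty instants and keep looking at older ones.
      }
    }
    return fallbackEstimate;
  }
}
```

The point of the PR is visible in this framing: if the timeline handed to such an estimator includes replacecommit instants produced by clustering, the newest instant can carry a much smaller bytes-per-record value (117 vs. 228 in the example above), so restricting the timeline to COMMIT_ACTION instants keeps the estimate closer to the size of the records actually being ingested.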