diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java index 88f07b91c2577..be6b96eddfb84 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java @@ -40,8 +40,10 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig { public static final String MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = "hoodie.memory.compaction.fraction"; // Default max memory fraction during compaction, excess spills to disk public static final String DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION = String.valueOf(0.6); - // Default memory size per compaction (used if SparkEnv is absent), excess spills to disk - public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L; // 1GB + // Default memory size (1GB) per compaction (used if SparkEnv is absent), excess spills to disk + public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L; + // Minimum memory size (100MB) for the spillable map. 
+ public static final long DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 100 * 1024 * 1024L; // Property to set the max memory for merge public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size"; // Property to set the max memory for compaction @@ -91,6 +93,12 @@ public Builder withMaxMemoryFractionPerPartitionMerge(double maxMemoryFractionPe return this; } + public Builder withMaxMemoryMaxSize(long mergeMaxSize, long compactionMaxSize) { + props.setProperty(MAX_MEMORY_FOR_MERGE_PROP, String.valueOf(mergeMaxSize)); + props.setProperty(MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(compactionMaxSize)); + return this; + } + public Builder withMaxMemoryFractionPerCompaction(double maxMemoryFractionPerCompaction) { props.setProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, String.valueOf(maxMemoryFractionPerCompaction)); return this; @@ -136,7 +144,7 @@ private long getMaxMemoryAllowedForMerge(String maxMemoryFraction) { double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction); double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction); long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge); - return maxMemoryForMerge; + return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge); } else { return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 4826c89d60928..095dbd5d73fa4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -193,9 +193,11 @@ private void init(String fileId, String partitionPath, HoodieDataFile dataFileTo private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) { try { // Load the new records in a map - logger.info("MaxMemoryPerPartitionMerge => " + 
config.getMaxMemoryPerPartitionMerge()); - this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), - config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); + long memoryForMerge = config.getMaxMemoryPerPartitionMerge(); + logger.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); + this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, + config.getSpillableMapBasePath(), new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(originalSchema)); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); }