Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
public static final String MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = "hoodie.memory.compaction.fraction";
// Default max memory fraction during compaction, excess spills to disk
public static final String DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION = String.valueOf(0.6);
// Default memory size per compaction (used if SparkEnv is absent), excess spills to disk
public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L; // 1GB
// Default memory size (1GB) per compaction (used if SparkEnv is absent), excess spills to disk
public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L;
// Minimum memory size (100MB) for the spillable map.
public static final long DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 100 * 1024 * 1024L;
// Property to set the max memory for merge
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
// Property to set the max memory for compaction
Expand Down Expand Up @@ -91,6 +93,12 @@ public Builder withMaxMemoryFractionPerPartitionMerge(double maxMemoryFractionPe
return this;
}

public Builder withMaxMemoryMaxSize(long mergeMaxSize, long compactionMaxSize) {
props.setProperty(MAX_MEMORY_FOR_MERGE_PROP, String.valueOf(mergeMaxSize));
props.setProperty(MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(compactionMaxSize));
return this;
}

public Builder withMaxMemoryFractionPerCompaction(double maxMemoryFractionPerCompaction) {
props.setProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, String.valueOf(maxMemoryFractionPerCompaction));
return this;
Expand Down Expand Up @@ -136,7 +144,7 @@ private long getMaxMemoryAllowedForMerge(String maxMemoryFraction) {
double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction);
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
return maxMemoryForMerge;
return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
} else {
return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,11 @@ private void init(String fileId, String partitionPath, HoodieDataFile dataFileTo
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
try {
// Load the new records in a map
logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
logger.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge,
config.getSpillableMapBasePath(), new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(originalSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
Expand Down