diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java index 6215df54b2609..6ba6721d9b5c6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java @@ -32,20 +32,24 @@ public class IOUtils { /** - * Dynamic calculation of max memory to use for for spillable map. user.available.memory = executor.memory * - * (1 - memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime - * the engine memory fractions/total memory is changed, the memory used for spillable map changes - * accordingly + * Dynamic calculation of max memory to use for spillable map. There is always more than one task + * running on a executor and the each task maintains a spillable map. + * user.available.memory = executor.memory * (1 - memory.fraction) + * spillable.available.memory = user.available.memory * hoodie.memory.fraction / executor.cores. + * Anytime the engine memory fractions/total memory is changed, the memory used for spillable map + * changes accordingly. */ public static long getMaxMemoryAllowedForMerge(TaskContextSupplier context, String maxMemoryFraction) { Option totalMemoryOpt = context.getProperty(EngineProperty.TOTAL_MEMORY_AVAILABLE); Option memoryFractionOpt = context.getProperty(EngineProperty.MEMORY_FRACTION_IN_USE); + Option totalCoresOpt = context.getProperty(EngineProperty.TOTAL_CORES_PER_EXECUTOR); - if (totalMemoryOpt.isPresent() && memoryFractionOpt.isPresent()) { + if (totalMemoryOpt.isPresent() && memoryFractionOpt.isPresent() && totalCoresOpt.isPresent()) { long executorMemoryInBytes = Long.parseLong(totalMemoryOpt.get()); double memoryFraction = Double.parseDouble(memoryFractionOpt.get()); double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction); - double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction); + long executorCores = Long.parseLong(totalCoresOpt.get()); + double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction) / executorCores; long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge); return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge); } else { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java index 6397386b5348a..d118f0ead8d8e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkTaskContextSupplier.java @@ -78,6 +78,14 @@ public Option getProperty(EngineProperty prop) { .get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION)); } return Option.empty(); + } else if (prop == EngineProperty.TOTAL_CORES_PER_EXECUTOR) { + final String DEFAULT_SPARK_EXECUTOR_CORES = "1"; + final String SPARK_EXECUTOR_EXECUTOR_CORES_PROP = "spark.executor.cores"; + if (SparkEnv.get() != null) { + return Option.ofNullable(SparkEnv.get().conf() + .get(SPARK_EXECUTOR_EXECUTOR_CORES_PROP, DEFAULT_SPARK_EXECUTOR_CORES)); + } + return Option.empty(); } throw new HoodieException("Unknown engine property :" + prop); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java index df2f3bf6fca76..5e9a516ec4f81 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java @@ -26,8 +26,9 @@ public enum EngineProperty { EMBEDDED_SERVER_HOST, // Pool/queue to use to run compaction. COMPACTION_POOL_NAME, + TOTAL_CORES_PER_EXECUTOR, // Amount of total memory available to each engine executor TOTAL_MEMORY_AVAILABLE, // Fraction of that memory, that is already in use by the engine - MEMORY_FRACTION_IN_USE, + MEMORY_FRACTION_IN_USE }