Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@

public class IOUtils {
/**
* Dynamic calculation of max memory to use for for spillable map. user.available.memory = executor.memory *
* (1 - memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
* the engine memory fractions/total memory is changed, the memory used for spillable map changes
* accordingly
* Dynamic calculation of max memory to use for spillable map. There is always more than one task
* running on a executor and the each task maintains a spillable map.
* user.available.memory = executor.memory * (1 - memory.fraction)
* spillable.available.memory = user.available.memory * hoodie.memory.fraction / executor.cores.
* Anytime the engine memory fractions/total memory is changed, the memory used for spillable map
* changes accordingly.
*/
public static long getMaxMemoryAllowedForMerge(TaskContextSupplier context, String maxMemoryFraction) {
Option<String> totalMemoryOpt = context.getProperty(EngineProperty.TOTAL_MEMORY_AVAILABLE);
Option<String> memoryFractionOpt = context.getProperty(EngineProperty.MEMORY_FRACTION_IN_USE);
Option<String> totalCoresOpt = context.getProperty(EngineProperty.TOTAL_CORES_PER_EXECUTOR);

if (totalMemoryOpt.isPresent() && memoryFractionOpt.isPresent()) {
if (totalMemoryOpt.isPresent() && memoryFractionOpt.isPresent() && totalCoresOpt.isPresent()) {
long executorMemoryInBytes = Long.parseLong(totalMemoryOpt.get());
double memoryFraction = Double.parseDouble(memoryFractionOpt.get());
double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction);
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
long executorCores = Long.parseLong(totalCoresOpt.get());
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction) / executorCores;
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public Option<String> getProperty(EngineProperty prop) {
.get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION));
}
return Option.empty();
} else if (prop == EngineProperty.TOTAL_CORES_PER_EXECUTOR) {
final String DEFAULT_SPARK_EXECUTOR_CORES = "1";
final String SPARK_EXECUTOR_EXECUTOR_CORES_PROP = "spark.executor.cores";
if (SparkEnv.get() != null) {
return Option.ofNullable(SparkEnv.get().conf()
.get(SPARK_EXECUTOR_EXECUTOR_CORES_PROP, DEFAULT_SPARK_EXECUTOR_CORES));
}
return Option.empty();
}
throw new HoodieException("Unknown engine property :" + prop);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public enum EngineProperty {
EMBEDDED_SERVER_HOST,
// Pool/queue to use to run compaction.
COMPACTION_POOL_NAME,
TOTAL_CORES_PER_EXECUTOR,
// Amount of total memory available to each engine executor
TOTAL_MEMORY_AVAILABLE,
// Fraction of that memory, that is already in use by the engine
MEMORY_FRACTION_IN_USE,
MEMORY_FRACTION_IN_USE
}