Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class SchedulerConfGenerator {
// Pool names substituted into the <pool name="..."> entries of the generated allocation XML.
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
public static final String COMPACT_POOL_NAME = "hoodiecompact";
// Spark property key whose value is compared against SPARK_SCHEDULER_FAIR_MODE.
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
// Scheduler mode value required for the generated scheduling configs to be applied.
public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR";
// Spark property key under which the path of the generated allocation file is returned.
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";

private static final String SPARK_SCHEDULING_PATTERN =
Expand All @@ -52,36 +53,55 @@ public class SchedulerConfGenerator {
+ " </pool>\n <pool name=\"%s\">\n <schedulingMode>%s</schedulingMode>\n"
+ " <weight>%s</weight>\n <minShare>%s</minShare>\n </pool>\n</allocations>";

/**
* Helper to generate spark scheduling configs in XML format with input params.
*
* @param deltaSyncWeight Scheduling weight for delta sync
* @param compactionWeight Scheduling weight for compaction
* @param deltaSyncMinShare Minshare for delta sync
* @param compactionMinShare Minshare for compaction
* @return Spark scheduling configs
*/
private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
    Integer compactionMinShare) {
  // Single return only: a second, earlier return with hard-coded "FAIR" literals made this one unreachable.
  // Arguments are passed per pool as (name, scheduling mode, weight, minShare): delta sync first, then compaction.
  // Both pools use the shared SPARK_SCHEDULER_FAIR_MODE constant rather than a string literal.
  return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
      deltaSyncWeight.toString(), deltaSyncMinShare.toString(), COMPACT_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
      compactionWeight.toString(), compactionMinShare.toString());
}

/**
* Helper to set Spark Scheduling Configs dynamically.
*
* @param cfg Config for HoodieDeltaStreamer
*/
public static Map<String, String> getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception {
scala.Option<String> scheduleModeKeyOption = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
final Option<String> sparkSchedulerMode =
scheduleModeKeyOption.isDefined() ? Option.of(scheduleModeKeyOption.get()) : Option.empty();

Map<String, String> additionalSparkConfigs = new HashMap<>();
if (sparkSchedulerMode.isPresent() && "FAIR".equals(sparkSchedulerMode.get()) && cfg.continuousMode
&& cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
Map<String, String> additionalSparkConfigs = new HashMap<>(1);
if (sparkSchedulerMode.isPresent() && SPARK_SCHEDULER_FAIR_MODE.equals(sparkSchedulerMode.get())
&& cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
} else {
LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
+ "is not set to FAIR at instatiation time. Continuing without scheduling configs");
+ "is not set to FAIR at instantiation time. Continuing without scheduling configs");
}
return additionalSparkConfigs;
}

/**
* Generate spark scheduling configs and store it to a randomly generated tmp file.
*
* @param deltaSyncWeight Scheduling weight for delta sync
* @param compactionWeight Scheduling weight for compaction
* @param deltaSyncMinShare Minshare for delta sync
* @param compactionMinShare Minshare for compaction
* @return Return the absolute path of the tmp file which stores the spark schedule configs
* @throws IOException Throws an IOException when write configs to file failed
*/
private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
Expand Down