From b0851814180676c4a43f198c859b402a14d86b9a Mon Sep 17 00:00:00 2001 From: wangxianghu Date: Tue, 18 Feb 2020 22:33:42 +0800 Subject: [PATCH] [MINOR] Add javadoc to SchedulerConfGenerator and code clean --- .../deltastreamer/SchedulerConfGenerator.java | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java index e98b86763a919..54fcf689a99ab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java @@ -44,6 +44,7 @@ public class SchedulerConfGenerator { public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync"; public static final String COMPACT_POOL_NAME = "hoodiecompact"; public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode"; + public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR"; public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"; private static final String SPARK_SCHEDULING_PATTERN = @@ -52,36 +53,55 @@ public class SchedulerConfGenerator { + " \n \n %s\n" + " %s\n %s\n \n"; + /** + * Helper to generate spark scheduling configs in XML format with input params. + * + * @param deltaSyncWeight Scheduling weight for delta sync + * @param compactionWeight Scheduling weight for compaction + * @param deltaSyncMinShare Minshare for delta sync + * @param compactionMinShare Minshare for compaction + * @return Spark scheduling configs + */ private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare, Integer compactionMinShare) { - return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, "FAIR", deltaSyncWeight.toString(), - deltaSyncMinShare.toString(), COMPACT_POOL_NAME, "FAIR", compactionWeight.toString(), - compactionMinShare.toString()); + return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE, + deltaSyncWeight.toString(), deltaSyncMinShare.toString(), COMPACT_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE, + compactionWeight.toString(), compactionMinShare.toString()); } /** * Helper to set Spark Scheduling Configs dynamically. * - * @param cfg Config + * @param cfg Config for HoodieDeltaStreamer */ public static Map getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception { scala.Option scheduleModeKeyOption = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY); final Option sparkSchedulerMode = scheduleModeKeyOption.isDefined() ? Option.of(scheduleModeKeyOption.get()) : Option.empty(); - Map additionalSparkConfigs = new HashMap<>(); - if (sparkSchedulerMode.isPresent() && "FAIR".equals(sparkSchedulerMode.get()) && cfg.continuousMode - && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + Map additionalSparkConfigs = new HashMap<>(1); + if (sparkSchedulerMode.isPresent() && SPARK_SCHEDULER_FAIR_MODE.equals(sparkSchedulerMode.get()) + && cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile); } else { LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " - + "is not set to FAIR at instatiation time. Continuing without scheduling configs"); + + "is not set to FAIR at instantiation time. Continuing without scheduling configs"); } return additionalSparkConfigs; } + /** + * Generate spark scheduling configs and store it to a randomly generated tmp file. + * + * @param deltaSyncWeight Scheduling weight for delta sync + * @param compactionWeight Scheduling weight for compaction + * @param deltaSyncMinShare Minshare for delta sync + * @param compactionMinShare Minshare for compaction + * @return Return the absolute path of the tmp file which stores the spark schedule configs + * @throws IOException Throws an IOException when write configs to file failed + */ private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException { File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");