diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 175ba3d66fd1f..cdeb18d34c26d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -285,6 +285,29 @@ private static SparkConf buildSparkConf(String appName, String defaultMaster, Ma
     return SparkRDDWriteClient.registerClasses(sparkConf);
   }
 
+  /**
+   * Builds a {@link SparkConf} with Hudi defaults but no explicit master, so Spark resolves the
+   * master from the environment according to its configuration-priority rules (e.g. spark-submit
+   * --master / spark.master), then overlays {@code additionalConfigs} on top (last writer wins).
+   */
+  private static SparkConf buildSparkConf(String appName, Map additionalConfigs) {
+    final SparkConf sparkConf = new SparkConf().setAppName(appName);
+    sparkConf.set("spark.ui.port", "8090");
+    sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
+    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
+
+    additionalConfigs.forEach(sparkConf::set);
+    return SparkRDDWriteClient.registerClasses(sparkConf);
+  }
+
+  /** Creates a {@link JavaSparkContext} whose master is inherited from the environment. */
+  public static JavaSparkContext buildSparkContext(String appName, Map configs) {
+    return new JavaSparkContext(buildSparkConf(appName, configs));
+  }
+
   public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map configs) {
     return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs));
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 367a121e9f783..867aa05b301f8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -331,8 +331,10 @@ public static class Config implements Serializable {
         description = "the min sync interval of each sync in continuous mode")
     public Integer minSyncIntervalSeconds = 0;
 
-    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
-    public String sparkMaster = "local[2]";
+    @Parameter(names = {"--spark-master"},
+        description = "spark master to use, if not defined inherits from your environment taking into "
+            + "account Spark Configuration priority rules (e.g. not using spark-submit command).")
+    public String sparkMaster = "";
 
     @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
     public Boolean commitOnErrors = false;
@@ -553,9 +555,10 @@ public static final Config getConfig(String[] args) {
   public static void main(String[] args) throws Exception {
     final Config cfg = getConfig(args);
     Map additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    JavaSparkContext jssc =
-        UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
-
+    // Honor an explicitly supplied --spark-master; otherwise let Spark resolve the master from the environment.
+    JavaSparkContext jssc = StringUtils.isNullOrEmpty(cfg.sparkMaster)
+        ? UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, additionalSparkConfigs)
+        : UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
     if (cfg.enableHiveSync) {
       LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 87684f450f9c4..3532c34705035 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -355,8 +355,10 @@ public static class Config implements Serializable {
         description = "the min sync interval of each sync in continuous mode")
     public Integer minSyncIntervalSeconds = 0;
 
-    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
-    public String sparkMaster = "local[2]";
+    @Parameter(names = {"--spark-master"},
+        description = "spark master to use, if not defined inherits from your environment taking into "
+            + "account Spark Configuration priority rules (e.g. not using spark-submit command).")
+    public String sparkMaster = "";
 
     @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
     public Boolean commitOnErrors = false;