@@ -285,6 +285,24 @@ private static SparkConf buildSparkConf(String appName, String defaultMaster, Map<String, String> additionalConfigs) {
    return SparkRDDWriteClient.registerClasses(sparkConf);
  }

  private static SparkConf buildSparkConf(String appName, Map<String, String> additionalConfigs) {
    final SparkConf sparkConf = new SparkConf().setAppName(appName);
    sparkConf.set("spark.ui.port", "8090");
    sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
Comment on lines +290 to +296:

Contributor (@yihua):

@Neuw84 are these particularly for AWS Glue? Could we also update the Hudi website / docs on how DeltaStreamer can be used on a serverless platform?

Contributor Author (@Neuw84):

Hi @yihua,

I just copied them from the other constructor. Most of them won't have any effect on Glue, as changing internal configs there is very restricted.

Will try to make that happen. I was thinking of posting on the AWS blog, but it would be good to post this on the Hudi website/docs as well.

Will work on the Hudi website next.

Thanks for your input!


    additionalConfigs.forEach(sparkConf::set);
    return SparkRDDWriteClient.registerClasses(sparkConf);
  }

  public static JavaSparkContext buildSparkContext(String appName, Map<String, String> configs) {
    return new JavaSparkContext(buildSparkConf(appName, configs));
  }

  public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) {
    return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs));
  }
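
A rough usage sketch of the two overloads (the table name, the FAIR-scheduler entry, and the runtime check are illustrative, not taken from the PR):

import java.util.Collections;
import java.util.Map;

import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;

public class BuildSparkContextSketch {
  public static void main(String[] args) {
    // Any spark.* key/value pairs can be passed through; this one is illustrative.
    Map<String, String> extraConfigs =
        Collections.singletonMap("spark.scheduler.mode", "FAIR");

    // Hypothetical switch: managed runtimes (e.g. AWS Glue) preconfigure the
    // cluster manager, so the master-less overload lets Spark resolve
    // "spark.master" from the environment; locally we pin it explicitly.
    boolean onManagedRuntime = args.length > 0 && "managed".equals(args[0]);
    JavaSparkContext jssc = onManagedRuntime
        ? UtilHelpers.buildSparkContext("delta-streamer-my_table", extraConfigs)
        : UtilHelpers.buildSparkContext("delta-streamer-my_table", "local[2]", extraConfigs);

    // ... run the job, then clean up.
    jssc.stop();
  }
}

Note that the master-less path still fails at SparkContext creation with "A master URL must be set in your configuration" if nothing in the environment supplies spark.master.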
@@ -331,8 +331,10 @@ public static class Config implements Serializable {
description = "the min sync interval of each sync in continuous mode")
public Integer minSyncIntervalSeconds = 0;

@Parameter(names = {"--spark-master"}, description = "spark master to use.")
public String sparkMaster = "local[2]";
@Parameter(names = {"--spark-master"},
description = "spark master to use, if not defined inherits from your environment taking into "
+ "account Spark Configuration priority rules (e.g. not using spark-submit command).")
public String sparkMaster = "";

@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
public Boolean commitOnErrors = false;
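
A minimal sketch of what the new default means for the parsed config (Config is the public static class shown above; constructing it directly is just for illustration):

import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

public class SparkMasterDefaultSketch {
  public static void main(String[] args) {
    // With no --spark-master flag the field now defaults to "" rather than
    // "local[2]", which main() below reads as "let Spark resolve the master".
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    System.out.println(cfg.sparkMaster.isEmpty()); // true
  }
}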
@@ -553,9 +555,12 @@ public static final Config getConfig(String[] args) {
  public static void main(String[] args) throws Exception {
    final Config cfg = getConfig(args);
    Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    JavaSparkContext jssc =
-        UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
+    JavaSparkContext jssc = null;
+    if (StringUtils.isNullOrEmpty(cfg.sparkMaster)) {
+      jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, additionalSparkConfigs);
+    } else {
+      jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
+    }
    if (cfg.enableHiveSync) {
      LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
    }
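
For context, a standalone sketch of the precedence behavior the empty default leans on: when setMaster() is never called, SparkConf picks up whatever "spark.master" the environment provides (spark-submit --master, spark-defaults.conf, or a managed runtime such as AWS Glue).

import org.apache.spark.SparkConf;

public class MasterResolutionSketch {
  public static void main(String[] args) {
    // No setMaster() call here, mirroring the new buildSparkConf overload.
    SparkConf conf = new SparkConf().setAppName("demo");
    // Prints the inherited master, or the fallback string if nothing set it;
    // in the latter case an actual SparkContext would fail to start.
    System.out.println(conf.get("spark.master", "<not set by environment>"));
  }
}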
@@ -355,8 +355,10 @@ public static class Config implements Serializable {
description = "the min sync interval of each sync in continuous mode")
public Integer minSyncIntervalSeconds = 0;

@Parameter(names = {"--spark-master"}, description = "spark master to use.")
public String sparkMaster = "local[2]";
@Parameter(names = {"--spark-master"},
description = "spark master to use, if not defined inherits from your environment taking into "
+ "account Spark Configuration priority rules (e.g. not using spark-submit command).")
public String sparkMaster = "";

@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
public Boolean commitOnErrors = false;