From 5c9b2a6cb8dfab10d634a6bd511b37500db6157f Mon Sep 17 00:00:00 2001
From: Angel Conde
Date: Fri, 29 Apr 2022 11:14:53 +0200
Subject: [PATCH 1/3] Added support for initializing DeltaStreamer without a
 defined Spark master

This enables the use of DeltaStreamer in environments such as AWS Glue or
other serverless environments where the Spark master is inherited from the
runtime and is not directly accessible.
---
 .../apache/hudi/utilities/UtilHelpers.java | 19 +++++++++++++++++++
 .../deltastreamer/HoodieDeltaStreamer.java | 13 ++++++++-----
 .../HoodieMultiTableDeltaStreamer.java     |  4 ++--
 3 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 5d1fd19267911..e3e4895a77774 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -284,10 +284,29 @@ private static SparkConf buildSparkConf(String appName, String defaultMaster, Ma
     return SparkRDDWriteClient.registerClasses(sparkConf);
   }
 
+  private static SparkConf buildSparkConf(String appName, Map<String, String> additionalConfigs) {
+    final SparkConf sparkConf = new SparkConf().setAppName(appName);
+    sparkConf.set("spark.ui.port", "8090");
+    sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
+    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
+    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
+
+    additionalConfigs.forEach(sparkConf::set);
+    return SparkRDDWriteClient.registerClasses(sparkConf);
+  }
+
+
   public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) {
     return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs));
   }
 
+  public static JavaSparkContext buildSparkContext(String appName, Map<String, String> configs) {
+    return new JavaSparkContext(buildSparkConf(appName, configs));
+  }
+
   public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) {
     return new JavaSparkContext(buildSparkConf(appName, defaultMaster));
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 56124b82afc06..a74f8946b44e1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -325,8 +325,8 @@ public static class Config implements Serializable {
         description = "the min sync interval of each sync in continuous mode")
     public Integer minSyncIntervalSeconds = 0;
 
-    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
-    public String sparkMaster = "local[2]";
+    @Parameter(names = {"--spark-master"}, description = "spark master to use; if not defined, it is inherited from the environment (e.g. when not launched via spark-submit).")
+    public String sparkMaster = "";
 
     @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
     public Boolean commitOnErrors = false;
@@ -538,9 +538,12 @@ public static final Config getConfig(String[] args) {
   public static void main(String[] args) throws Exception {
     final Config cfg = getConfig(args);
     Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    JavaSparkContext jssc =
-        UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
-
+    JavaSparkContext jssc = null;
+    if(cfg.sparkMaster.isEmpty()){
+      jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, additionalSparkConfigs);
+    }else{
+      jscc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
+    }
     if (cfg.enableHiveSync) {
       LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 376c9cfae3730..596e0c320ace7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -343,8 +343,8 @@ public static class Config implements Serializable {
         description = "the min sync interval of each sync in continuous mode")
     public Integer minSyncIntervalSeconds = 0;
 
-    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
-    public String sparkMaster = "local[2]";
+    @Parameter(names = {"--spark-master"}, description = "spark master to use; if not defined, it is inherited from the environment (e.g. when not launched via spark-submit).")
not using spark-summit command).") + public String sparkMaster = ""; @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written") public Boolean commitOnErrors = false; From 01de4e917cd66362676e09580b066fba2fc767e2 Mon Sep 17 00:00:00 2001 From: Angel Conde Date: Fri, 29 Apr 2022 19:10:37 +0200 Subject: [PATCH 2/3] Fixed variable name on DeltaStreamer's spark context --- .../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a74f8946b44e1..862d8153b980f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -542,7 +542,7 @@ public static void main(String[] args) throws Exception { if(cfg.sparkMaster.isEmpty()){ jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, additionalSparkConfigs); }else{ - jscc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); + jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); } if (cfg.enableHiveSync) { LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing"); From eace1b7e8a2b95fc6514c7548ce9f000358189a1 Mon Sep 17 00:00:00 2001 From: Angel Conde Date: Wed, 11 May 2022 20:38:59 +0200 Subject: [PATCH 3/3] Fixed style to be compliant. --- .../apache/hudi/utilities/UtilHelpers.java | 27 ++-- .../deltastreamer/HoodieDeltaStreamer.java | 152 +++++++++--------- 2 files changed, 89 insertions(+), 90 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index f792f0967a2e2..f7327886397ac 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -112,8 +112,8 @@ public class UtilHelpers { private static final Logger LOG = LogManager.getLogger(UtilHelpers.class); public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc, - SparkSession sparkSession, SchemaProvider schemaProvider, - HoodieDeltaStreamerMetrics metrics) throws IOException { + SparkSession sparkSession, SchemaProvider schemaProvider, + HoodieDeltaStreamerMetrics metrics) throws IOException { try { try { return (Source) ReflectionUtils.loadClass(sourceClass, @@ -192,7 +192,7 @@ public static Option createTransformer(List classNames) thr public static InitialCheckPointProvider createInitialCheckpointProvider( String className, TypedProperties props) throws IOException { try { - return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class[]{TypedProperties.class}, props); + return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class[] {TypedProperties.class}, props); } catch (Throwable e) { throw new IOException("Could not load initial checkpoint provider class " + className, e); } @@ -243,7 +243,7 @@ public static void validateAndAddProperties(String[] configs, SparkLauncher spar /** * Parse Schema from file. 
* - * @param fs File System + * @param fs File System * @param schemaFile Schema File */ public static String parseSchema(FileSystem fs, String schemaFile) throws Exception { @@ -299,7 +299,6 @@ private static SparkConf buildSparkConf(String appName, Map addi return SparkRDDWriteClient.registerClasses(sparkConf); } - public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map configs) { return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs)); } @@ -307,7 +306,7 @@ public static JavaSparkContext buildSparkContext(String appName, String defaultM public static JavaSparkContext buildSparkContext(String appName, Map configs) { return new JavaSparkContext(buildSparkConf(appName, configs)); } - + public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) { return new JavaSparkContext(buildSparkConf(appName, defaultMaster)); } @@ -326,13 +325,13 @@ public static JavaSparkContext buildSparkContext(String appName, String sparkMas /** * Build Hoodie write client. * - * @param jsc Java Spark Context - * @param basePath Base Path - * @param schemaStr Schema + * @param jsc Java Spark Context + * @param basePath Base Path + * @param schemaStr Schema * @param parallelism Parallelism */ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, - int parallelism, Option compactionStrategyClass, TypedProperties properties) { + int parallelism, Option compactionStrategyClass, TypedProperties properties) { HoodieCompactionConfig compactionConfig = compactionStrategyClass .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build()) @@ -501,13 +500,13 @@ public static SchemaProvider createRowBasedSchemaProvider(StructType structType, * Create latest schema provider for Target schema. * * @param structType spark data type of incoming batch. - * @param jssc instance of {@link JavaSparkContext}. - * @param fs instance of {@link FileSystem}. - * @param basePath base path of the table. + * @param jssc instance of {@link JavaSparkContext}. + * @param fs instance of {@link FileSystem}. + * @param basePath base path of the table. * @return the schema provider where target schema refers to latest schema(either incoming schema or table schema). */ public static SchemaProvider createLatestSchemaProvider(StructType structType, - JavaSparkContext jssc, FileSystem fs, String basePath) { + JavaSparkContext jssc, FileSystem fs, String basePath) { SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType); Schema writeSchema = rowSchemaProvider.getTargetSchema(); Schema latestTableSchema = writeSchema; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 66de620f2ceae..4423ab6e2b9f9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -253,7 +253,7 @@ public static class Config implements Serializable { @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. 
This can be repeated", - splitter = IdentitySplitter.class) + splitter = IdentitySplitter.class) public List configs = new ArrayList<>(); @Parameter(names = {"--source-class"}, @@ -428,86 +428,86 @@ public boolean equals(Object o) { } Config config = (Config) o; return sourceLimit == config.sourceLimit - && Objects.equals(targetBasePath, config.targetBasePath) - && Objects.equals(targetTableName, config.targetTableName) - && Objects.equals(tableType, config.tableType) - && Objects.equals(baseFileFormat, config.baseFileFormat) - && Objects.equals(propsFilePath, config.propsFilePath) - && Objects.equals(configs, config.configs) - && Objects.equals(sourceClassName, config.sourceClassName) - && Objects.equals(sourceOrderingField, config.sourceOrderingField) - && Objects.equals(payloadClassName, config.payloadClassName) - && Objects.equals(schemaProviderClassName, config.schemaProviderClassName) - && Objects.equals(transformerClassNames, config.transformerClassNames) - && operation == config.operation - && Objects.equals(filterDupes, config.filterDupes) - && Objects.equals(enableHiveSync, config.enableHiveSync) - && Objects.equals(maxPendingCompactions, config.maxPendingCompactions) - && Objects.equals(maxPendingClustering, config.maxPendingClustering) - && Objects.equals(continuousMode, config.continuousMode) - && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds) - && Objects.equals(sparkMaster, config.sparkMaster) - && Objects.equals(commitOnErrors, config.commitOnErrors) - && Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight) - && Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight) - && Objects.equals(clusterSchedulingWeight, config.clusterSchedulingWeight) - && Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare) - && Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare) - && Objects.equals(clusterSchedulingMinShare, config.clusterSchedulingMinShare) - && Objects.equals(forceDisableCompaction, config.forceDisableCompaction) - && Objects.equals(checkpoint, config.checkpoint) - && Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider) - && Objects.equals(help, config.help); + && Objects.equals(targetBasePath, config.targetBasePath) + && Objects.equals(targetTableName, config.targetTableName) + && Objects.equals(tableType, config.tableType) + && Objects.equals(baseFileFormat, config.baseFileFormat) + && Objects.equals(propsFilePath, config.propsFilePath) + && Objects.equals(configs, config.configs) + && Objects.equals(sourceClassName, config.sourceClassName) + && Objects.equals(sourceOrderingField, config.sourceOrderingField) + && Objects.equals(payloadClassName, config.payloadClassName) + && Objects.equals(schemaProviderClassName, config.schemaProviderClassName) + && Objects.equals(transformerClassNames, config.transformerClassNames) + && operation == config.operation + && Objects.equals(filterDupes, config.filterDupes) + && Objects.equals(enableHiveSync, config.enableHiveSync) + && Objects.equals(maxPendingCompactions, config.maxPendingCompactions) + && Objects.equals(maxPendingClustering, config.maxPendingClustering) + && Objects.equals(continuousMode, config.continuousMode) + && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds) + && Objects.equals(sparkMaster, config.sparkMaster) + && Objects.equals(commitOnErrors, config.commitOnErrors) + && Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight) + && 
Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight) + && Objects.equals(clusterSchedulingWeight, config.clusterSchedulingWeight) + && Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare) + && Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare) + && Objects.equals(clusterSchedulingMinShare, config.clusterSchedulingMinShare) + && Objects.equals(forceDisableCompaction, config.forceDisableCompaction) + && Objects.equals(checkpoint, config.checkpoint) + && Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider) + && Objects.equals(help, config.help); } @Override public int hashCode() { return Objects.hash(targetBasePath, targetTableName, tableType, - baseFileFormat, propsFilePath, configs, sourceClassName, - sourceOrderingField, payloadClassName, schemaProviderClassName, - transformerClassNames, sourceLimit, operation, filterDupes, - enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode, - minSyncIntervalSeconds, sparkMaster, commitOnErrors, - deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare, - compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint, - initialCheckpointProvider, help); + baseFileFormat, propsFilePath, configs, sourceClassName, + sourceOrderingField, payloadClassName, schemaProviderClassName, + transformerClassNames, sourceLimit, operation, filterDupes, + enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode, + minSyncIntervalSeconds, sparkMaster, commitOnErrors, + deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare, + compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint, + initialCheckpointProvider, help); } @Override public String toString() { return "Config{" - + "targetBasePath='" + targetBasePath + '\'' - + ", targetTableName='" + targetTableName + '\'' - + ", tableType='" + tableType + '\'' - + ", baseFileFormat='" + baseFileFormat + '\'' - + ", propsFilePath='" + propsFilePath + '\'' - + ", configs=" + configs - + ", sourceClassName='" + sourceClassName + '\'' - + ", sourceOrderingField='" + sourceOrderingField + '\'' - + ", payloadClassName='" + payloadClassName + '\'' - + ", schemaProviderClassName='" + schemaProviderClassName + '\'' - + ", transformerClassNames=" + transformerClassNames - + ", sourceLimit=" + sourceLimit - + ", operation=" + operation - + ", filterDupes=" + filterDupes - + ", enableHiveSync=" + enableHiveSync - + ", maxPendingCompactions=" + maxPendingCompactions - + ", maxPendingClustering=" + maxPendingClustering - + ", continuousMode=" + continuousMode - + ", minSyncIntervalSeconds=" + minSyncIntervalSeconds - + ", sparkMaster='" + sparkMaster + '\'' - + ", commitOnErrors=" + commitOnErrors - + ", deltaSyncSchedulingWeight=" + deltaSyncSchedulingWeight - + ", compactSchedulingWeight=" + compactSchedulingWeight - + ", clusterSchedulingWeight=" + clusterSchedulingWeight - + ", deltaSyncSchedulingMinShare=" + deltaSyncSchedulingMinShare - + ", compactSchedulingMinShare=" + compactSchedulingMinShare - + ", clusterSchedulingMinShare=" + clusterSchedulingMinShare - + ", forceDisableCompaction=" + forceDisableCompaction - + ", checkpoint='" + checkpoint + '\'' - + ", initialCheckpointProvider='" + initialCheckpointProvider + '\'' - + ", help=" + help - + '}'; + + "targetBasePath='" + targetBasePath + '\'' + + ", targetTableName='" + targetTableName 
+ '\'' + + ", tableType='" + tableType + '\'' + + ", baseFileFormat='" + baseFileFormat + '\'' + + ", propsFilePath='" + propsFilePath + '\'' + + ", configs=" + configs + + ", sourceClassName='" + sourceClassName + '\'' + + ", sourceOrderingField='" + sourceOrderingField + '\'' + + ", payloadClassName='" + payloadClassName + '\'' + + ", schemaProviderClassName='" + schemaProviderClassName + '\'' + + ", transformerClassNames=" + transformerClassNames + + ", sourceLimit=" + sourceLimit + + ", operation=" + operation + + ", filterDupes=" + filterDupes + + ", enableHiveSync=" + enableHiveSync + + ", maxPendingCompactions=" + maxPendingCompactions + + ", maxPendingClustering=" + maxPendingClustering + + ", continuousMode=" + continuousMode + + ", minSyncIntervalSeconds=" + minSyncIntervalSeconds + + ", sparkMaster='" + sparkMaster + '\'' + + ", commitOnErrors=" + commitOnErrors + + ", deltaSyncSchedulingWeight=" + deltaSyncSchedulingWeight + + ", compactSchedulingWeight=" + compactSchedulingWeight + + ", clusterSchedulingWeight=" + clusterSchedulingWeight + + ", deltaSyncSchedulingMinShare=" + deltaSyncSchedulingMinShare + + ", compactSchedulingMinShare=" + compactSchedulingMinShare + + ", clusterSchedulingMinShare=" + clusterSchedulingMinShare + + ", forceDisableCompaction=" + forceDisableCompaction + + ", checkpoint='" + checkpoint + '\'' + + ", initialCheckpointProvider='" + initialCheckpointProvider + '\'' + + ", help=" + help + + '}'; } } @@ -543,11 +543,11 @@ public static void main(String[] args) throws Exception { final Config cfg = getConfig(args); Map additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); JavaSparkContext jssc = null; - if(cfg.sparkMaster.isEmpty()){ - jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, additionalSparkConfigs); - }else{ - jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); - } + if (cfg.sparkMaster.isEmpty()) { + jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, additionalSparkConfigs); + } else { + jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); + } if (cfg.enableHiveSync) { LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing"); }
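Usage sketch (illustrative only, not part of the patches above): with this change, omitting --spark-master leaves cfg.sparkMaster empty, so HoodieDeltaStreamer builds its JavaSparkContext through the new UtilHelpers.buildSparkContext(appName, configs) overload, which never calls setMaster and therefore inherits the master from whatever launched the JVM (an AWS Glue job, spark-submit, etc.). The class name, bucket, paths, and table name below are made-up placeholders, not values from this change.

// Hypothetical launcher for a serverless environment; all arguments are placeholders.
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

public class DeltaStreamerWithoutMasterExample {
  public static void main(String[] args) throws Exception {
    // No --spark-master argument: sparkMaster stays "", so the SparkConf is
    // built without setMaster and the master comes from the runtime environment.
    HoodieDeltaStreamer.main(new String[] {
        "--table-type", "COPY_ON_WRITE",
        "--source-class", "org.apache.hudi.utilities.sources.JsonDFSSource",
        "--source-ordering-field", "ts",
        "--target-base-path", "s3://example-bucket/hudi/example_table",
        "--target-table", "example_table",
        "--props", "s3://example-bucket/hudi/dfs-source.properties"
    });
  }
}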