diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index f389695f7bb4..f7327886397a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -112,8 +112,8 @@ public class UtilHelpers {
   private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
 
   public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-      SparkSession sparkSession, SchemaProvider schemaProvider,
-      HoodieDeltaStreamerMetrics metrics) throws IOException {
+      SparkSession sparkSession, SchemaProvider schemaProvider,
+      HoodieDeltaStreamerMetrics metrics) throws IOException {
     try {
       try {
         return (Source) ReflectionUtils.loadClass(sourceClass,
@@ -192,7 +192,7 @@ public static Option createTransformer(List classNames) thr
   public static InitialCheckPointProvider createInitialCheckpointProvider(
       String className, TypedProperties props) throws IOException {
     try {
-      return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class[]{TypedProperties.class}, props);
+      return (InitialCheckPointProvider) ReflectionUtils.loadClass(className, new Class[] {TypedProperties.class}, props);
     } catch (Throwable e) {
       throw new IOException("Could not load initial checkpoint provider class " + className, e);
     }
@@ -243,7 +243,7 @@ public static void validateAndAddProperties(String[] configs, SparkLauncher spar
   /**
    * Parse Schema from file.
    *
-   * @param fs File System
+   * @param fs File System
    * @param schemaFile Schema File
    */
   public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
@@ -285,10 +285,28 @@ private static SparkConf buildSparkConf(String appName, String defaultMaster, Ma
     return SparkRDDWriteClient.registerClasses(sparkConf);
   }
 
+  private static SparkConf buildSparkConf(String appName, Map<String, String> additionalConfigs) {
+    final SparkConf sparkConf = new SparkConf().setAppName(appName);
+    sparkConf.set("spark.ui.port", "8090");
+    sparkConf.setIfMissing("spark.driver.maxResultSize", "2g");
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.hadoop.mapred.output.compress", "true");
+    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
+    sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+    sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
+
+    additionalConfigs.forEach(sparkConf::set);
+    return SparkRDDWriteClient.registerClasses(sparkConf);
+  }
+
   public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map<String, String> configs) {
     return new JavaSparkContext(buildSparkConf(appName, defaultMaster, configs));
   }
 
+  public static JavaSparkContext buildSparkContext(String appName, Map<String, String> configs) {
+    return new JavaSparkContext(buildSparkConf(appName, configs));
+  }
+
   public static JavaSparkContext buildSparkContext(String appName, String defaultMaster) {
     return new JavaSparkContext(buildSparkConf(appName, defaultMaster));
   }
@@ -307,13 +325,13 @@ public static JavaSparkContext buildSparkContext(String appName, String sparkMas
   /**
    * Build Hoodie write client.
    *
-   * @param jsc Java Spark Context
-   * @param basePath Base Path
-   * @param schemaStr Schema
+   * @param jsc Java Spark Context
+   * @param basePath Base Path
+   * @param schemaStr Schema
    * @param parallelism Parallelism
    */
   public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
-      int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
+      int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
     HoodieCompactionConfig compactionConfig = compactionStrategyClass
         .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
@@ -482,13 +500,13 @@ public static SchemaProvider createRowBasedSchemaProvider(StructType structType,
    * Create latest schema provider for Target schema.
    *
    * @param structType spark data type of incoming batch.
-   * @param jssc instance of {@link JavaSparkContext}.
-   * @param fs instance of {@link FileSystem}.
-   * @param basePath base path of the table.
+   * @param jssc instance of {@link JavaSparkContext}.
+   * @param fs instance of {@link FileSystem}.
+   * @param basePath base path of the table.
    * @return the schema provider where target schema refers to latest schema(either incoming schema or table schema).
    */
   public static SchemaProvider createLatestSchemaProvider(StructType structType,
-      JavaSparkContext jssc, FileSystem fs, String basePath) {
+      JavaSparkContext jssc, FileSystem fs, String basePath) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
     Schema writeSchema = rowSchemaProvider.getTargetSchema();
     Schema latestTableSchema = writeSchema;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 7a688b50c709..4423ab6e2b9f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -253,7 +253,7 @@ public static class Config implements Serializable {
     @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
         + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
-        splitter = IdentitySplitter.class)
+        splitter = IdentitySplitter.class)
     public List<String> configs = new ArrayList<>();
 
     @Parameter(names = {"--source-class"},
@@ -326,8 +326,8 @@ public static class Config implements Serializable {
         description = "the min sync interval of each sync in continuous mode")
     public Integer minSyncIntervalSeconds = 0;
 
-    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
-    public String sparkMaster = "local[2]";
+    @Parameter(names = {"--spark-master"}, description = "spark master to use, if not defined inherits from the environment (e.g. from the spark-submit command).")
+    public String sparkMaster = "";
 
     @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
     public Boolean commitOnErrors = false;
@@ -428,86 +428,86 @@ public boolean equals(Object o) {
       }
       Config config = (Config) o;
       return sourceLimit == config.sourceLimit
-          && Objects.equals(targetBasePath, config.targetBasePath)
-          && Objects.equals(targetTableName, config.targetTableName)
-          && Objects.equals(tableType, config.tableType)
-          && Objects.equals(baseFileFormat, config.baseFileFormat)
-          && Objects.equals(propsFilePath, config.propsFilePath)
-          && Objects.equals(configs, config.configs)
-          && Objects.equals(sourceClassName, config.sourceClassName)
-          && Objects.equals(sourceOrderingField, config.sourceOrderingField)
-          && Objects.equals(payloadClassName, config.payloadClassName)
-          && Objects.equals(schemaProviderClassName, config.schemaProviderClassName)
-          && Objects.equals(transformerClassNames, config.transformerClassNames)
-          && operation == config.operation
-          && Objects.equals(filterDupes, config.filterDupes)
-          && Objects.equals(enableHiveSync, config.enableHiveSync)
-          && Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
-          && Objects.equals(maxPendingClustering, config.maxPendingClustering)
-          && Objects.equals(continuousMode, config.continuousMode)
-          && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds)
-          && Objects.equals(sparkMaster, config.sparkMaster)
-          && Objects.equals(commitOnErrors, config.commitOnErrors)
-          && Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight)
-          && Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight)
-          && Objects.equals(clusterSchedulingWeight, config.clusterSchedulingWeight)
-          && Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare)
-          && Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare)
-          && Objects.equals(clusterSchedulingMinShare, config.clusterSchedulingMinShare)
-          && Objects.equals(forceDisableCompaction, config.forceDisableCompaction)
-          && Objects.equals(checkpoint, config.checkpoint)
-          && Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider)
-          && Objects.equals(help, config.help);
+          && Objects.equals(targetBasePath, config.targetBasePath)
+          && Objects.equals(targetTableName, config.targetTableName)
+          && Objects.equals(tableType, config.tableType)
+          && Objects.equals(baseFileFormat, config.baseFileFormat)
+          && Objects.equals(propsFilePath, config.propsFilePath)
+          && Objects.equals(configs, config.configs)
+          && Objects.equals(sourceClassName, config.sourceClassName)
+          && Objects.equals(sourceOrderingField, config.sourceOrderingField)
+          && Objects.equals(payloadClassName, config.payloadClassName)
+          && Objects.equals(schemaProviderClassName, config.schemaProviderClassName)
+          && Objects.equals(transformerClassNames, config.transformerClassNames)
+          && operation == config.operation
+          && Objects.equals(filterDupes, config.filterDupes)
+          && Objects.equals(enableHiveSync, config.enableHiveSync)
+          && Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
+          && Objects.equals(maxPendingClustering, config.maxPendingClustering)
+          && Objects.equals(continuousMode, config.continuousMode)
+          && Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds)
+          && Objects.equals(sparkMaster, config.sparkMaster)
+          && Objects.equals(commitOnErrors, config.commitOnErrors)
+          && Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight)
+          && Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight)
+          && Objects.equals(clusterSchedulingWeight, config.clusterSchedulingWeight)
+          && Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare)
+          && Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare)
+          && Objects.equals(clusterSchedulingMinShare, config.clusterSchedulingMinShare)
+          && Objects.equals(forceDisableCompaction, config.forceDisableCompaction)
+          && Objects.equals(checkpoint, config.checkpoint)
+          && Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider)
+          && Objects.equals(help, config.help);
     }
 
     @Override
     public int hashCode() {
       return Objects.hash(targetBasePath, targetTableName, tableType,
-          baseFileFormat, propsFilePath, configs, sourceClassName,
-          sourceOrderingField, payloadClassName, schemaProviderClassName,
-          transformerClassNames, sourceLimit, operation, filterDupes,
-          enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
-          minSyncIntervalSeconds, sparkMaster, commitOnErrors,
-          deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
-          compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
-          initialCheckpointProvider, help);
+          baseFileFormat, propsFilePath, configs, sourceClassName,
+          sourceOrderingField, payloadClassName, schemaProviderClassName,
+          transformerClassNames, sourceLimit, operation, filterDupes,
+          enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
+          minSyncIntervalSeconds, sparkMaster, commitOnErrors,
+          deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
+          compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
+          initialCheckpointProvider, help);
     }
 
     @Override
    public String toString() {
       return "Config{"
-          + "targetBasePath='" + targetBasePath + '\''
-          + ", targetTableName='" + targetTableName + '\''
-          + ", tableType='" + tableType + '\''
-          + ", baseFileFormat='" + baseFileFormat + '\''
-          + ", propsFilePath='" + propsFilePath + '\''
-          + ", configs=" + configs
-          + ", sourceClassName='" + sourceClassName + '\''
-          + ", sourceOrderingField='" + sourceOrderingField + '\''
-          + ", payloadClassName='" + payloadClassName + '\''
-          + ", schemaProviderClassName='" + schemaProviderClassName + '\''
-          + ", transformerClassNames=" + transformerClassNames
-          + ", sourceLimit=" + sourceLimit
-          + ", operation=" + operation
-          + ", filterDupes=" + filterDupes
-          + ", enableHiveSync=" + enableHiveSync
-          + ", maxPendingCompactions=" + maxPendingCompactions
-          + ", maxPendingClustering=" + maxPendingClustering
-          + ", continuousMode=" + continuousMode
-          + ", minSyncIntervalSeconds=" + minSyncIntervalSeconds
-          + ", sparkMaster='" + sparkMaster + '\''
-          + ", commitOnErrors=" + commitOnErrors
-          + ", deltaSyncSchedulingWeight=" + deltaSyncSchedulingWeight
-          + ", compactSchedulingWeight=" + compactSchedulingWeight
-          + ", clusterSchedulingWeight=" + clusterSchedulingWeight
-          + ", deltaSyncSchedulingMinShare=" + deltaSyncSchedulingMinShare
-          + ", compactSchedulingMinShare=" + compactSchedulingMinShare
-          + ", clusterSchedulingMinShare=" + clusterSchedulingMinShare
-          + ", forceDisableCompaction=" + forceDisableCompaction
-          + ", checkpoint='" + checkpoint + '\''
-          + ", initialCheckpointProvider='" + initialCheckpointProvider + '\''
-          + ", help=" + help
-          + '}';
+          + "targetBasePath='" + targetBasePath + '\''
+          + ", targetTableName='" + targetTableName + '\''
+          + ", tableType='" + tableType + '\''
+          + ", baseFileFormat='" + baseFileFormat + '\''
+          + ", propsFilePath='" + propsFilePath + '\''
+          + ", configs=" + configs
+          + ", sourceClassName='" + sourceClassName + '\''
+          + ", sourceOrderingField='" + sourceOrderingField + '\''
+          + ", payloadClassName='" + payloadClassName + '\''
+          + ", schemaProviderClassName='" + schemaProviderClassName + '\''
+          + ", transformerClassNames=" + transformerClassNames
+          + ", sourceLimit=" + sourceLimit
+          + ", operation=" + operation
+          + ", filterDupes=" + filterDupes
+          + ", enableHiveSync=" + enableHiveSync
+          + ", maxPendingCompactions=" + maxPendingCompactions
+          + ", maxPendingClustering=" + maxPendingClustering
+          + ", continuousMode=" + continuousMode
+          + ", minSyncIntervalSeconds=" + minSyncIntervalSeconds
+          + ", sparkMaster='" + sparkMaster + '\''
+          + ", commitOnErrors=" + commitOnErrors
+          + ", deltaSyncSchedulingWeight=" + deltaSyncSchedulingWeight
+          + ", compactSchedulingWeight=" + compactSchedulingWeight
+          + ", clusterSchedulingWeight=" + clusterSchedulingWeight
+          + ", deltaSyncSchedulingMinShare=" + deltaSyncSchedulingMinShare
+          + ", compactSchedulingMinShare=" + compactSchedulingMinShare
+          + ", clusterSchedulingMinShare=" + clusterSchedulingMinShare
+          + ", forceDisableCompaction=" + forceDisableCompaction
+          + ", checkpoint='" + checkpoint + '\''
+          + ", initialCheckpointProvider='" + initialCheckpointProvider + '\''
+          + ", help=" + help
+          + '}';
     }
   }
 
@@ -542,9 +542,12 @@ public static final Config getConfig(String[] args) {
   public static void main(String[] args) throws Exception {
     final Config cfg = getConfig(args);
     Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
-    JavaSparkContext jssc =
-        UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
-
+    JavaSparkContext jssc = null;
+    if (cfg.sparkMaster.isEmpty()) {
+      jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, additionalSparkConfigs);
+    } else {
+      jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
+    }
     if (cfg.enableHiveSync) {
       LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 376c9cfae373..596e0c320ace 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -343,8 +343,8 @@ public static class Config implements Serializable {
         description = "the min sync interval of each sync in continuous mode")
     public Integer minSyncIntervalSeconds = 0;
 
-    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
-    public String sparkMaster = "local[2]";
+    @Parameter(names = {"--spark-master"}, description = "spark master to use, if not defined inherits from the environment (e.g. from the spark-submit command).")
+    public String sparkMaster = "";
 
     @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
     public Boolean commitOnErrors = false;