diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index 092f9270b9647..9adae1daa5336 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -40,14 +40,21 @@ public class ClusteringCommand implements CommandMarker {
 
   private static final Logger LOG = LogManager.getLogger(ClusteringCommand.class);
 
+  /**
+   * Schedule clustering table service.
+   * <p>
+   * Example:
+   * > connect --path {path to hudi table}
+   * > clustering schedule --sparkMaster local --sparkMemory 2g
+   */
   @CliCommand(value = "clustering schedule", help = "Schedule Clustering")
   public String scheduleClustering(
-      @CliOption(key = "sparkMemory", help = "Spark executor memory",
-          unspecifiedDefaultValue = "1G") final String sparkMemory,
-      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
-          unspecifiedDefaultValue = "") final String propsFilePath,
-      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
-          unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1g", help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations "
+          + "for hoodie client for clustering", unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can "
+          + "be passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     boolean initialized = HoodieCLI.initConf();
     HoodieCLI.initFS(initialized);
@@ -59,8 +66,8 @@ public String scheduleClustering(
 
     // First get a clustering instant time and pass it to spark launcher for scheduling clustering
     String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
-    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
-        client.getTableConfig().getTableName(), clusteringInstantTime, sparkMemory, propsFilePath);
+    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), master, sparkMemory,
+        client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -71,21 +78,25 @@ public String scheduleClustering(
     return "Succeeded to schedule clustering for " + clusteringInstantTime;
   }
 
+  /**
+   * Run clustering table service.
+   * <p>
+   * Example:
+   * > connect --path {path to hudi table}
+   * > clustering schedule --sparkMaster local --sparkMemory 2g
+   * > clustering run --sparkMaster local --sparkMemory 2g --clusteringInstant 20211124005208
+   */
   @CliCommand(value = "clustering run", help = "Run Clustering")
   public String runClustering(
-      @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering",
-          unspecifiedDefaultValue = "1") final String parallelism,
-      @CliOption(key = "sparkMemory", help = "Spark executor memory",
-          unspecifiedDefaultValue = "4G") final String sparkMemory,
-      @CliOption(key = "retry", help = "Number of retries",
-          unspecifiedDefaultValue = "1") final String retry,
-      @CliOption(key = "clusteringInstant", help = "Clustering instant time",
-          mandatory = true) final String clusteringInstantTime,
-      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
-          unspecifiedDefaultValue = "") final String propsFilePath,
-      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
-          unspecifiedDefaultValue = "") final String[] configs
-  ) throws Exception {
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
+      @CliOption(key = "sparkMemory", help = "Spark executor memory", unspecifiedDefaultValue = "4g") final String sparkMemory,
+      @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering", unspecifiedDefaultValue = "1") final String parallelism,
+      @CliOption(key = "retry", help = "Number of retries", unspecifiedDefaultValue = "1") final String retry,
+      @CliOption(key = "clusteringInstant", help = "Clustering instant time", mandatory = true) final String clusteringInstantTime,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for "
+          + "hoodie client for compacting", unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be "
+          + "passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     boolean initialized = HoodieCLI.initConf();
     HoodieCLI.initFS(initialized);
@@ -93,8 +104,9 @@ public String runClustering(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
-        client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, sparkMemory, retry, propsFilePath);
+    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), master, sparkMemory,
+        client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime,
+        parallelism, retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
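
Note: the reordered launcher arguments above form a positional contract with SparkMain below. A minimal sketch of the CLUSTERING_RUN argument layout after this change (the table path and values are hypothetical, for illustration only):

    // Args emitted by ClusteringCommand#runClustering and consumed by SparkMain.main().
    String[] args = new String[] {
        "CLUSTERING_RUN",   // args[0]: SparkCommand name
        "local",            // args[1]: --sparkMaster, read by SparkUtil.initJavaSparkConf
        "4g",               // args[2]: --sparkMemory, also forwarded to cluster(...)
        "/tmp/hudi_table",  // args[3]: table base path (hypothetical)
        "test_table",       // args[4]: table name (hypothetical)
        "20211124005208",   // args[5]: --clusteringInstant
        "1",                // args[6]: --parallelism
        "1",                // args[7]: --retry
        ""                  // args[8]: --propsFilePath; empty means "not provided"
    };
    // SparkMain then checks args.length >= 9 for this command and dispatches:
    // cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
    //     Integer.parseInt(args[7]), false, propsFilePath, configs);

CLUSTERING_SCHEDULE follows the same layout through args[5], with args[6] holding propsFilePath and no parallelism/retry slots.
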
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index ef6416c0ca680..7de5ad74d3f1b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieBootstrapConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -79,12 +80,14 @@ enum SparkCommand {
   }
 
   public static void main(String[] args) throws Exception {
-    String command = args[0];
-    LOG.info("Invoking SparkMain:" + command);
+    ValidationUtils.checkArgument(args.length >= 4);
+    final String commandString = args[0];
+    LOG.info("Invoking SparkMain: " + commandString);
+    final SparkCommand cmd = SparkCommand.valueOf(commandString);
 
-    SparkCommand cmd = SparkCommand.valueOf(command);
+    JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + commandString,
+        Option.of(args[1]), Option.of(args[2]));
 
-    JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]));
     int returnCode = 0;
     try {
       switch (cmd) {
@@ -111,8 +114,8 @@ public static void main(String[] args) throws Exception {
         if (args.length > 13) {
           configs.addAll(Arrays.asList(args).subList(13, args.length));
         }
-        returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
-            Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
+        returnCode = dataLoad(jsc, commandString, args[3], args[4], args[5], args[6], args[7], args[8],
+            Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
         break;
       case COMPACT_RUN:
         assert (args.length >= 10);
@@ -159,33 +162,34 @@ public static void main(String[] args) throws Exception {
       case COMPACT_UNSCHEDULE_PLAN:
         assert (args.length == 9);
         doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
-          Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
+            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case CLUSTERING_RUN:
-        assert (args.length >= 8);
+        assert (args.length >= 9);
         propsFilePath = null;
-        if (!StringUtils.isNullOrEmpty(args[7])) {
-          propsFilePath = args[7];
+        if (!StringUtils.isNullOrEmpty(args[8])) {
+          propsFilePath = args[8];
         }
         configs = new ArrayList<>();
-        if (args.length > 8) {
-          configs.addAll(Arrays.asList(args).subList(8, args.length));
+        if (args.length > 9) {
+          configs.addAll(Arrays.asList(args).subList(9, args.length));
         }
-        returnCode = cluster(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5],
-            Integer.parseInt(args[6]), false, propsFilePath, configs);
+        returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
+            Integer.parseInt(args[7]), false, propsFilePath, configs);
         break;
       case CLUSTERING_SCHEDULE:
-        assert (args.length >= 6);
+        assert (args.length >= 7);
         propsFilePath = null;
-        if (!StringUtils.isNullOrEmpty(args[5])) {
-          propsFilePath = args[5];
+        if (!StringUtils.isNullOrEmpty(args[6])) {
+          propsFilePath = args[6];
         }
         configs = new ArrayList<>();
-        if (args.length > 6) {
-          configs.addAll(Arrays.asList(args).subList(6, args.length));
+        if (args.length > 7) {
+          configs.addAll(Arrays.asList(args).subList(7, args.length));
         }
-        returnCode = cluster(jsc, args[1], args[2], args[3], 1, args[4], 0, true, propsFilePath, configs);
+        returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
+            0, true, propsFilePath, configs);
         break;
       case CLEAN:
         assert (args.length >= 5);
@@ -229,7 +233,7 @@ public static void main(String[] args) throws Exception {
         break;
       }
     } catch (Throwable throwable) {
-      LOG.error("Fail to execute command", throwable);
+      LOG.error("Fail to execute command " + commandString, throwable);
       returnCode = -1;
     } finally {
       jsc.stop();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index 7127631775deb..cf853b4c5130a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -40,7 +40,7 @@
  */
 public class SparkUtil {
 
-  private static final String DEFAULT_SPARK_MASTER = "yarn";
+  public static final String DEFAULT_SPARK_MASTER = "yarn";
 
   /**
    * TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index a96a4b75e0f39..19f6c38bfab2a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -59,7 +59,7 @@ public class HoodieClusteringJob {
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
     this.cfg = cfg;
     this.jsc = jsc;
-    this.props = cfg.propsFilePath == null
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
         ? UtilHelpers.buildProperties(cfg.configs)
         : readConfigFromFileSystem(jsc, cfg);
   }
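
For context on the HoodieClusteringJob change: since the CLI always fills the propsFilePath slot positionally, an unset --propsFilePath reaches the job as an empty string rather than null, and the old null-only check would have tried to read properties from "". A minimal sketch of the new behavior (class name and path are hypothetical):

    import org.apache.hudi.common.util.StringUtils;

    public class PropsPathCheck {
      public static void main(String[] ignored) {
        // Both null and "" now mean "build properties from --hoodieConfigs overrides".
        System.out.println(StringUtils.isNullOrEmpty(null));                   // true
        System.out.println(StringUtils.isNullOrEmpty(""));                     // true
        // Only a real path triggers readConfigFromFileSystem(jsc, cfg).
        System.out.println(StringUtils.isNullOrEmpty("/tmp/job.properties"));  // false
      }
    }
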