diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index 9adae1daa5336..4163f0cb5a6a4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -116,4 +116,40 @@ public String runClustering(
}
return "Succeeded to run clustering for " + clusteringInstantTime;
}
+
+  /**
+   * Schedule a clustering plan and execute it immediately, both inside one launched Spark application.
+   * The returned message reports success only when that application exits with code 0.
+   *
+   * Example:
+   * > connect --path {path to hudi table}
+   * > clustering scheduleAndExecute --sparkMaster local --sparkMemory 2g
+   */
+  @CliCommand(value = "clustering scheduleAndExecute", help = "Run Clustering. Make a cluster plan first and execute that plan immediately")
+  public String runClustering(
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
+      @CliOption(key = "sparkMemory", help = "Spark executor memory", unspecifiedDefaultValue = "4g") final String sparkMemory,
+      @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering", unspecifiedDefaultValue = "1") final String parallelism,
+      @CliOption(key = "retry", help = "Number of retries", unspecifiedDefaultValue = "1") final String retry,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for "
+          + "hoodie client for clustering", unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be "
+          + "passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+    HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+    boolean initialized = HoodieCLI.initConf();
+    HoodieCLI.initFS(initialized);
+    String sparkPropertiesPath = Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    // App args are positional: SparkMain's CLUSTERING_SCHEDULE_AND_EXECUTE case reads them by index, so the order matters.
+    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory,
+        client.getBasePath(), client.getTableConfig().getTableName(), parallelism, retry, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to run clustering for scheduleAndExecute.";
+    }
+    return "Succeeded to run clustering for scheduleAndExecute";
+  }
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 1b6d10b177f07..47cd59cd883cb 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -76,7 +76,7 @@ public class SparkMain {
enum SparkCommand {
BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
- CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
+ CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
}
public static void main(String[] args) throws Exception {
@@ -190,7 +190,20 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
- Integer.parseInt(args[7]), false, propsFilePath, configs);
+ Integer.parseInt(args[7]), HoodieClusteringJob.EXECUTE, propsFilePath, configs);
+ break;
+ case CLUSTERING_SCHEDULE_AND_EXECUTE:
+ assert (args.length >= 8);
+ propsFilePath = null;
+ if (!StringUtils.isNullOrEmpty(args[7])) {
+ propsFilePath = args[7];
+ }
+ configs = new ArrayList<>();
+ if (args.length > 8) {
+ configs.addAll(Arrays.asList(args).subList(8, args.length));
+ }
+ returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
+ Integer.parseInt(args[6]), HoodieClusteringJob.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
assert (args.length >= 7);
@@ -203,7 +216,7 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(7, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
- 0, true, propsFilePath, configs);
+ 0, HoodieClusteringJob.SCHEDULE, propsFilePath, configs);
break;
case CLEAN:
assert (args.length >= 5);
@@ -351,13 +364,13 @@ private static int compact(JavaSparkContext jsc, String basePath, String tableNa
}
private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant,
- int parallelism, String sparkMemory, int retry, boolean schedule, String propsFilePath, List configs) {
+ int parallelism, String sparkMemory, int retry, String runningMode, String propsFilePath, List configs) {
HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;
cfg.clusteringInstantTime = clusteringInstant;
cfg.parallelism = parallelism;
- cfg.runSchedule = schedule;
+ cfg.runningMode = runningMode;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
jsc.getConf().set("spark.executor.memory", sparkMemory);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java
new file mode 100644
index 0000000000000..17075f9d3dfb6
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link org.apache.hudi.cli.commands.ClusteringCommand}.
+ *
+ * Commands that use SparkLauncher need to load the jars under lib, which are generated during mvn package,
+ * so they are exercised by an integration test instead of a unit test.
+ */
+public class ITTestClusteringCommand extends AbstractShellIntegrationTest {
+
+ private String tablePath;
+ private String tableName;
+
+  /** Creates a fresh COPY_ON_WRITE table under {@code basePath} and re-points {@code metaClient} at it. */
+  @BeforeEach
+  public void init() throws IOException {
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+    tableName = "test_table_" + ITTestClusteringCommand.class.getName();
+    tablePath = Paths.get(basePath, tableName).toString();
+
+    // Create table and connect, going through the CLI's own table command.
+    final String tableType = HoodieTableType.COPY_ON_WRITE.name();
+    new TableCommand().createTable(tablePath, tableName, tableType, "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+    metaClient.setBasePath(tablePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+  }
+
+  /**
+   * Verifies that 'clustering schedule' succeeds and leaves exactly one pending replace instant on the timeline.
+   */
+  @Test
+  public void testScheduleClustering() throws IOException {
+    // Seed the table with commits first.
+    generateCommits();
+
+    final CommandResult scheduleResult = scheduleClustering();
+    final String expectedPrefix = "Succeeded to schedule clustering for";
+    assertAll("Command run failed",
+        () -> assertTrue(scheduleResult.isSuccess()),
+        () -> assertTrue(scheduleResult.getResult().toString().startsWith(expectedPrefix)));
+
+    // Exactly one clustering plan should now be pending.
+    HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    assertEquals(1, activeTimeline.filterPendingReplaceTimeline().countInstants());
+  }
+
+  /**
+   * Test case for command 'clustering run': schedules a plan, runs it by instant time and checks that it completed.
+   */
+  @Test
+  public void testClustering() throws IOException {
+    // generate commits
+    generateCommits();
+
+    CommandResult cr1 = scheduleClustering();
+    assertTrue(cr1.isSuccess());
+
+    // Look up the pending clustering instant that was just scheduled.
+    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    Option<String> instantOpt = timeline.filterPendingReplaceTimeline().firstInstant().map(HoodieInstant::getTimestamp);
+    assertTrue(instantOpt.isPresent(), "Must have pending clustering.");
+    // Unwrap: the Option wrapper would be formatted via its toString() and would never equal a timestamp String below.
+    final String instance = instantOpt.get();
+
+    CommandResult cr2 = getShell().executeCommand(
+        String.format("clustering run --parallelism %s --clusteringInstant %s --sparkMaster %s", 2, instance, "local"));
+
+    assertAll("Command run failed",
+        () -> assertTrue(cr2.isSuccess()),
+        () -> assertTrue(
+            cr2.getResult().toString().startsWith("Succeeded to run clustering for ")));
+
+    // assert clustering complete
+    assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+            .filterCompletedInstants().getInstants()
+            .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
+        "Pending clustering must be completed");
+
+    assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+            .getCompletedReplaceTimeline().getInstants()
+            .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
+        "Pending clustering must be completed");
+  }
+
+  /**
+   * Runs 'clustering scheduleAndExecute' on a freshly written table and expects a completed replace instant.
+   */
+  @Test
+  public void testClusteringScheduleAndExecute() throws IOException {
+    // Seed the table with commits to cluster.
+    generateCommits();
+
+    final String command = String.format("clustering scheduleAndExecute --parallelism %s --sparkMaster %s", 2, "local");
+    final CommandResult result = getShell().executeCommand(command);
+
+    final String expectedPrefix = "Succeeded to run clustering for scheduleAndExecute";
+    assertAll("Command run failed",
+        () -> assertTrue(result.isSuccess()),
+        () -> assertTrue(result.getResult().toString().startsWith(expectedPrefix)));
+
+    // At least one replace instant must have completed.
+    long completedReplaces = HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+        .getCompletedReplaceTimeline().getInstants()
+        .map(HoodieInstant::getTimestamp).count();
+    assertTrue(completedReplaces > 0, "Completed clustering couldn't be 0");
+  }
+
+  private CommandResult scheduleClustering() {
+    // Ask the shell for a clustering plan (the requested clustering), overriding inline.max.commits to 1.
+    String command = String.format("clustering schedule --hoodieConfigs hoodie.clustering.inline.max.commits=1 --sparkMaster %s", "local");
+    return getShell().executeCommand(command);
+  }
+
+  /** Writes two insert commits ("001" and "002") into the test table so that there is data to cluster. */
+  private void generateCommits() throws IOException {
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withDeleteParallelism(2).forTable(tableName)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
+
+    // try-with-resources: close the write client once both commits are written instead of leaking it.
+    try (SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg)) {
+      insert(jsc, client, dataGen, "001");
+      insert(jsc, client, dataGen, "002");
+    }
+  }
+
+  /** Starts commit {@code newCommitTime}, inserts 10 generated records through {@code client} and returns those records. */
+  private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
+      HoodieTestDataGenerator dataGen, String newCommitTime) throws IOException {
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
+    return records;
+  }
+
+  /** Invokes the given write operation (e.g. {@code SparkRDDWriteClient::insert}) with the supplied client, records and commit time. */
+  private JavaRDD<WriteStatus> operateFunc(
+      HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+      SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime) throws IOException {
+    return writeFn.apply(client, writeRecords, commitTime);
+  }
+}