hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -116,4 +116,40 @@ public String runClustering(
}
return "Succeeded to run clustering for " + clusteringInstantTime;
}

/**
* Schedule a clustering plan and execute it immediately.
* <p>
* Example:
* > connect --path {path to hudi table}
* > clustering scheduleAndExecute --sparkMaster local --sparkMemory 2g
*/
@CliCommand(value = "clustering scheduleAndExecute", help = "Run Clustering. Make a cluster plan first and execute that plan immediately")
public String runClustering(
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
@CliOption(key = "sparkMemory", help = "Spark executor memory", unspecifiedDefaultValue = "4g") final String sparkMemory,
@CliOption(key = "parallelism", help = "Parallelism for hoodie clustering", unspecifiedDefaultValue = "1") final String parallelism,
@CliOption(key = "retry", help = "Number of retries", unspecifiedDefaultValue = "1") final String retry,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for "
+ "hoodie client for compacting", unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be "
+ "passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);

String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory,
client.getBasePath(), client.getTableConfig().getTableName(), parallelism, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to run clustering for scheduleAndExecute.";
}
return "Succeeded to run clustering for scheduleAndExecute";
}
}
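For readers tracing the launcher handoff: the app args added by this command line up positionally with the indices parsed in SparkMain's CLUSTERING_SCHEDULE_AND_EXECUTE case below. The mapping, summarized here from the addAppArgs call above:

// Positional layout of the launcher app args built by "clustering scheduleAndExecute":
// args[0] = "CLUSTERING_SCHEDULE_AND_EXECUTE" (SparkCommand name)
// args[1] = sparkMaster
// args[2] = sparkMemory
// args[3] = basePath
// args[4] = tableName
// args[5] = parallelism
// args[6] = retry
// args[7] = propsFilePath
// args[8..] = extra --hoodieConfigs entries appended via UtilHelpers.validateAndAddProperties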
23 changes: 18 additions & 5 deletions hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -76,7 +76,7 @@ public class SparkMain {
enum SparkCommand {
BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
}

public static void main(String[] args) throws Exception {
@@ -190,7 +190,20 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
Integer.parseInt(args[7]), false, propsFilePath, configs);
Integer.parseInt(args[7]), HoodieClusteringJob.EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE_AND_EXECUTE:
assert (args.length >= 8);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[7])) {
propsFilePath = args[7];
}
configs = new ArrayList<>();
if (args.length > 8) {
configs.addAll(Arrays.asList(args).subList(8, args.length));
}
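// clusteringInstant is passed as null here: scheduleAndExecute does not target an existing plan,
// it schedules a new one and runs it immediately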
returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
Integer.parseInt(args[6]), HoodieClusteringJob.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
assert (args.length >= 7);
@@ -203,7 +216,7 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(7, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
0, true, propsFilePath, configs);
0, HoodieClusteringJob.SCHEDULE, propsFilePath, configs);
break;
case CLEAN:
assert (args.length >= 5);
@@ -351,13 +364,13 @@ private static int compact(JavaSparkContext jsc, String basePath, String tableNa
}

private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant,
int parallelism, String sparkMemory, int retry, boolean schedule, String propsFilePath, List<String> configs) {
int parallelism, String sparkMemory, int retry, String runningMode, String propsFilePath, List<String> configs) {
HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;
cfg.clusteringInstantTime = clusteringInstant;
cfg.parallelism = parallelism;
cfg.runSchedule = schedule;
cfg.runningMode = runningMode;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
jsc.getConf().set("spark.executor.memory", sparkMemory);
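For context, the running-mode string (SCHEDULE, EXECUTE, SCHEDULE_AND_EXECUTE) tells HoodieClusteringJob whether to only schedule a plan, only execute an already scheduled one, or do both in a single run. Below is a minimal, hypothetical sketch of such a dispatch; the constants and Config fields are the ones referenced in this patch, but doSchedule and doExecute are illustrative placeholder helpers, not actual HoodieClusteringJob methods.

// Hypothetical dispatch on cfg.runningMode -- doSchedule/doExecute are placeholders, not Hudi APIs.
private static int runClusteringJob(JavaSparkContext jsc, HoodieClusteringJob.Config cfg) {
if (HoodieClusteringJob.SCHEDULE.equals(cfg.runningMode)) {
return doSchedule(jsc); // only create a clustering plan
}
if (HoodieClusteringJob.EXECUTE.equals(cfg.runningMode)) {
return doExecute(jsc, cfg.clusteringInstantTime); // run an already scheduled plan
}
if (HoodieClusteringJob.SCHEDULE_AND_EXECUTE.equals(cfg.runningMode)) {
int scheduled = doSchedule(jsc); // schedule a new plan ...
return scheduled == 0 ? doExecute(jsc, null) : scheduled; // ... then execute it immediately
}
return -1; // unknown mode
}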
199 changes: 199 additions & 0 deletions hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.cli.integ;

import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Integration test class for {@link org.apache.hudi.cli.commands.ClusteringCommand}.
* <p/>
* A command that uses SparkLauncher needs to load the jars under lib, which are generated during mvn package,
* so this is written as an integration test rather than a unit test.
*/
public class ITTestClusteringCommand extends AbstractShellIntegrationTest {

private String tablePath;
private String tableName;

@BeforeEach
public void init() throws IOException {
tableName = "test_table_" + ITTestClusteringCommand.class.getName();
tablePath = Paths.get(basePath, tableName).toString();

HoodieCLI.conf = jsc.hadoopConfiguration();
// Create table and connect
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
metaClient.setBasePath(tablePath);
metaClient = HoodieTableMetaClient.reload(metaClient);
}

/**
* Test case for command 'clustering schedule'.
*/
@Test
public void testScheduleClustering() throws IOException {
// generate commits
generateCommits();

CommandResult cr = scheduleClustering();
assertAll("Command run failed",
() -> assertTrue(cr.isSuccess()),
() -> assertTrue(
cr.getResult().toString().startsWith("Succeeded to schedule clustering for")));

// there is 1 requested clustering
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
assertEquals(1, timeline.filterPendingReplaceTimeline().countInstants());
}

/**
* Test case for command 'clustering run'.
*/
@Test
public void testClustering() throws IOException {
// generate commits
generateCommits();

CommandResult cr1 = scheduleClustering();
assertTrue(cr1.isSuccess());

// get clustering instant
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
Option<String> instantOpt =
timeline.filterPendingReplaceTimeline().firstInstant().map(HoodieInstant::getTimestamp);
assertTrue(instantOpt.isPresent(), "Must have pending clustering.");
String instance = instantOpt.get();

CommandResult cr2 = getShell().executeCommand(
String.format("clustering run --parallelism %s --clusteringInstant %s --sparkMaster %s",
2, instance, "local"));

assertAll("Command run failed",
() -> assertTrue(cr2.isSuccess()),
() -> assertTrue(
cr2.getResult().toString().startsWith("Succeeded to run clustering for ")));

// assert clustering complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.filterCompletedInstants().getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
"Pending clustering must be completed");

assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
"Pending clustering must be completed");
}

/**
* Test case for command 'clustering scheduleAndExecute'.
*/
@Test
public void testClusteringScheduleAndExecute() throws IOException {
// generate commits
generateCommits();

CommandResult cr2 = getShell().executeCommand(
String.format("clustering scheduleAndExecute --parallelism %s --sparkMaster %s", 2, "local"));

assertAll("Command run failed",
() -> assertTrue(cr2.isSuccess()),
() -> assertTrue(
cr2.getResult().toString().startsWith("Succeeded to run clustering for scheduleAndExecute")));

// assert clustering complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstants()
.map(HoodieInstant::getTimestamp).count() > 0,
"Completed clustering couldn't be 0");
}

private CommandResult scheduleClustering() {
// generate requested clustering
return getShell().executeCommand(
String.format("clustering schedule --hoodieConfigs hoodie.clustering.inline.max.commits=1 --sparkMaster %s", "local"));
}

private void generateCommits() throws IOException {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();

SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);

insert(jsc, client, dataGen, "001");
insert(jsc, client, dataGen, "002");
}

private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
HoodieTestDataGenerator dataGen, String newCommitTime) throws IOException {
// inserts
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
return records;
}

private JavaRDD<WriteStatus> operateFunc(
HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
throws IOException {
return writeFn.apply(client, writeRecords, commitTime);
}
}