diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MarkersCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MarkersCommand.java
new file mode 100644
index 0000000000000..57a4ee1879855
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MarkersCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import org.apache.spark.launcher.SparkLauncher;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+/**
+ * CLI command for marker operations.
+ */
+@Component
+public class MarkersCommand implements CommandMarker {
+
+  @CliCommand(value = "marker delete", help = "Delete the marker files of a given instant")
+  public String deleteMarker(
+      @CliOption(key = {"commit"}, help = "Instant time of the commit whose markers will be deleted") final String instantTime,
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
+          help = "Spark executor memory") final String sparkMemory)
+      throws Exception {
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DELETE_MARKER.toString(), master, sparkMemory, instantTime,
+        metaClient.getBasePath());
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    // Refresh the current table metadata after the Spark job completes
+    HoodieCLI.refreshTableMetadata();
+    if (exitCode != 0) {
+      return String.format("Failed: Could not delete marker \"%s\".", instantTime);
+    }
+    return String.format("Marker \"%s\" deleted.", instantTime);
+  }
+}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index aa4dcd8c00ab5..0de1a1adfe0be 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.cli.commands;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.cli.DeDupeType;
 import org.apache.hudi.cli.DedupeSparkJob;
@@ -25,6 +26,7 @@
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -38,7 +40,9 @@
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.utilities.HDFSParquetImporter;
@@ -51,8 +55,6 @@
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
@@ -76,7 +78,7 @@ public class SparkMain {
 
   enum SparkCommand {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
     COMPACT_SCHEDULE_AND_EXECUTE, COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
-    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
+    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
   }
 
   public static void main(String[] args) throws Exception {
@@ -234,6 +236,10 @@ public static void main(String[] args) throws Exception {
         assert (args.length == 7);
         returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
         break;
+      case DELETE_MARKER:
+        assert (args.length == 5);
+        returnCode = deleteMarker(jsc, args[3], args[4]);
+        break;
       case DELETE_SAVEPOINT:
         assert (args.length == 5);
         returnCode = deleteSavepoint(jsc, args[3], args[4]);
         break;
@@ -277,6 +283,21 @@ protected static void clean(JavaSparkContext jsc, String basePath, String propsF
     new HoodieCleaner(cfg, jsc).run();
   }
 
+  protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) {
+    try {
+      SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
+      HoodieWriteConfig config = client.getConfig();
+      HoodieEngineContext context = client.getEngineContext();
+      HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
+      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+      return 0;
+    } catch (Exception e) {
+      LOG.warn(String.format("Failed: Could not delete marker instantTime: \"%s\".", instantTime), e);
+      return -1;
+    }
+  }
+
   private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath,
       String tableName, String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
       int retry, String propsFilePath, List<String> configs) {
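A note on the wiring between the two files above: MarkersCommand.deleteMarker passes exactly five positional app arguments through SparkLauncher.addAppArgs, and SparkMain.main consumes them in the DELETE_MARKER branch, which is why that branch asserts args.length == 5. The sketch below spells out the argument layout with illustrative values (the table path is hypothetical; in the real flow the launcher assembles these, they are never built by hand):

public class DeleteMarkerArgsSketch {
  public static void main(String[] ignored) {
    // Positional arguments in the order MarkersCommand builds them and
    // SparkMain's DELETE_MARKER branch reads them (args[3] and args[4]).
    String[] args = {
        "DELETE_MARKER",        // args[0]: command name, i.e. SparkCommand.DELETE_MARKER.toString()
        "local",                // args[1]: spark master (--sparkMaster)
        "1G",                   // args[2]: spark executor memory (--sparkMemory, default "1G")
        "101",                  // args[3]: instant time whose markers are deleted (--commit)
        "/tmp/hudi/test_table"  // args[4]: table base path, taken from the CLI's meta client
    };
    System.out.println(String.join(" ", args)); // print the app-arg line for inspection
  }
}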
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
new file mode 100644
index 0000000000000..221a29f5250d2
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link org.apache.hudi.cli.commands.MarkersCommand}.
+ * <p/>
+ * A command using SparkLauncher needs to load the jars under lib, which are generated during mvn package.
+ * Hence this is an integration test rather than a unit test.
+ */
+public class ITTestMarkersCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+
+  @BeforeEach
+  public void init() throws IOException {
+    String tableName = "test_table";
+    tablePath = basePath + Path.SEPARATOR + tableName;
+
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+  }
+
+  /**
+   * Test case of command 'marker delete'.
+   */
+  @Test
+  public void testDeleteMarker() throws IOException {
+    // Generate two marker files for instant "101"
+    String instantTime1 = "101";
+
+    FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f0", IOType.APPEND);
+    FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f1", IOType.APPEND);
+
+    assertEquals(2, FileCreateUtils.getTotalMarkerFileCount(tablePath, "partA", instantTime1, IOType.APPEND));
+
+    CommandResult cr = getShell().executeCommand(
+        String.format("marker delete --commit %s --sparkMaster %s", instantTime1, "local"));
+    assertTrue(cr.isSuccess());
+
+    assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, "partA", instantTime1, IOType.APPEND));
+  }
+}
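As a companion to the integration test, the same cleanup can be driven programmatically through the APIs this patch calls into. Below is a minimal sketch, not part of the patch: it assumes a local Spark context, a hypothetical base path and instant time, and uses HoodieWriteConfig.newBuilder().withPath(...) as a stand-in for however the write config would normally be constructed.

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;

import org.apache.spark.api.java.JavaSparkContext;

public class DeleteMarkerSketch {
  public static void main(String[] args) {
    String basePath = "/tmp/hudi/test_table"; // hypothetical table path
    String instantTime = "101";               // hypothetical instant time

    JavaSparkContext jsc = new JavaSparkContext("local[2]", "delete-marker-sketch");
    try {
      HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
      HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
      // Same call path as SparkMain.deleteMarker: resolve the table's configured
      // marker type and quietly remove the marker directory of the given instant.
      HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
    } finally {
      jsc.stop();
    }
  }
}

From the hudi-cli shell, the equivalent action is the one the test issues: marker delete --commit 101 --sparkMaster local.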