diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index dcd6a2cf3c8e9..7b967ad064900 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -18,16 +18,14 @@
 
 package org.apache.hudi.cli.commands;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCommitMetadata;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -37,6 +35,16 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
@@ -52,6 +60,37 @@
  */
 @ShellComponent
 public class ArchivedCommitsCommand {
+  private static final Logger LOG = LogManager.getLogger(ArchivedCommitsCommand.class);
+  @ShellMethod(key = "trigger archival", value = "trigger archival")
+  public String triggerArchival(
+      @ShellOption(value = {"--minCommits"},
+          help = "Minimum number of instants to retain in the active timeline. See hoodie.keep.min.commits",
+          defaultValue = "20") int minCommits,
+      @ShellOption(value = {"--maxCommits"},
+          help = "Maximum number of instants to retain in the active timeline. See hoodie.keep.max.commits",
+          defaultValue = "30") int maxCommits,
+      @ShellOption(value = {"--commitsRetainedByCleaner"}, help = "Number of commits to retain, without cleaning",
+          defaultValue = "10") int retained,
+      @ShellOption(value = {"--enableMetadata"},
+          help = "Enable the internal metadata table which serves table metadata like level file listings",
+          defaultValue = "true") boolean enableMetadata,
+      @ShellOption(value = "--sparkMemory", defaultValue = "1G",
+          help = "Spark executor memory") final String sparkMemory,
+      @ShellOption(value = "--sparkMaster", defaultValue = "local", help = "Spark Master") String master) throws Exception {
+    String sparkPropertiesPath =
+        Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    String cmd = SparkCommand.ARCHIVE.toString();
+    sparkLauncher.addAppArgs(cmd, master, sparkMemory, Integer.toString(minCommits), Integer.toString(maxCommits),
+        Integer.toString(retained), Boolean.toString(enableMetadata), HoodieCLI.basePath);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to trigger archival";
+    }
+    return "Archival successfully triggered";
+  }
 
   @ShellMethod(key = "show archived commit stats", value = "Read commits from archived files and show details")
   public String showArchivedCommits(
@@ -206,4 +245,5 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) {
       return new Comparable[] {};
     }
   }
+
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 6649eaf7669e5..51e9bccac605c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -22,11 +22,14 @@
 import org.apache.hudi.cli.DeDupeType;
 import org.apache.hudi.cli.DedupeSparkJob;
 import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -37,6 +40,7 @@
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieBootstrapConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -98,7 +102,7 @@ enum SparkCommand {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
     COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE, CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE,
     CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
-    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION
+    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION, ARCHIVE
   }
 
   public static void main(String[] args) throws Exception {
@@ -290,6 +294,10 @@ public static void main(String[] args) throws Exception {
         assert (args.length == 6);
         returnCode = renamePartition(jsc, args[3], args[4], args[5]);
         break;
+      case ARCHIVE:
+        assert (args.length == 8);
+        returnCode = archive(jsc, Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]), Boolean.parseBoolean(args[6]), args[7]);
+        break;
       default:
         break;
     }
@@ -646,4 +654,23 @@ private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbac
             HoodieFailedWritesCleaningPolicy.EAGER).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
   }
+
+  private static int archive(JavaSparkContext jsc, int minCommits, int maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits, maxCommits).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
+        .build();
+    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
+    HoodieSparkTable<HoodieAvroPayload> table = HoodieSparkTable.create(config, context);
+    try {
+      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
+      archiver.archiveIfRequired(context, true);
+    } catch (IOException ioe) {
+      LOG.error("Failed to archive with IOException: " + ioe);
+      return -1;
+    }
+    return 0;
+  }
 }
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
new file mode 100644
index 0000000000000..1a10d41c9ac6f
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
+import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.shell.Shell;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("functional")
+@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", "spring.shell.command.script.enabled=false"})
+public class TestArchiveCommand extends CLIFunctionalTestHarness {
+
+  @Autowired
+  private Shell shell;
+
+  @Test
+  public void testArchiving() throws Exception {
+    HoodieCLI.conf = hadoopConf();
+
+    // Create table and connect
+    String tableName = tableName();
+    String tablePath = tablePath(tableName);
+
+    new TableCommand().createTable(
+        tablePath, tableName,
+        "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+
+    // Create six commits
+    for (int i = 100; i < 106; i++) {
+      String timestamp = String.valueOf(i);
+      HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, hadoopConf());
+    }
+
+    Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetainedByCleaner 1 --enableMetadata false");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult));
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Getting instants from the active timeline returns only the latest state of each commit,
+    // so we expect 2 instants because minCommits is 2.
+    assertEquals(2, metaClient.getActiveTimeline().getInstants().count());
+
+    // Getting instants from the archived timeline returns every state of each commit.
+    // 6 commits - 2 commits retained in the active timeline = 4 archived commits,
+    // and since each commit completed, it has 3 instants (requested, inflight, completed),
+    // so 3 instants per commit * 4 commits = 12 archived instants.
+    assertEquals(12, metaClient.getArchivedTimeline().getInstants().count());
+  }
+
+}
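Illustrative usage (not part of the patch): once hudi-cli is built with this change, the command added in ArchivedCommitsCommand can be run from the shell after connecting to a table. The base path and option values below are made-up examples; any option left out falls back to the defaults declared on triggerArchival (minCommits=20, maxCommits=30, commitsRetainedByCleaner=10, enableMetadata=true, sparkMemory=1G, sparkMaster=local).

  connect --path /tmp/hoodie/trips_table
  trigger archival --minCommits 20 --maxCommits 30 --commitsRetainedByCleaner 10 --enableMetadata true --sparkMemory 2G --sparkMaster local[2]

The command itself only launches a SparkLauncher child process with the ARCHIVE arguments; the actual archiving is done by SparkMain.archive(), which builds a HoodieWriteConfig from these options and calls HoodieTimelineArchiver.archiveIfRequired().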