From 5ad7698f1df011d9fff8ea91d8b7a474528f6ef8 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 17 Oct 2022 10:00:30 -0400 Subject: [PATCH 01/11] current progress --- .../cli/commands/ArchivedCommitsCommand.java | 41 ++++++++++ .../hudi/cli/commands/TestArchiveCommand.java | 82 +++++++++++++++++++ .../commands/TestArchivedCommitsCommand.java | 20 +++++ 3 files changed, 143 insertions(+) create mode 100644 hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index dcd6a2cf3c8e..426189447959 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -28,7 +28,12 @@ import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.client.HoodieTimelineArchiver; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -37,6 +42,13 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; import org.springframework.shell.standard.ShellComponent; import org.springframework.shell.standard.ShellMethod; import org.springframework.shell.standard.ShellOption; @@ -52,6 +64,29 @@ */ @ShellComponent public class ArchivedCommitsCommand { + private static final Logger LOG = LogManager.getLogger(ArchivedCommitsCommand.class); + private JavaSparkContext jsc; + @ShellMethod(key = "trigger archival", value = "trigger archival") + public void triggerArchival(@ShellOption(value = {"--minCommits"}, + help = "Minimum number of instants to retain in the active timeline. See hoodie.keep.min.commits", + defaultValue = "20") int minCommits, + @ShellOption(value = {"--maxCommits"}, + help = "Maximum number of instants to retain in the active timeline. 
See hoodie.keep.max.commits", + defaultValue = "30") int maxCommits) { + + initJavaSparkContext(); + HoodieArchivalConfig arCfg = HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build(); + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath).withArchivalConfig(arCfg).build(); + HoodieEngineContext context = new HoodieSparkEngineContext(jsc); + HoodieSparkTable table = HoodieSparkTable.create(config, context); + try { + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table); + archiver.archiveIfRequired(context,true); + } catch (IOException ioe) { + LOG.error("Failed to archive with IOException: " + ioe); + } + } @ShellMethod(key = "show archived commit stats", value = "Read commits from archived files and show details") public String showArchivedCommits( @@ -206,4 +241,10 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) { return new Comparable[] {}; } } + + private void initJavaSparkContext() { + if (jsc == null) { + jsc = SparkUtil.initJavaSparkContext(SparkUtil.getDefaultConf("HoodieCLI", Option.of(SparkUtil.DEFAULT_SPARK_MASTER))); + } + } } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java new file mode 100644 index 000000000000..5e04447ed06c --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java @@ -0,0 +1,82 @@ +package org.apache.hudi.cli.commands; + + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.functional.CLIFunctionalTestHarness; +import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.client.HoodieTimelineArchiver; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; + +import org.junit.jupiter.api.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.shell.Shell; + +@Tag("functional") +@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", "spring.shell.command.script.enabled=false"}) +public class TestArchiveCommand extends CLIFunctionalTestHarness { + + @Autowired + private Shell shell; + private String tablePath; + + @Test + public void testArchiving() throws Exception { + HoodieCLI.conf = hadoopConf(); + + // Create table and connect + String tableName = tableName(); + tablePath = tablePath(tableName); + + new TableCommand().createTable( + tablePath, tableName, + "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload"); + + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + + // Generate archive + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) + 
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) + .forTable("test-trip-table").build(); + + // Create six commits + for (int i = 100; i < 106; i++) { + String timestamp = String.valueOf(i); + // Requested Compaction + HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), hadoopConf()); + // Inflight Compaction + HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), hadoopConf()); + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, hadoopConf()); + } + + // Simulate a compaction commit in metadata table timeline + // so the archival in data table can happen + HoodieTestUtils.createCompactionCommitInMetadataTable( + hadoopConf(), metaClient.getFs(), tablePath, "105"); + + metaClient = HoodieTableMetaClient.reload(metaClient); + // reload the timeline and get all the commits before archive + metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); + + // archive + HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient); + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); + archiver.archiveIfRequired(context()); + } + + +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java index b642c1b3f8eb..4ffc46f3af00 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java @@ -37,6 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -63,6 +64,9 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness { private String tablePath; + + + @BeforeEach public void init() throws Exception { HoodieCLI.conf = hadoopConf(); @@ -113,6 +117,22 @@ public void init() throws Exception { archiver.archiveIfRequired(context()); } + + @Test + public void testArchiving() { + Configuration conf = HoodieCLI.conf; + + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + + for (int i = 1; i < 20; i++) { + String timestamp = String.valueOf(i); + // Write corrupted requested Clean File + HoodieTestCommitMetadataGenerator.createCommitFile(HoodieCLI.basePath, timestamp, conf); + } + + + } + /** * Test for command: show archived commit stats. 
*/ From dd10eb82be67a56fc66c1e6cfb0ecdebd0cef5b5 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Tue, 25 Oct 2022 10:55:58 -0700 Subject: [PATCH 02/11] latest changes --- .../hudi/cli/commands/TestArchiveCommand.java | 12 ++++-------- .../cli/commands/TestArchivedCommitsCommand.java | 16 ---------------- 2 files changed, 4 insertions(+), 24 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java index 5e04447ed06c..eca4b8d436af 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java @@ -15,7 +15,9 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.shell.Shell; @@ -26,7 +28,7 @@ public class TestArchiveCommand extends CLIFunctionalTestHarness { @Autowired private Shell shell; - private String tablePath; + @Test public void testArchiving() throws Exception { @@ -34,7 +36,7 @@ public void testArchiving() throws Exception { // Create table and connect String tableName = tableName(); - tablePath = tablePath(tableName); + String tablePath = tablePath(tableName); new TableCommand().createTable( tablePath, tableName, @@ -54,12 +56,6 @@ public void testArchiving() throws Exception { // Create six commits for (int i = 100; i < 106; i++) { String timestamp = String.valueOf(i); - // Requested Compaction - HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, - new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), hadoopConf()); - // Inflight Compaction - HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, - new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), hadoopConf()); HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, hadoopConf()); } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java index 4ffc46f3af00..0aa377c7dd65 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java @@ -117,22 +117,6 @@ public void init() throws Exception { archiver.archiveIfRequired(context()); } - - @Test - public void testArchiving() { - Configuration conf = HoodieCLI.conf; - - HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); - - for (int i = 1; i < 20; i++) { - String timestamp = String.valueOf(i); - // Write corrupted requested Clean File - HoodieTestCommitMetadataGenerator.createCommitFile(HoodieCLI.basePath, timestamp, conf); - } - - - } - /** * Test for command: show archived commit stats. 
*/ From 48afc1da111b9494da1ec07e264e4f721006318d Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 27 Oct 2022 10:47:22 -0700 Subject: [PATCH 03/11] working now --- .../cli/commands/ArchivedCommitsCommand.java | 42 +++++++++--- .../hudi/cli/commands/TestArchiveCommand.java | 67 +++++++++++-------- 2 files changed, 70 insertions(+), 39 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index 426189447959..294fcdc21221 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -28,7 +28,6 @@ import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.TableHeader; -import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -43,12 +42,15 @@ import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; import org.springframework.shell.standard.ShellComponent; import org.springframework.shell.standard.ShellMethod; import org.springframework.shell.standard.ShellOption; @@ -67,17 +69,22 @@ public class ArchivedCommitsCommand { private static final Logger LOG = LogManager.getLogger(ArchivedCommitsCommand.class); private JavaSparkContext jsc; @ShellMethod(key = "trigger archival", value = "trigger archival") - public void triggerArchival(@ShellOption(value = {"--minCommits"}, - help = "Minimum number of instants to retain in the active timeline. See hoodie.keep.min.commits", - defaultValue = "20") int minCommits, - @ShellOption(value = {"--maxCommits"}, - help = "Maximum number of instants to retain in the active timeline. See hoodie.keep.max.commits", - defaultValue = "30") int maxCommits) { + public void triggerArchival( + @ShellOption(value = {"--minCommits"}, + help = "Minimum number of instants to retain in the active timeline. See hoodie.keep.min.commits", + defaultValue = "20") int minCommits, + @ShellOption(value = {"--maxCommits"}, + help = "Maximum number of instants to retain in the active timeline. 
See hoodie.keep.max.commits", + defaultValue = "30") int maxCommits, + @ShellOption(value = {"--commitsRetained"}, help = "Number of commits to retain, without cleaning", + defaultValue = "10") int retained) { initJavaSparkContext(); - HoodieArchivalConfig arCfg = HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build(); - - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath).withArchivalConfig(arCfg).build(); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(retained).build()) + .withEmbeddedTimelineServerEnabled(false) + .build(); HoodieEngineContext context = new HoodieSparkEngineContext(jsc); HoodieSparkTable table = HoodieSparkTable.create(config, context); try { @@ -244,7 +251,20 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) { private void initJavaSparkContext() { if (jsc == null) { - jsc = SparkUtil.initJavaSparkContext(SparkUtil.getDefaultConf("HoodieCLI", Option.of(SparkUtil.DEFAULT_SPARK_MASTER))); + SparkConf sparkConf = new SparkConf(); + sparkConf.set("spark.app.name", getClass().getName()); + sparkConf.set("spark.master", "local[*]"); + sparkConf.set("spark.default.parallelism", "4"); + sparkConf.set("spark.sql.shuffle.partitions", "4"); + sparkConf.set("spark.driver.maxResultSize", "2g"); + sparkConf.set("spark.hadoop.mapred.output.compress", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); + sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); + sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate(); + jsc = new JavaSparkContext(spark.sparkContext()); + //sc = SparkUtil.initJavaSparkContext(SparkUtil.getDefaultConf("HoodieCLI", Option.of(SparkUtil.DEFAULT_SPARK_MASTER))); } } } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java index eca4b8d436af..2f155a5c8c39 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java @@ -1,27 +1,42 @@ -package org.apache.hudi.cli.commands; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hudi.cli.commands; +import org.apache.hudi.HoodieTestCommitGenerator; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.functional.CLIFunctionalTestHarness; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; -import org.apache.hudi.client.HoodieTimelineArchiver; +import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestUtils; -import org.apache.hudi.config.HoodieArchivalConfig; -import org.apache.hudi.config.HoodieCleanConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkTable; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.shell.Shell; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + @Tag("functional") @SpringBootTest(properties = {"spring.shell.interactive.enabled=false", "spring.shell.command.script.enabled=false"}) public class TestArchiveCommand extends CLIFunctionalTestHarness { @@ -29,7 +44,6 @@ public class TestArchiveCommand extends CLIFunctionalTestHarness { @Autowired private Shell shell; - @Test public void testArchiving() throws Exception { HoodieCLI.conf = hadoopConf(); @@ -42,37 +56,34 @@ public void testArchiving() throws Exception { tablePath, tableName, "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload"); - HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); - // Generate archive - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) - .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) - .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) - .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withRemoteServerPort(timelineServicePort).build()) - .forTable("test-trip-table").build(); + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); // Create six commits for (int i = 100; i < 106; i++) { String timestamp = String.valueOf(i); - HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, hadoopConf()); + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath,timestamp, hadoopConf()); } - // Simulate a compaction commit in metadata table timeline - // so the archival in data table can happen + //need to create compaction commit or else nothing will be archived HoodieTestUtils.createCompactionCommitInMetadataTable( hadoopConf(), metaClient.getFs(), tablePath, "105"); + Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetained 1"); + assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult)); metaClient = HoodieTableMetaClient.reload(metaClient); - // reload the timeline and get all the commits before archive - metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); - // archive - HoodieSparkTable table = 
HoodieSparkTable.create(cfg, context(), metaClient);
-    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
-    archiver.archiveIfRequired(context());
-  }
+
+    //get instants in the active timeline only returns the latest state of the commit
+    //therefore we expect 2 instants because minCommits is 2
+    assertEquals(2, metaClient.getActiveTimeline().getInstants().count());
+
+    //get instants in the archived timeline returns all instants in the commit
+    //therefore we expect 12 instants because 6 commits - 2 commits in active timeline = 4 in archived
+    //since each commit is completed, there are 3 instants per commit (requested, inflight, completed)
+    //and 3 instants per commit * 4 commits = 12 instants
+    metaClient.getArchivedTimeline().getInstants().forEach(u -> System.out.println("archived: " + u.toString()));
+    assertEquals(12, metaClient.getArchivedTimeline().getInstants().count());
+  }
 }
+
From defcf17a27ae96e4d17150ac405ce44b8311de64 Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Thu, 27 Oct 2022 10:52:29 -0700
Subject: [PATCH 04/11] reverted wrong changes

---
 .../apache/hudi/cli/commands/TestArchivedCommitsCommand.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 0aa377c7dd65..b642c1b3f8eb 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -37,7 +37,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -64,9 +63,6 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
 
   private String tablePath;
 
-
-
-
   @BeforeEach
   public void init() throws Exception {
     HoodieCLI.conf = hadoopConf();
From 23f04f96b8d93586ee76d54dc1cab7e5596dc89c Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Thu, 27 Oct 2022 10:53:33 -0700
Subject: [PATCH 05/11] got rid of debugging prints

---
 .../java/org/apache/hudi/cli/commands/TestArchiveCommand.java | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
index 2f155a5c8c39..378ab77e50e1 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
@@ -18,14 +18,11 @@
 package org.apache.hudi.cli.commands;
 
-import org.apache.hudi.HoodieTestCommitGenerator;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -81,7 +78,6 @@ public void testArchiving() throws Exception {
     //therefore we expect 12 instants because 6 commits - 2 commits in active timeline = 4 in archived
     //since each commit is completed, there are 3 instants per commit (requested, inflight, completed)
     //and 3 instants per commit * 4 commits = 12 instants
-    metaClient.getArchivedTimeline().getInstants().forEach(u -> System.out.println("archived: " + u.toString()));
     assertEquals(12, metaClient.getArchivedTimeline().getInstants().count());
   }
 
From 116b0903e9816284f38bca08b5d87c22c909a4b4 Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Thu, 27 Oct 2022 10:54:54 -0700
Subject: [PATCH 06/11] got rid of commented line

---
 .../org/apache/hudi/cli/commands/ArchivedCommitsCommand.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 294fcdc21221..ce906b898855 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -264,7 +264,6 @@ private void initJavaSparkContext() {
       sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
       SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
       jsc = new JavaSparkContext(spark.sparkContext());
-      //sc = SparkUtil.initJavaSparkContext(SparkUtil.getDefaultConf("HoodieCLI", Option.of(SparkUtil.DEFAULT_SPARK_MASTER)));
     }
   }
 }
From 82cc7354ee41a3983583dc8a5455110c9b66f0f6 Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Thu, 27 Oct 2022 11:09:11 -0700
Subject: [PATCH 07/11] disabled metadata table

---
 .../apache/hudi/cli/commands/ArchivedCommitsCommand.java | 2 ++
 .../org/apache/hudi/cli/commands/TestArchiveCommand.java | 7 +------
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index ce906b898855..d58c903c4c99 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -84,6 +85,7 @@ public void triggerArchival(
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build())
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(retained).build())
         .withEmbeddedTimelineServerEnabled(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .build();
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     HoodieSparkTable table = HoodieSparkTable.create(config, context);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
index 378ab77e50e1..2e7f0a7b5ad8 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
@@ -52,8 +52,7 @@ public void testArchiving() throws Exception {
     new TableCommand().createTable(
         tablePath, tableName,
         "COPY_ON_WRITE", "", 1, 
"org.apache.hudi.common.model.HoodieAvroPayload"); - - + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); // Create six commits @@ -62,10 +61,6 @@ public void testArchiving() throws Exception { HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath,timestamp, hadoopConf()); } - //need to create compaction commit or else nothing will be archived - HoodieTestUtils.createCompactionCommitInMetadataTable( - hadoopConf(), metaClient.getFs(), tablePath, "105"); - Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetained 1"); assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult)); metaClient = HoodieTableMetaClient.reload(metaClient); From a2d3672965a7bce164434b6cf64a7ec422db71da Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 27 Oct 2022 12:12:34 -0700 Subject: [PATCH 08/11] removed unused import --- .../java/org/apache/hudi/cli/commands/TestArchiveCommand.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java index 2e7f0a7b5ad8..91dda89bfd1e 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java @@ -23,7 +23,6 @@ import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -52,7 +51,7 @@ public void testArchiving() throws Exception { new TableCommand().createTable( tablePath, tableName, "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload"); - + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); // Create six commits From 565b83f6ba744fd62312d9e2dff8dbf6956d6b80 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 31 Oct 2022 14:17:15 -0400 Subject: [PATCH 09/11] added enableMetadata option --- .../apache/hudi/cli/commands/ArchivedCommitsCommand.java | 7 +++++-- .../org/apache/hudi/cli/commands/TestArchiveCommand.java | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index d58c903c4c99..f133a2b7116d 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -78,14 +78,17 @@ public void triggerArchival( help = "Maximum number of instants to retain in the active timeline. 
See hoodie.keep.max.commits", defaultValue = "30") int maxCommits, @ShellOption(value = {"--commitsRetained"}, help = "Number of commits to retain, without cleaning", - defaultValue = "10") int retained) { + defaultValue = "10") int retained, + @ShellOption(value = {"--enableMetadata"}, + help = "Enable the internal metadata table which serves table metadata like level file listings", + defaultValue = "true") boolean enableMetadata) { initJavaSparkContext(); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath) .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build()) .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(retained).build()) .withEmbeddedTimelineServerEnabled(false) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()) .build(); HoodieEngineContext context = new HoodieSparkEngineContext(jsc); HoodieSparkTable table = HoodieSparkTable.create(config, context); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java index 91dda89bfd1e..f29b9c04539b 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java @@ -60,7 +60,7 @@ public void testArchiving() throws Exception { HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath,timestamp, hadoopConf()); } - Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetained 1"); + Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetained 1 --enableMetadata false"); assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult)); metaClient = HoodieTableMetaClient.reload(metaClient); From ccdb305daa9c8392ab9898b395a3987697627e80 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 2 Nov 2022 15:57:14 -0400 Subject: [PATCH 10/11] redid the command matching how the others used jsc --- .../cli/commands/ArchivedCommitsCommand.java | 79 +++++++------------ .../apache/hudi/cli/commands/SparkMain.java | 29 ++++++- 2 files changed, 55 insertions(+), 53 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index f133a2b7116d..fde60c9a26b9 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -18,22 +18,15 @@ package org.apache.hudi.cli.commands; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.avro.specific.SpecificData; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieCommitMetadata; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.TableHeader; -import org.apache.hudi.client.HoodieTimelineArchiver; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.engine.HoodieEngineContext; +import 
org.apache.hudi.cli.commands.SparkMain.SparkCommand; +import org.apache.hudi.cli.utils.InputStreamConsumer; +import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -42,16 +35,16 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieArchivalConfig; -import org.apache.hudi.config.HoodieCleanConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkTable; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.util.Utils; import org.springframework.shell.standard.ShellComponent; import org.springframework.shell.standard.ShellMethod; import org.springframework.shell.standard.ShellOption; @@ -68,9 +61,8 @@ @ShellComponent public class ArchivedCommitsCommand { private static final Logger LOG = LogManager.getLogger(ArchivedCommitsCommand.class); - private JavaSparkContext jsc; @ShellMethod(key = "trigger archival", value = "trigger archival") - public void triggerArchival( + public String triggerArchival( @ShellOption(value = {"--minCommits"}, help = "Minimum number of instants to retain in the active timeline. 
See hoodie.keep.min.commits", defaultValue = "20") int minCommits, @@ -81,23 +73,23 @@ public void triggerArchival( defaultValue = "10") int retained, @ShellOption(value = {"--enableMetadata"}, help = "Enable the internal metadata table which serves table metadata like level file listings", - defaultValue = "true") boolean enableMetadata) { - - initJavaSparkContext(); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build()) - .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(retained).build()) - .withEmbeddedTimelineServerEnabled(false) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()) - .build(); - HoodieEngineContext context = new HoodieSparkEngineContext(jsc); - HoodieSparkTable table = HoodieSparkTable.create(config, context); - try { - HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table); - archiver.archiveIfRequired(context,true); - } catch (IOException ioe) { - LOG.error("Failed to archive with IOException: " + ioe); + defaultValue = "true") boolean enableMetadata, + @ShellOption(value = "--sparkMemory", defaultValue = "1G", + help = "Spark executor memory") final String sparkMemory, + @ShellOption(value = "--sparkMaster", defaultValue = "local", help = "Spark Master") String master) throws Exception { + String sparkPropertiesPath = + Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + String cmd = SparkCommand.ARCHIVE.toString(); + sparkLauncher.addAppArgs(cmd, master, sparkMemory, Integer.toString(minCommits), Integer.toString(maxCommits), + Integer.toString(retained), Boolean.toString(enableMetadata), HoodieCLI.basePath); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to trigger archival"; } + return "Archival successfully triggered"; } @ShellMethod(key = "show archived commit stats", value = "Read commits from archived files and show details") @@ -254,21 +246,4 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) { } } - private void initJavaSparkContext() { - if (jsc == null) { - SparkConf sparkConf = new SparkConf(); - sparkConf.set("spark.app.name", getClass().getName()); - sparkConf.set("spark.master", "local[*]"); - sparkConf.set("spark.default.parallelism", "4"); - sparkConf.set("spark.sql.shuffle.partitions", "4"); - sparkConf.set("spark.driver.maxResultSize", "2g"); - sparkConf.set("spark.hadoop.mapred.output.compress", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); - sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); - sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate(); - jsc = new JavaSparkContext(spark.sparkContext()); - } - } } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 6649eaf7669e..51e9bccac605 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -22,11 +22,14 @@ import org.apache.hudi.cli.DeDupeType; import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; @@ -37,6 +40,7 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieBootstrapConfig; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -98,7 +102,7 @@ enum SparkCommand { BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE, COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE, CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE, - REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION + REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION, ARCHIVE } public static void main(String[] args) throws Exception { @@ -290,6 +294,10 @@ public static void main(String[] args) throws Exception { assert (args.length == 6); returnCode = renamePartition(jsc, args[3], args[4], args[5]); break; + case ARCHIVE: + assert (args.length == 8); + returnCode = archive(jsc, Integer.parseInt(args[3]), Integer.parseInt(args[4]), Integer.parseInt(args[5]), Boolean.parseBoolean(args[6]), args[7]); + break; default: break; } @@ -646,4 +654,23 @@ private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbac HoodieFailedWritesCleaningPolicy.EAGER).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); } + + private static int archive(JavaSparkContext jsc, int minCommits, int maxCommits, int commitsRetained, boolean enableMetadata, String basePath) { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build()) + .withEmbeddedTimelineServerEnabled(false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()) + .build(); + HoodieEngineContext context = new HoodieSparkEngineContext(jsc); + HoodieSparkTable table = HoodieSparkTable.create(config, context); + try { + HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table); + archiver.archiveIfRequired(context,true); + } catch (IOException ioe) { + LOG.error("Failed to archive with IOException: " + ioe); + return -1; + } + return 0; + } } From d4f5107ca1bd9bba7bda422609072e971f89f12c Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 2 Nov 2022 16:46:26 -0400 Subject: [PATCH 11/11] 
changed commitsRetained to commitsRetainedByCleaner --- .../org/apache/hudi/cli/commands/ArchivedCommitsCommand.java | 2 +- .../java/org/apache/hudi/cli/commands/TestArchiveCommand.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index fde60c9a26b9..7b967ad06490 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -69,7 +69,7 @@ public String triggerArchival( @ShellOption(value = {"--maxCommits"}, help = "Maximum number of instants to retain in the active timeline. See hoodie.keep.max.commits", defaultValue = "30") int maxCommits, - @ShellOption(value = {"--commitsRetained"}, help = "Number of commits to retain, without cleaning", + @ShellOption(value = {"--commitsRetainedByCleaner"}, help = "Number of commits to retain, without cleaning", defaultValue = "10") int retained, @ShellOption(value = {"--enableMetadata"}, help = "Enable the internal metadata table which serves table metadata like level file listings", diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java index f29b9c04539b..1a10d41c9ac6 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java @@ -60,7 +60,7 @@ public void testArchiving() throws Exception { HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath,timestamp, hadoopConf()); } - Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetained 1 --enableMetadata false"); + Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 --maxCommits 3 --commitsRetainedByCleaner 1 --enableMetadata false"); assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult)); metaClient = HoodieTableMetaClient.reload(metaClient);
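In its final shape, "trigger archival" hands off to SparkMain through SparkLauncher, the same pattern the other Spark-backed hudi-cli commands follow. As a rough usage sketch (the table path below is hypothetical, and the option values mirror the invocation exercised by TestArchiveCommand), a session could look like:

    hudi->connect --path /tmp/hudi/test-trip-table
    hudi:test-trip-table->trigger archival --minCommits 2 --maxCommits 3 --commitsRetainedByCleaner 1 --enableMetadata false
    Archival successfully triggered

Options left unset fall back to their declared defaults (minCommits 20, maxCommits 30, commitsRetainedByCleaner 10, enableMetadata true, sparkMaster local, sparkMemory 1G), and triggerArchival returns "Failed to trigger archival" when the launched Spark process exits non-zero.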