diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 552adfa8f3490..3ebaba56d97c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -274,6 +274,12 @@ public Option<HoodieInstant> firstInstant() {
     return Option.fromJavaOptional(instants.stream().findFirst());
   }
 
+  @Override
+  public Option<HoodieInstant> firstInstant(String action, State state) {
+    return Option.fromJavaOptional(instants.stream()
+        .filter(s -> action.equals(s.getAction()) && state.equals(s.getState())).findFirst());
+  }
+
   @Override
   public Option<HoodieInstant> nthInstant(int n) {
     if (empty() || n >= countInstants()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index b473c7b1fb4d1..3c4d69f3d1a4d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -209,6 +209,13 @@ public interface HoodieTimeline extends Serializable {
    */
   Option<HoodieInstant> firstInstant();
 
+  /**
+   * @param action Instant action String.
+   * @param state Instant State.
+   * @return first instant of a specific action and state if available
+   */
+  Option<HoodieInstant> firstInstant(String action, State state);
+
   /**
    * @return nth completed instant from the first completed instant
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index e7ecde167c38f..9d89c2a6b5feb 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
+
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -176,6 +177,15 @@ public void testTimelineOperations() {
     assertFalse(timeline.empty());
     assertFalse(timeline.getCommitTimeline().filterPendingExcludingCompaction().empty());
     assertEquals(12, timeline.countInstants());
+    assertEquals("01", timeline.firstInstant(
+        HoodieTimeline.COMMIT_ACTION, State.COMPLETED).get().getTimestamp());
+    assertEquals("21", timeline.firstInstant(
+        HoodieTimeline.COMMIT_ACTION, State.INFLIGHT).get().getTimestamp());
+    assertFalse(timeline.firstInstant(
+        HoodieTimeline.COMMIT_ACTION, State.REQUESTED).isPresent());
+    assertFalse(timeline.firstInstant(
+        HoodieTimeline.REPLACE_COMMIT_ACTION, State.COMPLETED).isPresent());
+
     HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants();
     assertEquals(10, activeCommitTimeline.countInstants());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index fd0a36c7d296d..bfcbe3cfb8088 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.utilities;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
@@ -36,10 +31,15 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieException;
-
 import org.apache.hudi.table.HoodieSparkTable;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -53,14 +53,15 @@ public class HoodieClusteringJob {
 
+  public static final String EXECUTE = "execute";
+  public static final String SCHEDULE = "schedule";
+  public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
   private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class);
   private final Config cfg;
   private transient FileSystem fs;
   private TypedProperties props;
   private final JavaSparkContext jsc;
-  public static final String EXECUTE = "execute";
-  public static final String SCHEDULE = "schedule";
-  public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
+  private final HoodieTableMetaClient metaClient;
 
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
     this.cfg = cfg;
@@ -68,6 +69,7 @@ public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
     this.props = cfg.propsFilePath == null
         ? UtilHelpers.buildProperties(cfg.configs)
         : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
@@ -83,8 +85,10 @@ public static class Config implements Serializable {
     public String basePath = null;
     @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
     public String tableName = null;
-    @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when set --mode execute. "
-        + "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
+    @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only used when set --mode execute. "
+        + "If the instant time is not provided with --mode execute, "
+        + "the earliest scheduled clustering instant time is used by default. "
+        + "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
     public String clusteringInstantTime = null;
     @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
     public int parallelism = 1;
@@ -153,10 +157,6 @@ private static void validateRunningMode(Config cfg) {
     if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
       cfg.runningMode = cfg.runSchedule ? SCHEDULE : EXECUTE;
     }
-
-    if (cfg.runningMode.equalsIgnoreCase(EXECUTE) && cfg.clusteringInstantTime == null) {
-      throw new RuntimeException("--instant-time couldn't be null when executing clustering plan.");
-    }
   }
 
   public int cluster(int retry) {
@@ -192,7 +192,6 @@ public int cluster(int retry) {
   }
 
   private String getSchemaFromLatestInstant() throws Exception {
-    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
     if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
       throw new HoodieException("Cannot run clustering without any completed commits");
@@ -204,6 +203,20 @@ private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
     try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
+        // Instant time is not specified
+        // Find the earliest scheduled clustering instant for execution
+        Option<HoodieInstant> firstClusteringInstant =
+            metaClient.getActiveTimeline().firstInstant(
+                HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED);
+        if (firstClusteringInstant.isPresent()) {
+          cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp();
+          LOG.info("Found the earliest scheduled clustering instant which will be executed: "
+              + cfg.clusteringInstantTime);
+        } else {
+          throw new HoodieClusteringException("There is no scheduled clustering in the table.");
+        }
+      }
       Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata();
       return handleErrors(commitMetadata.get(), cfg.clusteringInstantTime);
@@ -267,7 +280,7 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
 
   private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
     List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
-            e.getValue().stream()).collect(Collectors.toList());
+        e.getValue().stream()).collect(Collectors.toList());
     long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
     if (errorsCount == 0) {
       LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
@@ -277,5 +290,4 @@ private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
     LOG.error(String.format("Import failed with %d errors.", errorsCount));
     return -1;
   }
-
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 74a4ea59f39cd..75f55bb26544f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -21,9 +21,15 @@
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieCompactionException;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -41,6 +47,7 @@ public class HoodieCompactor {
 
   private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
+  private static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
   private final Config cfg;
   private transient FileSystem fs;
   private TypedProperties props;
@@ -67,7 +74,7 @@ public static class Config implements Serializable {
     public String basePath = null;
     @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
     public String tableName = null;
-    @Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time", required = true)
+    @Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time", required = false)
     public String compactionInstantTime = null;
     @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
     public int parallelism = 1;
@@ -134,6 +141,21 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
     SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
+    // If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
+    // instant from the active timeline
+    if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
+      HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+      Option<HoodieInstant> firstCompactionInstant =
+          metaClient.getActiveTimeline().firstInstant(
+              HoodieTimeline.COMPACTION_ACTION, HoodieInstant.State.REQUESTED);
+      if (firstCompactionInstant.isPresent()) {
+        cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
+        LOG.info("Found the earliest scheduled compaction instant which will be executed: "
+            + cfg.compactionInstantTime);
+      } else {
+        throw new HoodieCompactionException("There is no scheduled compaction in the table.");
+      }
+    }
     JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
     return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
   }
@@ -142,6 +164,10 @@ private int doSchedule(JavaSparkContext jsc) throws Exception {
     // Get schema.
     SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
         Option.of(cfg.strategyClassName), props);
+    if (cfg.compactionInstantTime == null) {
+      throw new IllegalArgumentException("No instant time is provided for scheduling compaction. "
+          + "Please specify the compaction instant time by using --instant-time.");
+    }
     client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Option.empty());
     return 0;
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 1d74aced5f1d4..8b0778b07c888 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -469,6 +469,15 @@ public Schema getTargetSchema() {
     };
   }
 
+  public static HoodieTableMetaClient createMetaClient(
+      JavaSparkContext jsc, String basePath, boolean shouldLoadActiveTimelineOnLoad) {
+    return HoodieTableMetaClient.builder()
+        .setConf(jsc.hadoopConfiguration())
+        .setBasePath(basePath)
+        .setLoadActiveTimelineOnLoad(shouldLoadActiveTimelineOnLoad)
+        .build();
+  }
+
   @FunctionalInterface
   public interface CheckedSupplier<T> {
     T get() throws Throwable;
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index bcf2d27cb42b3..227623eeb5a1e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -157,11 +157,13 @@ public static void cleanupClass() {
     }
   }
 
+  @Override
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
   }
 
+  @Override
   @AfterEach
   public void teardown() throws Exception {
     super.teardown();
   }
@@ -869,18 +871,20 @@ private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePa
     return config;
   }
 
-  @Test
-  public void testHoodieAsyncClusteringJob() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClustering";
     HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
-    HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, null);
 
     deltaStreamerTestRunner(ds, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
       Option<String> scheduleClusteringInstantTime = Option.empty();
       try {
+        HoodieClusteringJob scheduleClusteringJob =
+            initialHoodieClusteringJob(tableBasePath, null, true, null);
         scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
       } catch (Exception e) {
         LOG.warn("Schedule clustering failed", e);
@@ -889,7 +893,7 @@ public void testHoodieAsyncClusteringJob() throws Exception {
       if (scheduleClusteringInstantTime.isPresent()) {
         LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
         HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
-            scheduleClusteringInstantTime.get(), false);
+            shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false);
         HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
         clusterClusteringJob.cluster(clusterClusteringConfig.retry);
         LOG.info("Cluster success");
@@ -988,7 +992,7 @@ public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob
   }
 
   @ParameterizedTest
-  @ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
+  @ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"})
   public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClustering2";
     HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
@@ -1003,7 +1007,9 @@ public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMod
           LOG.info("Cluster success");
         } else {
           LOG.warn("Import failed");
-          return false;
+          if (!runningMode.toLowerCase().equals(HoodieClusteringJob.EXECUTE)) {
+            return false;
+          }
         }
       } catch (Exception e) {
         LOG.warn("ScheduleAndExecute clustering failed", e);
@@ -1023,8 +1029,7 @@ public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMod
           return true;
        }
         case HoodieClusteringJob.EXECUTE: {
-          assertNotNull(exception);
-          assertEquals(exception.getMessage(), "--instant-time couldn't be null when executing clustering plan.");
+          TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
           return true;
         }
         default: