@@ -274,6 +274,12 @@ public Option<HoodieInstant> firstInstant() {
return Option.fromJavaOptional(instants.stream().findFirst());
}

@Override
public Option<HoodieInstant> firstInstant(String action, State state) {
return Option.fromJavaOptional(instants.stream()
.filter(s -> action.equals(s.getAction()) && state.equals(s.getState())).findFirst());
}

@Override
public Option<HoodieInstant> nthInstant(int n) {
if (empty() || n >= countInstants()) {
@@ -209,6 +209,13 @@ public interface HoodieTimeline extends Serializable {
*/
Option<HoodieInstant> firstInstant();

/**
* @param action Instant action String.
* @param state Instant State.
* @return first instant of a specific action and state if available
*/
Option<HoodieInstant> firstInstant(String action, State state);

/**
* @return nth completed instant from the first completed instant
*/
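
For context, a minimal usage sketch of the new firstInstant(action, state) overload, looking up the earliest scheduled clustering plan; the base path and Hadoop configuration below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

public class FirstInstantExample {
  public static void main(String[] args) {
    // Placeholder base path; point this at an existing Hudi table.
    String basePath = "/tmp/hudi_table";
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath(basePath)
        .setLoadActiveTimelineOnLoad(true)
        .build();
    // The earliest REQUESTED replacecommit instant is a scheduled, not yet executed, clustering plan.
    Option<HoodieInstant> firstScheduledClustering =
        metaClient.getActiveTimeline().firstInstant(
            HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED);
    if (firstScheduledClustering.isPresent()) {
      System.out.println("Earliest scheduled clustering: " + firstScheduledClustering.get().getTimestamp());
    } else {
      System.out.println("No scheduled clustering found.");
    }
  }
}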
@@ -26,6 +26,7 @@
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;

import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -176,6 +177,15 @@ public void testTimelineOperations() {
assertFalse(timeline.empty());
assertFalse(timeline.getCommitTimeline().filterPendingExcludingCompaction().empty());
assertEquals(12, timeline.countInstants());
assertEquals("01", timeline.firstInstant(
HoodieTimeline.COMMIT_ACTION, State.COMPLETED).get().getTimestamp());
assertEquals("21", timeline.firstInstant(
HoodieTimeline.COMMIT_ACTION, State.INFLIGHT).get().getTimestamp());
assertFalse(timeline.firstInstant(
HoodieTimeline.COMMIT_ACTION, State.REQUESTED).isPresent());
assertFalse(timeline.firstInstant(
HoodieTimeline.REPLACE_COMMIT_ACTION, State.COMPLETED).isPresent());

HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants();
assertEquals(10, activeCommitTimeline.countInstants());

@@ -18,11 +18,6 @@

package org.apache.hudi.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
@@ -36,10 +31,15 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;

import org.apache.hudi.table.HoodieSparkTable;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -53,21 +53,23 @@

public class HoodieClusteringJob {

public static final String EXECUTE = "execute";
public static final String SCHEDULE = "schedule";
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class);
private final Config cfg;
private transient FileSystem fs;
private TypedProperties props;
private final JavaSparkContext jsc;
public static final String EXECUTE = "execute";
public static final String SCHEDULE = "schedule";
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
private final HoodieTableMetaClient metaClient;

public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
this.cfg = cfg;
this.jsc = jsc;
this.props = cfg.propsFilePath == null
? UtilHelpers.buildProperties(cfg.configs)
: readConfigFromFileSystem(jsc, cfg);
this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
}

private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
@@ -83,8 +85,10 @@ public static class Config implements Serializable {
public String basePath = null;
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
public String tableName = null;
@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when set --mode execute. "
+ "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only used when set --mode execute. "
+ "If the instant time is not provided with --mode execute, "
+ "the earliest scheduled clustering instant time is used by default. "
+ "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
public String clusteringInstantTime = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
public int parallelism = 1;
@@ -153,10 +157,6 @@ private static void validateRunningMode(Config cfg) {
if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
cfg.runningMode = cfg.runSchedule ? SCHEDULE : EXECUTE;
}

if (cfg.runningMode.equalsIgnoreCase(EXECUTE) && cfg.clusteringInstantTime == null) {
throw new RuntimeException("--instant-time couldn't be null when executing clustering plan.");
}
}

public int cluster(int retry) {
@@ -192,7 +192,6 @@ public int cluster(int retry) {
}

private String getSchemaFromLatestInstant() throws Exception {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
throw new HoodieException("Cannot run clustering without any completed commits");
@@ -204,6 +203,20 @@ private String getSchemaFromLatestInstant() throws Exception {
private int doCluster(JavaSparkContext jsc) throws Exception {
String schemaStr = getSchemaFromLatestInstant();
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
// Instant time is not specified
// Find the earliest scheduled clustering instant for execution
Option<HoodieInstant> firstClusteringInstant =
metaClient.getActiveTimeline().firstInstant(
HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED);
if (firstClusteringInstant.isPresent()) {
cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp();
LOG.info("Found the earliest scheduled clustering instant which will be executed: "
+ cfg.clusteringInstantTime);
} else {
throw new HoodieClusteringException("There is no scheduled clustering in the table.");
}
}
Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata();

return handleErrors(commitMetadata.get(), cfg.clusteringInstantTime);
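
A minimal sketch of driving the relaxed execute mode programmatically, mirroring the delta streamer tests further below; the base path, table name, and JavaSparkContext (jsc) are placeholders, and the existing runningMode/retry fields of Config keep their defaults unless set:

// Execute the earliest scheduled clustering without passing an instant time.
HoodieClusteringJob.Config clusterCfg = new HoodieClusteringJob.Config();
clusterCfg.basePath = "/tmp/hudi_table";   // placeholder table path
clusterCfg.tableName = "hudi_table";       // placeholder table name
clusterCfg.runningMode = HoodieClusteringJob.EXECUTE;
// clusteringInstantTime is intentionally left null; doCluster() now falls back to the
// earliest REQUESTED replacecommit instant on the active timeline, or throws
// HoodieClusteringException when nothing has been scheduled.
HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, clusterCfg);
int exitCode = clusteringJob.cluster(clusterCfg.retry);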
@@ -267,7 +280,7 @@ private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {

private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());
e.getValue().stream()).collect(Collectors.toList());
long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
if (errorsCount == 0) {
LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
Expand All @@ -277,5 +290,4 @@ private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
LOG.error(String.format("Import failed with %d errors.", errorsCount));
return -1;
}

}
@@ -21,9 +21,15 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieCompactionException;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
@@ -41,6 +47,7 @@
public class HoodieCompactor {

private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
private static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
private final Config cfg;
private transient FileSystem fs;
private TypedProperties props;
@@ -67,7 +74,7 @@ public static class Config implements Serializable {
public String basePath = null;
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
public String tableName = null;
@Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time", required = true)
@Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time", required = false)
public String compactionInstantTime = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
public int parallelism = 1;
@@ -134,6 +141,21 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
// If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
// instant from the active timeline
if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
Option<HoodieInstant> firstCompactionInstant =
metaClient.getActiveTimeline().firstInstant(
HoodieTimeline.COMPACTION_ACTION, HoodieInstant.State.REQUESTED);
if (firstCompactionInstant.isPresent()) {
cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
LOG.info("Found the earliest scheduled compaction instant which will be executed: "
+ cfg.compactionInstantTime);
} else {
throw new HoodieCompactionException("There is no scheduled compaction in the table.");
}
}
JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
}
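
By analogy, a sketch of running the compactor with the now-optional --instant-time; the HoodieCompactor(jsc, cfg) constructor, the compact(retry) entry point, and the retry field are assumed to follow the same pattern as HoodieClusteringJob, and the paths below are placeholders:

// Compact the earliest scheduled compaction plan when no instant time is given.
HoodieCompactor.Config compactCfg = new HoodieCompactor.Config();
compactCfg.basePath = "/tmp/hudi_table";     // placeholder table path
compactCfg.tableName = "hudi_table";         // placeholder table name
compactCfg.parallelism = 2;
compactCfg.schemaFile = "/tmp/schema.avsc";  // placeholder schema file read by doCompact()
// compactionInstantTime stays null; doCompact() resolves the earliest REQUESTED compaction
// instant from the active timeline, or throws HoodieCompactionException if none is scheduled.
HoodieCompactor compactor = new HoodieCompactor(jsc, compactCfg);  // assumed constructor
int exitCode = compactor.compact(compactCfg.retry);                // assumed entry point and retry field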
@@ -142,6 +164,10 @@ private int doSchedule(JavaSparkContext jsc) throws Exception {
// Get schema.
SparkRDDWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props);
if (cfg.compactionInstantTime == null) {
throw new IllegalArgumentException("No instant time is provided for scheduling compaction. "
+ "Please specify the compaction instant time by using --instant-time.");
}
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Option.empty());
return 0;
}
@@ -469,6 +469,15 @@ public Schema getTargetSchema() {
};
}

public static HoodieTableMetaClient createMetaClient(
JavaSparkContext jsc, String basePath, boolean shouldLoadActiveTimelineOnLoad) {
return HoodieTableMetaClient.builder()
.setConf(jsc.hadoopConfiguration())
.setBasePath(basePath)
.setLoadActiveTimelineOnLoad(shouldLoadActiveTimelineOnLoad)
.build();
}

@FunctionalInterface
public interface CheckedSupplier<T> {
T get() throws Throwable;
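
The createMetaClient helper added above centralizes meta client construction for these utilities; a one-line usage sketch, with jsc and cfg.basePath supplied by the calling job:

// Build a meta client with the active timeline pre-loaded, as HoodieCompactor.doCompact() does.
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);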
@@ -157,11 +157,13 @@ public static void cleanupClass() {
}
}

@Override
@BeforeEach
public void setup() throws Exception {
super.setup();
}

@Override
@AfterEach
public void teardown() throws Exception {
super.teardown();
@@ -869,18 +871,20 @@ private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePa
return config;
}

@Test
public void testHoodieAsyncClusteringJob() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering";

HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, null);

deltaStreamerTestRunner(ds, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);

Option<String> scheduleClusteringInstantTime = Option.empty();
try {
HoodieClusteringJob scheduleClusteringJob =
initialHoodieClusteringJob(tableBasePath, null, true, null);
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
} catch (Exception e) {
LOG.warn("Schedule clustering failed", e);
Expand All @@ -889,7 +893,7 @@ public void testHoodieAsyncClusteringJob() throws Exception {
if (scheduleClusteringInstantTime.isPresent()) {
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
scheduleClusteringInstantTime.get(), false);
shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false);
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
LOG.info("Cluster success");
@@ -988,7 +992,7 @@ public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob
}

@ParameterizedTest
@ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
@ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"})
public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering2";
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
@@ -1003,7 +1007,9 @@ public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMod
LOG.info("Cluster success");
} else {
LOG.warn("Import failed");
return false;
if (!runningMode.toLowerCase().equals(HoodieClusteringJob.EXECUTE)) {
return false;
}
}
} catch (Exception e) {
LOG.warn("ScheduleAndExecute clustering failed", e);
@@ -1023,8 +1029,7 @@ public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMod
return true;
}
case HoodieClusteringJob.EXECUTE: {
assertNotNull(exception);
assertEquals(exception.getMessage(), "--instant-time couldn't be null when executing clustering plan.");
TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
return true;
}
default: