Commit 369dc27

[HUDI-1399] Support an independent clustering Spark job for asynchronous clustering
1 parent 9dfdb71

File tree

6 files changed: +63 -21 lines

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java

Lines changed: 19 additions & 1 deletion

@@ -711,12 +711,30 @@ public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTabl
     table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
   }

+  /**
+   * Get the inflight timeline, excluding compaction and clustering instants.
+   *
+   * @param table the hoodie table
+   * @return the inflight timeline with clustering replacecommits filtered out
+   */
+  private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTable<T, I, K, O> table) {
+    HoodieTimeline inflightTimelineWithReplaceCommit = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
+      if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+        Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
+        return !instantPlan.isPresent();
+      } else {
+        return true;
+      }
+    });
+    return inflightTimelineExcludeClusteringCommit;
+  }
+
   /**
    * Cleanup all pending commits.
    */
   private void rollbackPendingCommits() {
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimeline = getInflightTimelineExcludeCompactionAndClustering(table);
     List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
     for (String commit : commits) {
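
The rollback change above relies on telling a pending clustering replacecommit apart from any other pending instant, so that an asynchronously scheduled clustering instant is not rolled back when a writer restarts. A minimal sketch of that check in isolation, assuming a HoodieTableMetaClient is available; the method name isPendingClustering is illustrative and the import locations are the usual Hudi ones, not something this commit adds:

import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

// True when the pending instant is a replacecommit backed by a clustering plan,
// i.e. an instant the startup rollback should now leave on the timeline.
static boolean isPendingClustering(HoodieTableMetaClient metaClient, HoodieInstant instant) {
  if (!HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
    return false;
  }
  Option<Pair<HoodieInstant, HoodieClusteringPlan>> plan = ClusteringUtils.getClusteringPlan(metaClient, instant);
  return plan.isPresent();
}

Pending instants for which such a check returns true are skipped by rollbackPendingCommits(); everything else is rolled back as before.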

hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java

Lines changed: 12 additions & 1 deletion

@@ -152,7 +152,18 @@ private Schema getTableAvroSchemaFromDataFile() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema() throws Exception {
-    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
+    return getTableAvroSchema(true);
+  }
+
+  /**
+   * Gets the Avro schema for a hoodie table, with a choice of whether to include the metadata fields.
+   *
+   * @param includeMetadataFields whether to include the Hudi metadata fields
+   * @return Avro schema for this table
+   * @throws Exception
+   */
+  public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields);
     return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile();
   }
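
The new overload is what lets the standalone clustering job resolve the write schema from the table itself instead of a user-supplied schema file. A minimal usage sketch, assuming a table base path and a Hadoop Configuration are at hand; the method name latestTableSchema is illustrative:

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;

// Resolve the table's Avro schema from the latest commit, without the Hudi metadata fields.
static String latestTableSchema(Configuration hadoopConf, String basePath) throws Exception {
  HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
  TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
  Schema schema = schemaResolver.getTableAvroSchema(false); // false: exclude metadata fields
  return schema.toString();
}

This mirrors the getSchemaFromLatestInstant() helper added to HoodieClusteringJob below.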

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java

Lines changed: 1 addition & 1 deletion

@@ -109,7 +109,7 @@ public interface HoodieTimeline extends Serializable {
   /**
    * Filter this timeline to just include the in-flights excluding compaction instants.
    *
-   * @return New instance of HoodieTimeline with just in-flights excluding compaction inflights
+   * @return New instance of HoodieTimeline with just in-flights excluding compaction instants
    */
  HoodieTimeline filterPendingExcludingCompaction();

hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java

Lines changed: 14 additions & 5 deletions

@@ -27,6 +27,8 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

@@ -70,8 +72,6 @@ public static class Config implements Serializable {
     public String clusteringInstantTime = null;
     @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
     public int parallelism = 1;
-    @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
-    public String schemaFile = null;
     @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
     public String sparkMaster = null;
     @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)

@@ -120,19 +120,28 @@ public int cluster(int retry) {
     try {
       do {
         if (cfg.runSchedule) {
+          LOG.info("Do schedule");
           ret = doSchedule(jsc);
         } else {
+          LOG.info("Do cluster");
           ret = doCluster(jsc);
         }
       } while (ret != 0 && retry-- > 0);
     } catch (Throwable t) {
-      LOG.error(t);
+      LOG.error("Cluster failed", t);
     }
     return ret;
   }

+  private String getSchemaFromLatestInstant() throws Exception {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath, true);
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    Schema schema = schemaUtil.getTableAvroSchema(false);
+    return schema.toString();
+  }
+
   private int doCluster(JavaSparkContext jsc) throws Exception {
-    String schemaStr = new Schema.Parser().parse(fs.open(new Path(cfg.schemaFile))).toString();
+    String schemaStr = getSchemaFromLatestInstant();
     SparkRDDWriteClient client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     JavaRDD<WriteStatus> writeResponse =

@@ -141,7 +150,7 @@ private int doCluster(JavaSparkContext jsc) throws Exception {
   }

   private int doSchedule(JavaSparkContext jsc) throws Exception {
-    String schemaStr = new Schema.Parser().parse(fs.open(new Path(cfg.schemaFile))).toString();
+    String schemaStr = getSchemaFromLatestInstant();
     SparkRDDWriteClient client =
         UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
     return client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty()) ? 0 : -1;
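
With the schema resolved from the latest instant, driving the job no longer needs --schema-file: a caller only points it at the table, picks an instant time, and chooses whether to schedule a plan or execute one. A minimal programmatic sketch that mirrors the new test further down, assuming a live JavaSparkContext and an existing properties file; the properties path and method name are illustrative:

import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.spark.api.java.JavaSparkContext;

// Schedule a clustering plan at the chosen instant time, then execute it if scheduling succeeded.
static void scheduleAndRunClustering(JavaSparkContext jsc, String basePath, String instantTime) {
  HoodieClusteringJob.Config scheduleConfig = new HoodieClusteringJob.Config();
  scheduleConfig.basePath = basePath;
  scheduleConfig.clusteringInstantTime = instantTime;
  scheduleConfig.runSchedule = true;                                      // first pass: schedule only
  scheduleConfig.propsFilePath = basePath + "/clusteringjob.properties";  // illustrative location

  int scheduled = new HoodieClusteringJob(jsc, scheduleConfig).cluster(scheduleConfig.retry);
  if (scheduled == 0) {
    HoodieClusteringJob.Config runConfig = new HoodieClusteringJob.Config();
    runConfig.basePath = basePath;
    runConfig.clusteringInstantTime = instantTime;
    runConfig.runSchedule = false;                                        // second pass: execute the plan
    runConfig.propsFilePath = basePath + "/clusteringjob.properties";
    new HoodieClusteringJob(jsc, runConfig).cluster(runConfig.retry);
  }
}

This is the same schedule-then-execute flow exercised by testHoodieAsyncClusteringJob in the test changes below.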

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

Lines changed: 1 addition & 1 deletion

@@ -296,7 +296,7 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
     if (commitTimelineOpt.isPresent()) {
-      // TODO: now not support replace action
+      // TODO: replace action is not supported yet (HUDI-1500)
       Option<HoodieInstant> lastCommit = commitTimelineOpt.get()
           .filter(instant -> !instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant();
       if (lastCommit.isPresent()) {

hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java

Lines changed: 16 additions & 12 deletions

@@ -88,6 +88,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;

@@ -640,8 +641,8 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
     cfg.tableType = tableType.name();
     cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
-
-    deltaStreamerTestRunner(cfg, (r) -> {
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
         TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
         TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);

@@ -654,8 +655,7 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
     });
   }

-  private void deltaStreamerTestRunner(HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+  private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
     Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
       try {
         ds.sync();

@@ -683,8 +683,8 @@ public void testInlineClustering() throws Exception {
     cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
     cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true"));
     cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
-
-    deltaStreamerTestRunner(cfg, (r) -> {
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
       int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
       int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;

@@ -698,10 +698,8 @@ private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePa
     HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
     config.basePath = basePath;
     config.clusteringInstantTime = clusteringInstantTime;
-    config.schemaFile = dfsBasePath + "/target.avsc";
     config.runSchedule = runSchedule;
     config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
-
     return config;
   }

@@ -717,19 +715,25 @@ public void testHoodieAsyncClusteringJob() throws Exception {
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
-
-    deltaStreamerTestRunner(cfg, (r) -> {
-      TestHelpers.assertAtLeastNCommits(3, tableBasePath, dfs);
-      String clusterInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      // offset by 3600s so the clustering instant does not conflict with delta streamer commits
+      String clusterInstantTime = HoodieActiveTimeline.COMMIT_FORMATTER
+          .format(new Date(System.currentTimeMillis() + 3600 * 1000));
+      LOG.info("Cluster instant time " + clusterInstantTime);
       HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
           clusterInstantTime, true);
       HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
       int scheduleClusteringResult = scheduleClusteringJob.cluster(scheduleClusteringConfig.retry);
       if (scheduleClusteringResult == 0) {
+        LOG.info("Schedule clustering success, now cluster");
         HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
             clusterInstantTime, false);
         HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
         clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+        LOG.info("Cluster success");
       } else {
         LOG.warn("Schedule clustering failed");
       }
