Skip to content

Commit f0585fa

Browse files
authored
[HUDI-2474] Refreshing timeline for every operation in Hudi when metadata is enabled (apache#3698)
1 parent 9067657 commit f0585fa

File tree

7 files changed

+52
-21
lines changed

7 files changed

+52
-21
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java

+17-10
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,11 @@ protected void commit(HoodieTable table, String commitActionType, String instant
220220
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
221221
}
222222

223-
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);
223+
protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
224+
return createTable(config, hadoopConf, false);
225+
}
226+
227+
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
224228

225229
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
226230
try {
@@ -272,7 +276,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
272276
*/
273277
public void rollbackFailedBootstrap() {
274278
LOG.info("Rolling back pending bootstrap if present");
275-
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
279+
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
276280
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
277281
Option<String> instant = Option.fromJavaOptional(
278282
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -451,6 +455,9 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
451455

452456
protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
453457
if (config.inlineTableServices()) {
458+
if (config.isMetadataTableEnabled()) {
459+
table.getHoodieView().sync();
460+
}
454461
// Do an inline compaction if enabled
455462
if (config.inlineCompactionEnabled()) {
456463
runAnyPendingCompactions(table);
@@ -515,7 +522,7 @@ protected void autoCleanOnCommit() {
515522
* @param comment - Comment for the savepoint
516523
*/
517524
public void savepoint(String user, String comment) {
518-
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
525+
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
519526
if (table.getCompletedCommitsTimeline().empty()) {
520527
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
521528
}
@@ -539,7 +546,7 @@ public void savepoint(String user, String comment) {
539546
* @param comment - Comment for the savepoint
540547
*/
541548
public void savepoint(String instantTime, String user, String comment) {
542-
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
549+
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
543550
table.savepoint(context, instantTime, user, comment);
544551
}
545552

@@ -551,7 +558,7 @@ public void savepoint(String instantTime, String user, String comment) {
551558
* @return true if the savepoint was deleted successfully
552559
*/
553560
public void deleteSavepoint(String savepointTime) {
554-
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
561+
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
555562
SavepointHelpers.deleteSavepoint(table, savepointTime);
556563
}
557564

@@ -566,7 +573,7 @@ public void deleteSavepoint(String savepointTime) {
566573
* @return true if the savepoint was restored to successfully
567574
*/
568575
public void restoreToSavepoint(String savepointTime) {
569-
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
576+
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
570577
SavepointHelpers.validateSavepointPresence(table, savepointTime);
571578
restoreToInstant(savepointTime);
572579
SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -624,7 +631,7 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
624631
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
625632
Timer.Context timerContext = metrics.getRollbackCtx();
626633
try {
627-
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
634+
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
628635
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
629636
if (timerContext != null) {
630637
final long durationInMs = metrics.getDurationInMs(timerContext.stop());
@@ -957,17 +964,17 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
957964
switch (tableServiceType) {
958965
case CLUSTER:
959966
LOG.info("Scheduling clustering at instant time :" + instantTime);
960-
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
967+
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
961968
.scheduleClustering(context, instantTime, extraMetadata);
962969
return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
963970
case COMPACT:
964971
LOG.info("Scheduling compaction at instant time :" + instantTime);
965-
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
972+
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
966973
.scheduleCompaction(context, instantTime, extraMetadata);
967974
return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
968975
case CLEAN:
969976
LOG.info("Scheduling cleaning at instant time :" + instantTime);
970-
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
977+
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
971978
.scheduleCleaning(context, instantTime, extraMetadata);
972979
return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
973980
default:

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Optio
121121
}
122122

123123
@Override
124-
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
124+
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf,
125+
boolean refreshTimeline) {
125126
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
126127
}
127128

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ public boolean commit(String instantTime,
8989

9090
@Override
9191
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config,
92-
Configuration hadoopConf) {
92+
Configuration hadoopConf,
93+
boolean refreshTimeline) {
9394
return HoodieJavaTable.create(config, context);
9495
}
9596

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,9 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op
126126

127127
@Override
128128
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> createTable(HoodieWriteConfig config,
129-
Configuration hadoopConf) {
130-
return HoodieSparkTable.create(config, context);
129+
Configuration hadoopConf,
130+
boolean refreshTimeline) {
131+
return HoodieSparkTable.create(config, context, refreshTimeline);
131132
}
132133

133134
@Override
@@ -319,7 +320,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
319320

320321
@Override
321322
protected JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
322-
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
323+
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
323324
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
324325
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
325326
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -338,7 +339,7 @@ protected JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean sho
338339

339340
@Override
340341
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
341-
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
342+
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
342343
preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
343344
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
344345
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
@@ -438,7 +439,7 @@ private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<Wri
438439
setWriteSchemaForDeletes(metaClient);
439440
}
440441
// Create a Hoodie table which encapsulated the commits and files visible
441-
HoodieSparkTable<T> table = HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
442+
HoodieSparkTable<T> table = HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
442443
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
443444
writeTimer = metrics.getCommitCtx();
444445
} else {

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,43 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
4242
}
4343

4444
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
45+
return create(config, context, false);
46+
}
47+
48+
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context,
49+
boolean refreshTimeline) {
4550
HoodieTableMetaClient metaClient =
4651
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
4752
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
4853
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
49-
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
54+
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
5055
}
5156

5257
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
5358
HoodieSparkEngineContext context,
5459
HoodieTableMetaClient metaClient) {
60+
return create(config, context, metaClient, false);
61+
}
62+
63+
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
64+
HoodieSparkEngineContext context,
65+
HoodieTableMetaClient metaClient,
66+
boolean refreshTimeline) {
67+
HoodieSparkTable hoodieSparkTable;
5568
switch (metaClient.getTableType()) {
5669
case COPY_ON_WRITE:
57-
return new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
70+
hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
71+
break;
5872
case MERGE_ON_READ:
59-
return new HoodieSparkMergeOnReadTable<>(config, context, metaClient);
73+
hoodieSparkTable = new HoodieSparkMergeOnReadTable<>(config, context, metaClient);
74+
break;
6075
default:
6176
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
6277
}
78+
if (refreshTimeline) {
79+
hoodieSparkTable.getHoodieView().sync();
80+
}
81+
return hoodieSparkTable;
6382
}
6483

6584
@Override

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java

+2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.spark.api.java.JavaRDD;
7272
import org.junit.jupiter.api.AfterEach;
7373
import org.junit.jupiter.api.Assertions;
74+
import org.junit.jupiter.api.Disabled;
7475
import org.junit.jupiter.api.Tag;
7576
import org.junit.jupiter.params.ParameterizedTest;
7677
import org.junit.jupiter.params.provider.Arguments;
@@ -547,6 +548,7 @@ public void testSync(HoodieTableType tableType) throws Exception {
547548
*/
548549
@ParameterizedTest
549550
@EnumSource(HoodieTableType.class)
551+
@Disabled
550552
public void testCleaningArchivingAndCompaction(HoodieTableType tableType) throws Exception {
551553
init(tableType);
552554
doWriteOperationsAndBootstrapMetadata(testTable);

hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ public HoodieTimeline getTimeline() {
479479

480480
@Override
481481
public void sync() {
482-
// noop
482+
refresh();
483483
}
484484

485485
@Override

0 commit comments

Comments
 (0)