From f9992fd7af6193f95ea21636a3506b9c785695a9 Mon Sep 17 00:00:00 2001
From: Vinoth Chandar
Date: Wed, 6 Jan 2021 16:59:59 -0800
Subject: [PATCH] [HUDI-1513] Introduce WriteClient#preWrite() and relocate metadata table syncing

- Syncing to the metadata table, setting the operation type, and starting the async cleaner are now done in preWrite()
- Fixes an issue where delete() was not starting the async cleaner correctly
- Fixed tests and enabled the metadata table for TestAsyncCompaction
---
 .../client/AbstractHoodieWriteClient.java   | 16 +++++++--
 .../HoodieBackedTableMetadataWriter.java    |  2 --
 .../hudi/client/HoodieFlinkWriteClient.java |  8 ++---
 .../hudi/client/HoodieJavaWriteClient.java  | 14 +++-----
 .../hudi/client/HoodieSparkCompactor.java   |  2 +-
 .../hudi/client/SparkRDDWriteClient.java    | 28 ++++++---------
 .../metadata/TestHoodieBackedMetadata.java  | 34 +++++++++----------
 .../action/compact/TestAsyncCompaction.java |  7 ++--
 8 files changed, 54 insertions(+), 57 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index e12b3af04078..3bb734241ef3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -96,8 +96,8 @@ public abstract class AbstractHoodieWriteClient createIndex(HoodieWriteConfig writeConfig);
@@ -368,6 +367,19 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
   */
  public abstract O delete(K keys, final String instantTime);
 
+  /**
+   * Common method containing steps to be performed before write (upsert/insert/...).
+   *
+   * @param instantTime Instant Time
+   * @param writeOperationType Write Operation Type
+   */
+  protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+    setOperationType(writeOperationType);
+    syncTableMetadata();
+    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+  }
+
   /**
    * Common method containing steps to be performed after write (upsert/insert/..) operations including auto-commit.
* @param result Commit Action Result diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 823e70c83b6e..00d8acaa6067 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -36,7 +36,6 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -365,7 +364,6 @@ private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) { LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync); // Read each instant in order and sync it to metadata table - final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline(); for (HoodieInstant instant : instantsToSync) { LOG.info("Syncing instant " + instant + " to metadata table"); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index e7fda7d80392..33fc8f069a0d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -106,8 +106,7 @@ public List upsert(List> records, String instantTim HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); - setOperationType(WriteOperationType.UPSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.UPSERT); HoodieWriteMetadata> result = table.upsert(context, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -125,8 +124,7 @@ public List insert(List> records, String instantTim HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime); table.validateUpsertSchema(); - setOperationType(WriteOperationType.INSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.INSERT); HoodieWriteMetadata> result = table.insert(context, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -158,7 +156,7 @@ public List bulkInsertPreppedRecords(List> preppedR public List delete(List keys, String instantTime) { HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); - setOperationType(WriteOperationType.DELETE); + preWrite(instantTime, WriteOperationType.DELETE); HoodieWriteMetadata> result = table.delete(context,instantTime, keys); return postWrite(result, instantTime, table); } diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 7b10843b0b0d..40bbd079b515 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -97,8 +97,7 @@ public List upsert(List> records, HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); - setOperationType(WriteOperationType.UPSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.UPSERT); HoodieWriteMetadata> result = table.upsert(context, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -112,8 +111,7 @@ public List upsertPreppedRecords(List> preppedRecor HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); table.validateUpsertSchema(); - setOperationType(WriteOperationType.UPSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.UPSERT_PREPPED); HoodieWriteMetadata> result = table.upsertPrepped(context,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -123,8 +121,7 @@ public List insert(List> records, String instantTim HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime); table.validateUpsertSchema(); - setOperationType(WriteOperationType.INSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.INSERT); HoodieWriteMetadata> result = table.insert(context, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -138,8 +135,7 @@ public List insertPreppedRecords(List> preppedRecor HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); table.validateInsertSchema(); - setOperationType(WriteOperationType.INSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.INSERT_PREPPED); HoodieWriteMetadata> result = table.insertPrepped(context,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -169,7 +165,7 @@ public List delete(List keys, String instantTime) { HoodieTable>, List, List> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); - setOperationType(WriteOperationType.DELETE); + preWrite(instantTime, WriteOperationType.DELETE); HoodieWriteMetadata> result = table.delete(context,instantTime, keys); return postWrite(result, instantTime, table); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java index ab22b412a4e6..e263caecec16 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkCompactor.java @@ -45,7 +45,7 @@ public 
HoodieSparkCompactor(AbstractHoodieWriteClient @Override public void compact(HoodieInstant instant) throws IOException { LOG.info("Compactor executing compaction " + instant); - SparkRDDWriteClient writeClient = (SparkRDDWriteClient)compactionClient; + SparkRDDWriteClient writeClient = (SparkRDDWriteClient) compactionClient; JavaRDD res = writeClient.compact(instant.getTimestamp()); this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status"); long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 9a22f7894d79..824a53f46f0a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -140,8 +140,7 @@ public JavaRDD upsert(JavaRDD> records, String inst HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); - setOperationType(WriteOperationType.UPSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.UPSERT); HoodieWriteMetadata> result = table.upsert(context, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); @@ -154,8 +153,7 @@ public JavaRDD upsertPreppedRecords(JavaRDD> preppe HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); table.validateUpsertSchema(); - setOperationType(WriteOperationType.UPSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.UPSERT_PREPPED); HoodieWriteMetadata> result = table.upsertPrepped(context,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -165,8 +163,7 @@ public JavaRDD insert(JavaRDD> records, String inst HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.INSERT, instantTime); table.validateInsertSchema(); - setOperationType(WriteOperationType.INSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.INSERT); HoodieWriteMetadata> result = table.insert(context,instantTime, records); return postWrite(result, instantTime, table); } @@ -176,8 +173,7 @@ public JavaRDD insertPreppedRecords(JavaRDD> preppe HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); table.validateInsertSchema(); - setOperationType(WriteOperationType.INSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.INSERT_PREPPED); HoodieWriteMetadata> result = table.insertPrepped(context,instantTime, preppedRecords); return postWrite(result, instantTime, table); } @@ -192,8 +188,7 @@ public JavaRDD insertPreppedRecords(JavaRDD> preppe public HoodieWriteResult insertOverwrite(JavaRDD> records, final String instantTime) { HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime); table.validateInsertSchema(); - setOperationType(WriteOperationType.INSERT_OVERWRITE); - 
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE); HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records); return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); } @@ -209,8 +204,7 @@ public HoodieWriteResult insertOverwrite(JavaRDD> records, final public HoodieWriteResult insertOverwriteTable(JavaRDD> records, final String instantTime) { HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime); table.validateInsertSchema(); - setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE); HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records); return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); } @@ -225,8 +219,7 @@ public JavaRDD bulkInsert(JavaRDD> records, String HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime); table.validateInsertSchema(); - setOperationType(WriteOperationType.BULK_INSERT); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.BULK_INSERT); HoodieWriteMetadata> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner); return postWrite(result, instantTime, table); } @@ -236,8 +229,7 @@ public JavaRDD bulkInsertPreppedRecords(JavaRDD> pr HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); table.validateInsertSchema(); - setOperationType(WriteOperationType.BULK_INSERT_PREPPED); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED); HoodieWriteMetadata> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner); return postWrite(result, instantTime, table); } @@ -245,14 +237,14 @@ public JavaRDD bulkInsertPreppedRecords(JavaRDD> pr @Override public JavaRDD delete(JavaRDD keys, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); - setOperationType(WriteOperationType.DELETE); + preWrite(instantTime, WriteOperationType.DELETE); HoodieWriteMetadata> result = table.delete(context,instantTime, keys); return postWrite(result, instantTime, table); } public HoodieWriteResult deletePartitions(List partitions, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime); - setOperationType(WriteOperationType.DELETE_PARTITION); + preWrite(instantTime, WriteOperationType.DELETE_PARTITION); HoodieWriteMetadata> result = table.deletePartitions(context,instantTime, partitions); return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 34c0a3570740..32cec71feafc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -62,7 +61,6 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.nio.file.Files; @@ -125,9 +123,10 @@ public void testDefaultNoMetadataTable() throws Exception { assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath)); } - // Metadata table created when enabled by config + // Metadata table created when enabled by config & sync is called try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { client.startCommitWithTime("001"); + client.syncTableMetadata(); assertTrue(fs.exists(new Path(metadataTableBasePath))); validateMetadata(client); } @@ -504,11 +503,13 @@ public void testSync(HoodieTableType tableType) throws Exception { // Enable metadata table and ensure it is synced try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details + client.syncTableMetadata(); client.restoreToInstant(restoreToInstant); assertFalse(metadata(client).isInSync()); newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); + client.syncTableMetadata(); validateMetadata(client); assertTrue(metadata(client).isInSync()); @@ -519,9 +520,8 @@ public void testSync(HoodieTableType tableType) throws Exception { * Instants on Metadata Table should be archived as per config. * Metadata Table should be automatically compacted as per config. */ - @ParameterizedTest - @ValueSource(booleans = {false}) - public void testCleaningArchivingAndCompaction(boolean asyncClean) throws Exception { + @Test + public void testCleaningArchivingAndCompaction() throws Exception { init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); @@ -530,8 +530,9 @@ public void testCleaningArchivingAndCompaction(boolean asyncClean) throws Except .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(6, 8).retainCommits(1) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3) - .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(asyncClean).build()) + // don't archive the data timeline at all. 
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(Integer.MAX_VALUE - 1, Integer.MAX_VALUE) + .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build()) .build(); List records; @@ -551,17 +552,16 @@ public void testCleaningArchivingAndCompaction(boolean asyncClean) throws Except } HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath); + HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, config.getBasePath()); HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline(); - // check that there are 2 compactions. - assertEquals(2, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants()); - // check that cleaning has, once after each compaction. There will be more instances on the timeline, since it's less aggressively archived - assertEquals(4, metadataTimeline.getCleanerTimeline().filterCompletedInstants().countInstants()); + // check that there are compactions. + assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0); + // check that cleaning has happened, once after each compaction. + assertTrue(metadataTimeline.getCleanerTimeline().filterCompletedInstants().countInstants() > 0); // ensure archiving has happened - List instants = metadataTimeline.getCommitsAndCompactionTimeline() - .getInstants().collect(Collectors.toList()); - Collections.reverse(instants); - long numDeltaCommits = instants.stream().filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)).count(); - assertEquals(5, numDeltaCommits); + long numDataCompletedInstants = datasetMetaClient.getActiveTimeline().filterCompletedInstants().countInstants(); + long numDeltaCommits = metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(); + assertTrue(numDeltaCommits < numDataCompletedInstants, "Must have fewer delta commits than total completed instants on the data timeline."); } /** diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java index fd6bd839cf8f..08f9283dde02 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -50,7 +51,9 @@ public class TestAsyncCompaction extends CompactionTestBase { private HoodieWriteConfig getConfig(Boolean autoCommit) { - return getConfigBuilder(autoCommit).build(); + return getConfigBuilder(autoCommit) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).validate(true).build()) + .build(); } @Test @@ -85,8 +88,6 @@ public void testRollbackForInflightCompaction() throws Exception { // Reload and rollback inflight compaction metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); - // hoodieTable.rollback(jsc, - // new
HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false); client.rollbackInflightCompaction( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
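
The consolidation this patch describes boils down to a small pattern: every write path calls a single preWrite() hook before handing off to the table, so no individual method (e.g. delete()) can forget one of the three pre-write steps. The sketch below is a minimal, self-contained Java illustration of that pattern; the types (AbstractWriteClientSketch, SimpleWriteClientSketch, the stub AsyncCleanerService) are hypothetical stand-ins, not Hudi classes — only the body of preWrite() mirrors the method added to AbstractHoodieWriteClient in the diff above.

```java
import java.util.List;

// Stand-in enum; the real WriteOperationType has many more values.
enum WriteOperationType { UPSERT, INSERT, DELETE }

// Stand-in service; the real one checks the write config before starting a cleaner thread.
class AsyncCleanerService {
  static AsyncCleanerService startAsyncCleaningIfEnabled(Object client) {
    return new AsyncCleanerService();
  }
}

abstract class AbstractWriteClientSketch {
  protected WriteOperationType operationType;
  protected AsyncCleanerService asyncCleanerService;

  protected void setOperationType(WriteOperationType type) {
    this.operationType = type;
  }

  // Stand-in for syncing the metadata table with the data timeline.
  protected void syncTableMetadata() { /* no-op in this sketch */ }

  // The new hook: one place for the three pre-write steps.
  protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
    setOperationType(writeOperationType);
    syncTableMetadata();
    this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
  }
}

class SimpleWriteClientSketch extends AbstractWriteClientSketch {
  // Before the patch, each engine-specific client repeated setOperationType() and the
  // cleaner start inline, and delete() skipped the cleaner entirely; now it cannot.
  public List<String> delete(List<String> keys, String instantTime) {
    preWrite(instantTime, WriteOperationType.DELETE);
    // ... table.delete(...) and postWrite(...) would follow in the real client ...
    return keys;
  }
}

public class PreWriteSketch {
  public static void main(String[] args) {
    new SimpleWriteClientSketch().delete(List.of("key1"), "20210106165959");
  }
}
```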