diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java index ceaabddec7d2e..17bc48f66f0c4 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java @@ -90,6 +90,7 @@ public void init() throws Exception { .withBaseFilesInPartitions(partitionAndFileId); // generate two rollback HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withRollbackUsingMarkers(false) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); try (AbstractHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 73629b4c97b8f..496cd2d057ddc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -120,9 +120,9 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty ROLLBACK_USING_MARKERS_ENABLE = ConfigProperty .key("hoodie.rollback.using.markers") - .defaultValue("false") + .defaultValue("true") .withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated " - + "during the writes. Turned off by default."); + + "during the writes. Turned on by default."); public static final ConfigProperty TIMELINE_LAYOUT_VERSION_NUM = ConfigProperty .key("hoodie.timeline.layout.version") diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 71d61938c7aa5..6cbc595c55974 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -45,6 +45,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -180,6 +181,8 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi .withAutoCommit(true) .withAvroSchemaValidate(true) .withEmbeddedTimelineServerEnabled(false) + .withMarkersType(MarkerType.DIRECT.name()) + .withRollbackUsingMarkers(false) .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath())) .withSchema(HoodieMetadataRecord.getClassSchema().toString()) .forTable(tableName) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java index 044b258e8893c..e7d31f3a07b24 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java @@ -18,8 +18,6 @@ package org.apache.hudi.table.marker; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTable; @@ -45,12 +43,6 @@ public static WriteMarkers get(MarkerType markerType, HoodieTable table, String case DIRECT: return new DirectWriteMarkers(table, instantTime); case TIMELINE_SERVER_BASED: - String basePath = table.getMetaClient().getBasePath(); - if (StorageSchemes.HDFS.getScheme().equals( - FSUtils.getFs(basePath, table.getContext().getHadoopConf().newCopy()).getScheme())) { - throw new HoodieException("Timeline-server-based markers are not supported for HDFS: " - + "base path " + basePath); - } return new TimelineServerBasedWriteMarkers(table, instantTime); default: throw new HoodieException("The marker type \"" + markerType.name() + "\" is not supported."); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 32ac8689687cf..6412113a59768 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -201,6 +201,7 @@ public void testRollbackCommit() throws Exception { .withBaseFilesInPartitions(partitionAndFileId3); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withRollbackUsingMarkers(false) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); @@ -308,6 +309,7 @@ public void testAutoRollbackInflightCommit() throws Exception { // Set Failed Writes rollback to EAGER config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withRollbackUsingMarkers(false) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); final String commitTime5 = "20160506030631"; try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index 29bad0d949c33..9bca10892e28c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -316,7 +316,7 @@ public void testCopyOnWriteTable() throws Exception { .setTimelineLayoutVersion(VERSION_1) .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); - HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA); + HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build(); SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); // Initial inserts with TRIP_EXAMPLE_SCHEMA @@ -507,11 +507,14 @@ private List convertToSchema(List records, String sc } private HoodieWriteConfig getWriteConfig(String schema) { + return getWriteConfigBuilder(schema).build(); + } + + private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) { return getConfigBuilder(schema) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .withAvroSchemaValidate(true) - .build(); + .withAvroSchemaValidate(true); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java index db17ceae92af6..43bf36c501f9b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java @@ -278,7 +278,7 @@ public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws E final int numRecords = 10; final String oldPartitionPath = "1970/01/01"; final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName(); - HoodieWriteConfig config = getConfig(true, true); + HoodieWriteConfig config = getConfigBuilder(100, true, true).withRollbackUsingMarkers(false).build(); SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { @@ -337,6 +337,7 @@ public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws E public void testSimpleTagLocationAndUpdateWithRollback() throws Exception { // Load to memory HoodieWriteConfig config = getConfigBuilder(100, false, false) + .withRollbackUsingMarkers(false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build(); SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config); SparkRDDWriteClient writeClient = getHoodieWriteClient(config); @@ -383,7 +384,7 @@ public void testSimpleTagLocationAndUpdateWithRollback() throws Exception { @Test public void testSimpleTagLocationWithInvalidCommit() throws Exception { // Load to memory - HoodieWriteConfig config = getConfig(); + HoodieWriteConfig config = getConfigBuilder(100, false, false).withRollbackUsingMarkers(false).build(); SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config); SparkRDDWriteClient writeClient = getHoodieWriteClient(config); @@ -425,6 +426,7 @@ public void testSimpleTagLocationWithInvalidCommit() throws Exception { public void testEnsureTagLocationUsesCommitTimeline() throws Exception { // Load to memory HoodieWriteConfig config = getConfigBuilder(100, false, false) + .withRollbackUsingMarkers(false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build(); SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config); SparkRDDWriteClient writeClient = getHoodieWriteClient(config); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java index c741c0266cc71..fa5a3537bb1f8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java @@ -98,10 +98,10 @@ private static Stream indexTypeParams() { private HoodieWriteConfig config; private void setUp(IndexType indexType, boolean populateMetaFields) throws Exception { - setUp(indexType, populateMetaFields, true); + setUp(indexType, populateMetaFields, true, true); } - private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata) throws Exception { + private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata, boolean rollbackUsingMarkers) throws Exception { this.indexType = indexType; initPath(); initSparkContexts(); @@ -111,6 +111,7 @@ private void setUp(IndexType indexType, boolean populateMetaFields, boolean enab : getPropertiesForKeyGen()); config = getConfigBuilder() .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()) + .withRollbackUsingMarkers(rollbackUsingMarkers) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType) .build()).withAutoCommit(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()).build(); writeClient = getHoodieWriteClient(config); @@ -225,7 +226,7 @@ public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean popul @ParameterizedTest @MethodSource("indexTypeParams") public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields) throws Exception { - setUp(indexType, populateMetaFields, true); + setUp(indexType, populateMetaFields, true, false); String newCommitTime = writeClient.startCommit(); int totalRecords = 20 + random.nextInt(20); List records = dataGen.generateInserts(newCommitTime, totalRecords); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 1c6015f70840a..4e98b220f3613 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -162,8 +162,8 @@ public void testFailForCompletedInstants() { public void testRollbackWhenFirstCommitFail() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withRollbackUsingMarkers(false) .withPath(basePath).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { client.startCommitWithTime("001"); client.insert(jsc.emptyRDD(), "001");