Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void init() throws Exception {
.withBaseFilesInPartitions(partitionAndFileId);
// generate two rollbacks
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

try (AbstractHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<String> ROLLBACK_USING_MARKERS_ENABLE = ConfigProperty
.key("hoodie.rollback.using.markers")
.defaultValue("false")
.defaultValue("true")
.withDocumentation("Enables a more efficient mechanism for rollbacks based on the marker files generated "
+ "during the writes. Turned off by default.");
+ "during the writes. Turned on by default.");

public static final ConfigProperty<String> TIMELINE_LAYOUT_VERSION_NUM = ConfigProperty
.key("hoodie.timeline.layout.version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand Down Expand Up @@ -180,6 +181,8 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
.withMarkersType(MarkerType.DIRECT.name())
.withRollbackUsingMarkers(false)
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hudi.table.marker;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
Expand All @@ -45,12 +43,6 @@ public static WriteMarkers get(MarkerType markerType, HoodieTable table, String
case DIRECT:
return new DirectWriteMarkers(table, instantTime);
case TIMELINE_SERVER_BASED:
String basePath = table.getMetaClient().getBasePath();
if (StorageSchemes.HDFS.getScheme().equals(
FSUtils.getFs(basePath, table.getContext().getHadoopConf().newCopy()).getScheme())) {
throw new HoodieException("Timeline-server-based markers are not supported for HDFS: "
+ "base path " + basePath);
}
return new TimelineServerBasedWriteMarkers(table, instantTime);
default:
throw new HoodieException("The marker type \"" + markerType.name() + "\" is not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public void testRollbackCommit() throws Exception {
.withBaseFilesInPartitions(partitionAndFileId3);

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
Expand Down Expand Up @@ -308,6 +309,7 @@ public void testAutoRollbackInflightCommit() throws Exception {

// Set Failed Writes rollback to EAGER
config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
final String commitTime5 = "20160506030631";
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void testCopyOnWriteTable() throws Exception {
.setTimelineLayoutVersion(VERSION_1)
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());

HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build();
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);

// Initial inserts with TRIP_EXAMPLE_SCHEMA
Expand Down Expand Up @@ -507,11 +507,14 @@ private List<HoodieRecord> convertToSchema(List<HoodieRecord> records, String sc
}

// Builds a HoodieWriteConfig for the given record schema by finalizing the shared
// builder (see getWriteConfigBuilder); kept as a convenience for callers that do not
// need to customize the builder further.
private HoodieWriteConfig getWriteConfig(String schema) {
return getWriteConfigBuilder(schema).build();
}

private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
return getConfigBuilder(schema)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withAvroSchemaValidate(true)
.build();
.withAvroSchemaValidate(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws E
final int numRecords = 10;
final String oldPartitionPath = "1970/01/01";
final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName();
HoodieWriteConfig config = getConfig(true, true);
HoodieWriteConfig config = getConfigBuilder(100, true, true).withRollbackUsingMarkers(false).build();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);

try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
Expand Down Expand Up @@ -337,6 +337,7 @@ public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws E
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
// Load to memory
HoodieWriteConfig config = getConfigBuilder(100, false, false)
.withRollbackUsingMarkers(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
Expand Down Expand Up @@ -383,7 +384,7 @@ public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
@Test
public void testSimpleTagLocationWithInvalidCommit() throws Exception {
// Load to memory
HoodieWriteConfig config = getConfig();
HoodieWriteConfig config = getConfigBuilder(100, false, false).withRollbackUsingMarkers(false).build();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);

Expand Down Expand Up @@ -425,6 +426,7 @@ public void testSimpleTagLocationWithInvalidCommit() throws Exception {
public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
// Load to memory
HoodieWriteConfig config = getConfigBuilder(100, false, false)
.withRollbackUsingMarkers(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ private static Stream<Arguments> indexTypeParams() {
private HoodieWriteConfig config;

private void setUp(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields, true);
setUp(indexType, populateMetaFields, true, true);
}

private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata) throws Exception {
private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata, boolean rollbackUsingMarkers) throws Exception {
this.indexType = indexType;
initPath();
initSparkContexts();
Expand All @@ -111,6 +111,7 @@ private void setUp(IndexType indexType, boolean populateMetaFields, boolean enab
: getPropertiesForKeyGen());
config = getConfigBuilder()
.withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
.withRollbackUsingMarkers(rollbackUsingMarkers)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
.build()).withAutoCommit(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build()).build();
writeClient = getHoodieWriteClient(config);
Expand Down Expand Up @@ -225,7 +226,7 @@ public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean popul
@ParameterizedTest
@MethodSource("indexTypeParams")
public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields) throws Exception {
setUp(indexType, populateMetaFields, true);
setUp(indexType, populateMetaFields, true, false);
String newCommitTime = writeClient.startCommit();
int totalRecords = 20 + random.nextInt(20);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ public void testFailForCompletedInstants() {
public void testRollbackWhenFirstCommitFail() throws Exception {

HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withRollbackUsingMarkers(false)
.withPath(basePath).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();

try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.startCommitWithTime("001");
client.insert(jsc.emptyRDD(), "001");
Expand Down