Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
instantToRollback,
true,
true,
false,
false);
return rollbackActionExecutor.execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
instantToRollback,
true,
true,
false,
false);

// TODO : Get file status and create a rollback stat and file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
protected final HoodieInstant instantToRollback;
protected final boolean deleteInstants;
protected final boolean skipTimelinePublish;
protected final boolean useMarkerBasedStrategy;
private final TransactionManager txnManager;
private final boolean skipLocking;

Expand All @@ -70,8 +69,7 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipLocking) {
this(context, config, table, instantTime, instantToRollback, deleteInstants,
false, config.shouldRollbackUsingMarkers(), skipLocking);
this(context, config, table, instantTime, instantToRollback, deleteInstants, false, skipLocking);
}

public BaseRollbackActionExecutor(HoodieEngineContext context,
Expand All @@ -81,18 +79,12 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.resolvedInstant = instantToRollback;
this.deleteInstants = deleteInstants;
this.skipTimelinePublish = skipTimelinePublish;
this.useMarkerBasedStrategy = useMarkerBasedStrategy;
if (useMarkerBasedStrategy) {
ValidationUtils.checkArgument(!instantToRollback.isCompleted(),
"Cannot use marker based rollback strategy on completed instant:" + instantToRollback);
}
this.skipLocking = skipLocking;
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public BaseRollbackPlanActionExecutor(HoodieEngineContext context,
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.skipTimelinePublish = skipTimelinePublish;
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers;
this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers && !instantToRollback.isCompleted();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, skipLocking);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, skipLocking);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,59 @@ public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, b
}
}
}

@Test
public void testFallbackToListingBasedRollbackForCompletedInstant() throws Exception {
// Let's create some commit files and base files
final String p1 = "2016/05/01";
final String p2 = "2016/05/02";
final String p3 = "2016/05/06";
final String commitTime1 = "20160501010101";
final String commitTime2 = "20160502020601";
final String commitTime3 = "20160506030611";
Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
{
put(p1, "id11");
put(p2, "id12");
put(p3, "id13");
}
};
Map<String, String> partitionAndFileId2 = new HashMap<String, String>() {
{
put(p1, "id21");
put(p2, "id22");
put(p3, "id23");
}
};
Map<String, String> partitionAndFileId3 = new HashMap<String, String>() {
{
put(p1, "id31");
put(p2, "id32");
put(p3, "id33");
}
};

HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(true) // rollback using markers to test fallback to listing based rollback for completed instant
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();

// create test table with all commits completed
HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf(), config, context));
testTable.withPartitionMetaFiles(p1, p2, p3)
.addCommit(commitTime1)
.withBaseFilesInPartitions(partitionAndFileId1)
.addCommit(commitTime2)
.withBaseFilesInPartitions(partitionAndFileId2)
.addCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);

try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.rollback(commitTime3);
assertFalse(testTable.inflightCommitExists(commitTime3));
assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,18 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
Expand Down Expand Up @@ -2297,20 +2296,9 @@ private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollb
"With optimistic CG, first commit should succeed. commit file should be present");
// Marker directory must be removed after rollback
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
if (rollbackUsingMarkers) {
// rollback of a completed commit should fail if marked based rollback is used.
try {
client.rollback(instantTime);
fail("Rollback of completed commit should throw exception");
} catch (HoodieRollbackException e) {
// ignore
}
} else {
// rollback of a completed commit should succeed if using list based rollback
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
}
client.rollback(instantTime);
assertFalse(testTable.commitExists(instantTime),
"After explicit rollback, commit file should not be present");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;

import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void tearDown() throws Exception {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@ValueSource(booleans = {true})
public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
//1. prepare data and assert data result
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
Expand Down Expand Up @@ -281,21 +281,6 @@ public void testRollbackForCanIndexLogFile() throws IOException {
assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size());
}

@Test
public void testFailForCompletedInstants() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
new MergeOnReadRollbackActionExecutor(context, getConfigBuilder().build(),
getHoodieTable(metaClient, getConfigBuilder().build()),
"003",
rollBackInstant,
true,
true,
true,
false).execute();
});
}

/**
* Test Cases for rolling back when there is no base file.
*/
Expand Down