@@ -60,7 +60,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
     List<ListingBasedRollbackRequest> rollbackRequests = null;
     if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
       rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
-          table.getMetaClient().getBasePath(), config);
+          table.getMetaClient().getBasePath());
     } else {
       rollbackRequests = RollbackUtils
           .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -42,6 +38,10 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,7 +56,8 @@ public class RollbackUtils {
 
   /**
    * Get Latest version of Rollback plan corresponding to a clean instant.
-   * @param metaClient Hoodie Table Meta Client
+   *
+   * @param metaClient Hoodie Table Meta Client
    * @param rollbackInstant Instant referring to rollback action
    * @return Rollback plan corresponding to rollback instant
    * @throws IOException
@@ -106,12 +107,10 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll
    * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
    * @param engineContext instance of {@link HoodieEngineContext} to use.
    * @param basePath base path of interest.
-   * @param config instance of {@link HoodieWriteConfig} to use.
    * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
    */
-  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext,
-                                                                                       String basePath, HoodieWriteConfig config) {
-    return FSUtils.getAllPartitionPaths(engineContext, config.getMetadataConfig(), basePath).stream()
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String basePath) {
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, false, false).stream()
         .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
         .collect(Collectors.toList());
   }
@@ -127,7 +126,7 @@ public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListin
   public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
     String commit = instantToRollback.getTimestamp();
     HoodieWriteConfig config = table.getConfig();
-    List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
+    List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
     if (partitions.isEmpty()) {
       return new ArrayList<>();
     }

[Review comment from Member] minor: I see various versions of FSUtils.getAllPartitionPaths(...) which have similar code. Maybe remove code duplication there. Not essential though.
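A minimal sketch of the deduplication the reviewer suggests, based only on the two call shapes visible in this diff. The HoodieMetadataConfig builder method names below are assumptions for illustration, not a confirmed Hudi API: the boolean-flag overload would simply build a config and delegate to one canonical implementation.

  // Hypothetical consolidation sketch, not actual Hudi code: route the boolean-flag
  // overload through the config-based one so the listing logic lives in one place.
  public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext,
                                                  String basePathStr,
                                                  boolean useFileListingFromMetadata,
                                                  boolean assumeDatePartitioning) {
    // Builder method names are assumed for illustration; exact names may differ.
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(useFileListingFromMetadata)
        .withAssumeDatePartitioning(assumeDatePartitioning)
        .build();
    // Delegate to the canonical overload, the shape the removed call sites used.
    return getAllPartitionPaths(engineContext, metadataConfig, basePathStr);
  }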
@@ -102,7 +102,7 @@ protected void recreateMarkers(final String commitInstantTime,
     // generate rollback stats
     List<ListingBasedRollbackRequest> rollbackRequests;
     if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-      rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
+      rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath());
     } else {
       rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
     }
@@ -82,7 +82,6 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -93,6 +92,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -307,9 +307,8 @@ public void testMetadataTableServices() throws Exception {
   /**
    * Test rollback of various table operations sync to Metadata Table correctly.
    */
-  //@ParameterizedTest
-  //@EnumSource(HoodieTableType.class)
-  @Disabled
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
   public void testRollbackOperations(HoodieTableType tableType) throws Exception {
     init(tableType);
     doWriteInsertAndUpsert(testTable);

[Review comment from Contributor Author] This test, along with testErrorCases, was flaky in CI and was disabled some time back to unblock CI. With this fix, it should no longer be flaky.
@@ -1087,18 +1086,72 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
   }
 
+  /**
+   * Tests rollback of a commit which has new partitions which is not present in hudi table prior to the commit being rolledback.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+      List<HoodieRecord> upsertRecords = new ArrayList<>();
+      for (HoodieRecord entry : records) {
+        if (entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+            || entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)) {
+          upsertRecords.add(entry);
+        }
+      }
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateInserts(newCommitTime, 20);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
+      // instant so that only the inflight is left over.
+      String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
+      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+          commitInstantFileName), false));
+    }
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
+      String newCommitTime = client.startCommit();
+      // Next insert
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+    }
+  }
+
   /**
    * Test various error scenarios.
    */
-  //@Test
-  @Disabled
+  @Test
   public void testErrorCases() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
     // should be rolled back to last valid commit.
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
       String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(newCommitTime);
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
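One technique in the new test is worth isolating: since there is no hook to force a failed commit on the main dataset, the test fakes a partially failed commit by deleting the completed instant file, leaving only the inflight instant on the timeline, so the next client initialization treats the commit as incomplete and rolls it back. A minimal sketch of just that step, reusing only helpers already visible in the test above (fs, basePath, newCommitTime):

  // Fake a partially failed commit, as the test above does: delete the completed
  // instant file so only the inflight instant remains on the timeline.
  String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
  Path completedInstant = new Path(
      basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME, commitInstantFileName);
  assertTrue(fs.delete(completedInstant, false)); // non-recursive: deletes a single file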
@@ -1111,6 +1164,7 @@ public void testErrorCases() throws Exception {
       records = dataGen.generateInserts(newCommitTime, 5);
       writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
 
       // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
       // instant so that only the inflight is left over.
@@ -1119,7 +1173,9 @@
       commitInstantFileName), false));
     }
 
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
       String newCommitTime = client.startCommit();
       // Next insert
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
@@ -87,7 +87,8 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean
     initMetaClient(tableType);
     initTestDataGenerator();
     metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
-    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics, enableFullScan).build();
+    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics,
+        enableFullScan, true).build();
     initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
   }
 
@@ -265,11 +266,11 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, bo
 
   protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
                                                             boolean enableMetrics) {
-    return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true);
+    return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true);
   }
 
   protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
-                                                            boolean enableMetrics, boolean enableFullScan) {
+                                                            boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers) {
     Properties properties = new Properties();
     properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
@@ -292,6 +293,7 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
             .withExecutorMetrics(true).build())
         .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
             .usePrefix("unit-test").build())
+        .withRollbackUsingMarkers(useRollbackUsingMarkers)
         .withProperties(properties);
   }
 
Expand Down