@@ -852,18 +852,13 @@ protected void rollbackFailedWrites(List<String> instantsToRollback, boolean ski
if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
rollbackFailedBootstrap();
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
break;
} else {
rollback(instant, skipLocking);
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
}
}
// Delete any heartbeat files for already rolled back commits
try {
HeartbeatUtils.cleanExpiredHeartbeats(this.heartbeatClient.getAllExistingHeartbeatInstants(),
createMetaClient(true), basePath);
} catch (IOException io) {
LOG.error("Unable to delete heartbeat files", io);
}
}
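
For readers outside the write client, a minimal caller's-eye sketch of the pattern introduced above: each rolled-back instant now has its heartbeat file removed immediately, through the policy-aware HeartbeatUtils overload added later in this PR, instead of a bulk cleanExpiredHeartbeats sweep at the end. The class name, the simplified loop, and the HeartbeatUtils package in the import are assumptions for illustration; the real method also short-circuits after rolling back a bootstrap instant.

import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.heartbeat.HeartbeatUtils; // package assumed for illustration
import org.apache.hudi.config.HoodieWriteConfig;

class RollbackHeartbeatCleanupSketch {
  // Simplified view of the loop above: roll back each failed instant, then delete its
  // heartbeat file right away so the instant is no longer treated as having an active writer.
  static void rollbackAndCleanHeartbeats(List<String> instantsToRollback, FileSystem fs,
                                         String basePath, HoodieWriteConfig config) {
    for (String instant : instantsToRollback) {
      // rollback(instant, skipLocking) would run here in the real write client
      HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
    }
  }
}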

protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
--- next file ---
@@ -21,7 +21,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -30,9 +29,6 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Helper class to delete heartbeat for completed or failed instants with expired heartbeats.
@@ -55,6 +51,8 @@ public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, String
deleted = fs.delete(new Path(heartbeatFolderPath + Path.SEPARATOR + instantTime), false);
if (!deleted) {
LOG.error("Failed to delete heartbeat for instant " + instantTime);
} else {
LOG.info("Deleted the heartbeat for instant " + instantTime);
}
} catch (IOException io) {
LOG.error("Unable to delete heartbeat for instant " + instantTime, io);
@@ -63,20 +61,19 @@ public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, String
}

/**
* Deletes the heartbeat files for instants with expired heartbeats without any active instant.
* @param allExistingHeartbeatInstants
* @param metaClient
* @param basePath
* Deletes the heartbeat file for the specified instant.
* @param fs Hadoop FileSystem instance
* @param basePath Hoodie table base path
* @param instantTime Commit instant time
* @param config HoodieWriteConfig instance
* @return Boolean indicating whether heartbeat file was deleted or not
*/
public static void cleanExpiredHeartbeats(List<String> allExistingHeartbeatInstants,
HoodieTableMetaClient metaClient, String basePath) {
Set<String> nonExpiredHeartbeatInstants = metaClient.getActiveTimeline()
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
allExistingHeartbeatInstants.stream().forEach(instant -> {
if (!nonExpiredHeartbeatInstants.contains(instant)) {
deleteHeartbeatFile(metaClient.getFs(), basePath, instant);
}
});
public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, String instantTime, HoodieWriteConfig config) {
if (config.getFailedWritesCleanPolicy().isLazy()) {
return deleteHeartbeatFile(fs, basePath, instantTime);
}

return false;
}
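
As a hedged usage sketch (not part of the PR), the new overload only removes the heartbeat file when the table is configured with the LAZY failed-writes cleaning policy; under any other policy it returns false without touching the file. The base path, the instant time, the builder chain, and the HeartbeatUtils import below are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hudi.client.heartbeat.HeartbeatUtils; // package assumed for illustration
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class HeartbeatDeleteSketch {
  public static void main(String[] args) throws Exception {
    String basePath = "/tmp/hudi_table"; // illustrative table location
    FileSystem fs = FileSystem.get(new Configuration());
    // LAZY failed-writes cleaning is the multi-writer setting that relies on heartbeats.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .build())
        .build();
    // With the LAZY policy this delegates to the single-instant delete and logs the outcome;
    // for any other policy it is a no-op and returns false.
    boolean deleted = HeartbeatUtils.deleteHeartbeatFile(fs, basePath, "20230101120000", config);
    System.out.println("heartbeat deleted: " + deleted);
  }
}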

/**
--- next file ---
@@ -18,7 +18,9 @@

package org.apache.hudi.client;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
@@ -136,6 +138,62 @@ public void testMultiWriterWithAsyncTableServicesWithConflictMOR() throws Except
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ);
}

@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception {
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}

Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");

HoodieWriteConfig cfg = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withMaxNumDeltaCommitsBeforeCompaction(2)
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build())
.withAutoCommit(false)
.withProperties(properties)
.build();

// Create the first commit
SparkRDDWriteClient<?> client = getHoodieWriteClient(cfg);
createCommitWithInsertsForPartition(cfg, client, "000", "001", 100, "2016/03/01");

int numConcurrentWriters = 5;
ExecutorService executors = Executors.newFixedThreadPool(numConcurrentWriters);

List<Future<?>> futures = new ArrayList<>(numConcurrentWriters);
for (int loop = 0; loop < numConcurrentWriters; loop++) {
String newCommitTime = "00" + (loop + 2);
String partition = "2016/03/0" + (loop + 2);
futures.add(executors.submit(() -> {
try {
SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg);
createCommitWithInsertsForPartition(cfg, writeClient, "001", newCommitTime, 100, partition);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}

futures.forEach(f -> {
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -294,6 +352,14 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
}
}

private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDDWriteClient client,
String prevCommitTime, String newCommitTime, int numRecords,
String partition) throws Exception {
JavaRDD<WriteStatus> result = insertBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::insert,
false, false, numRecords, numRecords, 1, Option.of(partition));
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
}

private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client,
String prevCommitTime, String newCommitTime, int numRecords) throws Exception {
// Finish first base commit
--- next file ---
@@ -303,7 +303,7 @@ public void testMORTable() throws Exception {

// Insert with original schema is allowed now
insertBatch(hoodieWriteConfig, client, "009", "008", numRecords, SparkRDDWriteClient::insert,
false, false, 0, 0, 0);
false, false, 0, 0, 0, Option.empty());
checkLatestDeltaCommit("009");
checkReadRecords("000", 3 * numRecords);
}
@@ -438,7 +438,7 @@ public void testCopyOnWriteTable() throws Exception {

// Insert with original schema is allowed now
insertBatch(hoodieWriteConfig, client, "007", "003", numRecords, SparkRDDWriteClient::insert,
false, true, numRecords, 2 * numRecords, 1);
false, true, numRecords, 2 * numRecords, 1, Option.empty());
checkReadRecords("000", 2 * numRecords);

// Update with original schema is allowed now
--- next file ---
Expand Up @@ -244,6 +244,18 @@ private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionFor
};
}

private Function3<List<HoodieRecord>, String, Integer, String> wrapRecordsGenFunctionForPreppedCalls(
final HoodieWriteConfig writeConfig, final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction) {
return (commit, numRecords, partition) -> {
final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords, partition);
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table);
return taggedRecords.collect();
};
}

/**
* Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys
* to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
@@ -285,6 +297,15 @@ public Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(bool
}
}

public Function3<List<HoodieRecord>, String, Integer, String> generateWrapRecordsForPartitionFn(boolean isPreppedAPI,
HoodieWriteConfig writeConfig, Function3<List<HoodieRecord>, String, Integer, String> wrapped) {
if (isPreppedAPI) {
return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
} else {
return wrapped;
}
}

/**
* Generate wrapper for delete key generation function for testing Prepped APIs.
*
@@ -355,12 +376,22 @@ public JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, Spar
public JavaRDD<WriteStatus> insertBatch(HoodieWriteConfig writeConfig, SparkRDDWriteClient client, String newCommitTime,
String initCommitTime, int numRecordsInThisCommit,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, Option<String> partition) throws Exception {

return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false);
if (partition.isPresent()) {
final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction =
generateWrapRecordsForPartitionFn(isPreppedAPI, writeConfig, dataGen::generateInsertsForPartition);

return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false,
partition.get());
} else {
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);

return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false);
}
}
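
A hedged sketch of the two call shapes that now coexist after this signature change; the wrapper class, base-class name, commit times, partition value, and expected counts below are illustrative assumptions rather than code from the PR. Existing call sites pass Option.empty() and keep the old spread-across-partitions insert generation, while the new multi-writer test pins a whole batch to a single partition with Option.of(partition).

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestBase; // base class name assumed
import org.apache.spark.api.java.JavaRDD;

public class InsertBatchCallShapesSketch extends HoodieClientTestBase {
  void illustrateCallShapes(HoodieWriteConfig cfg, SparkRDDWriteClient client) throws Exception {
    // Old shape: no partition pinned, records spread across the generator's default partitions.
    JavaRDD<WriteStatus> spread = insertBatch(cfg, client, "002", "001", 100, SparkRDDWriteClient::insert,
        false, false, 100, 100, 1, Option.empty());
    // New shape: every record of this batch lands in the single partition "2016/03/02".
    JavaRDD<WriteStatus> pinned = insertBatch(cfg, client, "003", "002", 100, SparkRDDWriteClient::insert,
        false, false, 100, 200, 2, Option.of("2016/03/02"));
  }
}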

public JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, SparkRDDWriteClient client, String newCommitTime,
@@ -453,6 +484,16 @@ public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCom
writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, doCommit, true);
}

public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
boolean doCommit, String partition) throws Exception {
return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime, numRecordsInThisCommit, recordGenFunction,
writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, doCommit, true, partition);
}

/**
* Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
*
@@ -478,10 +519,35 @@ public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCom
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
boolean filterForCommitTimeWithAssert) throws Exception {

List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
expTotalCommits, doCommit, filterForCommitTimeWithAssert);
}

public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
boolean filterForCommitTimeWithAssert,
String partition) throws Exception {

List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit, partition);
return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
expTotalCommits, doCommit, filterForCommitTimeWithAssert);
}

private JavaRDD<WriteStatus> writeBatchHelper(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
int numRecordsInThisCommit, List<HoodieRecord> records,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords,
int expTotalCommits, boolean doCommit, boolean filterForCommitTimeWithAssert) throws IOException {
// Write 1 (only inserts)
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);