@@ -115,35 +115,31 @@ private void initKeyGenIfNeeded(boolean populateMetaFields) {
}

private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
if (config.isClusteringEnabled()) {
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
(Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>)updateStrategy.handleUpdate(inputRecordsRDD);
Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
return recordsAndPendingClusteringFileGroups.getLeft();
}
// there are file groups pending clustering and receiving updates, so roll back the pending clustering instants
// there could be a race condition, for example, if the clustering completes after the instants are fetched but before the rollback completes
if (config.isRollbackPendingClustering()) {
Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
.filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
pendingClusteringInstantsToRollback.forEach(instant -> {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
table.rollback(context, commitTime, instant, true, true);
});
table.getMetaClient().reloadActiveTimeline();
}
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
(Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>) updateStrategy.handleUpdate(inputRecordsRDD);
Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
return recordsAndPendingClusteringFileGroups.getLeft();
} else {
return inputRecordsRDD;
}
// there are file groups pending clustering and receiving updates, so roll back the pending clustering instants
// there could be a race condition, for example, if the clustering completes after the instants are fetched but before the rollback completes
if (config.isRollbackPendingClustering()) {
Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
.filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
pendingClusteringInstantsToRollback.forEach(instant -> {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
table.rollback(context, commitTime, instant, true, true);
});
table.getMetaClient().reloadActiveTimeline();
}
return recordsAndPendingClusteringFileGroups.getLeft();
}

@Override
@@ -18,12 +18,6 @@

package org.apache.hudi.client;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -42,23 +36,31 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class TestHoodieClientMultiWriter extends HoodieClientTestBase {

@@ -105,7 +107,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
try {
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e1) {
Assertions.assertTrue(e1 instanceof HoodieWriteConflictException);
assertTrue(e1 instanceof HoodieWriteConflictException);
throw new RuntimeException(e1);
}
});
@@ -116,13 +118,13 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
try {
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e2) {
Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
assertTrue(e2 instanceof HoodieWriteConflictException);
throw new RuntimeException(e2);
}
});
future1.get();
future2.get();
Assertions.fail("Should not reach here, this means concurrent writes were handled incorrectly");
fail("Should not reach here, this means concurrent writes were handled incorrectly");
} catch (Exception e) {
// Expected to fail due to overlapping commits
}
@@ -206,20 +208,20 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
// Disable the embedded timeline server; it does not work with multi-writer
HoodieWriteConfig cfg = getConfigBuilder()
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false)
.withInlineCompaction(false).withAsyncClean(true)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withMaxNumDeltaCommitsBeforeCompaction(2).build())
.withEmbeddedTimelineServerEnabled(false)
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
FileSystemViewStorageType.MEMORY).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build()).withAutoCommit(false).withProperties(properties).build();
.build()).withAutoCommit(false).withProperties(properties);
Set<String> validInstants = new HashSet<>();
// Create the first commit with inserts
HoodieWriteConfig cfg = writeConfigBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
createCommitWithInserts(cfg, client, "000", "001", 200);
validInstants.add("001");
@@ -229,22 +231,26 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
validInstants.add("002");
validInstants.add("003");
ExecutorService executors = Executors.newFixedThreadPool(2);
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
// write config with clustering enabled
HoodieWriteConfig cfg2 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
.build();
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
String newCommitTime = "004";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
createCommitWithUpserts(cfg2, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
if (tableType == HoodieTableType.MERGE_ON_READ) {
Assertions.fail("Conflicts not handled correctly");
fail("Conflicts not handled correctly");
}
validInstants.add("004");
} catch (Exception e1) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
Assertions.assertTrue(e1 instanceof HoodieWriteConflictException);
assertTrue(e1 instanceof HoodieWriteConflictException);
}
}
});
@@ -272,7 +278,7 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
String newCommitTime = "007";
int numRecords = 100;
try {
createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords);
createCommitWithInserts(cfg2, client1, "003", newCommitTime, numRecords);
validInstants.add("007");
} catch (Exception e1) {
throw new RuntimeException(e1);
@@ -300,10 +306,13 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
future1.get();
future2.get();
future3.get();
Set<String> completedInstants = metaClient.getActiveTimeline().getCommitsTimeline()
validInstants.addAll(
metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));
Set<String> completedInstants = metaClient.reloadActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toSet());
Assertions.assertTrue(validInstants.containsAll(completedInstants));
assertTrue(validInstants.containsAll(completedInstants));
}

@ParameterizedTest
@@ -314,13 +323,16 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
}
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
HoodieWriteConfig cfg = getConfigBuilder()
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build()).withAutoCommit(false).withProperties(properties).build();
.build()).withAutoCommit(false).withProperties(properties);
HoodieWriteConfig cfg = writeConfigBuilder.build();
HoodieWriteConfig cfg2 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
.build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
// Start another inflight commit
@@ -333,20 +345,20 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
numRecords, 200, 2);
// Start and finish another commit while the previous writer for commit 003 is running
newCommitTime = "004";
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
JavaRDD<WriteStatus> result2 = updateBatch(cfg, client2, newCommitTime, "001",
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
JavaRDD<WriteStatus> result2 = updateBatch(cfg2, client2, newCommitTime, "001",
Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
numRecords, 200, 2);
client2.commit(newCommitTime, result2);
// Schedule and run clustering while previous writer for commit 003 is running
SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
// schedule clustering
Option<String> clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
client3.cluster(clusterInstant.get(), true);
assertFalse(clusterInstant.isPresent());
// Attempt to commit the inflight commit 003
try {
client1.commit("003", result1);
Assertions.fail("Should have thrown a concurrent conflict exception");
fail("Should have thrown a concurrent conflict exception");
} catch (Exception e) {
// Expected
}
@@ -376,5 +388,4 @@ private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient
numRecords, 200, 2);
client.commit(newCommitTime, result);
}

}
@@ -1418,8 +1418,7 @@ public void testPendingClusteringRollback(boolean populateMetaFields) throws Exc
dataGen = new HoodieTestDataGenerator();
String commitTime = HoodieActiveTimeline.createNewInstantTime();
allRecords.addAll(dataGen.generateInserts(commitTime, 200));
writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);

assertThrows(HoodieUpsertException.class, () -> writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields));
// verify pending clustering can be rolled back (even though there is a completed commit newer than the pending clustering instant)
client.rollback(pendingClusteringInstant.getTimestamp());
metaClient.reloadActiveTimeline();