HoodieClusteringConfig.java

@@ -191,6 +191,15 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.10.0")
       .withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT = ConfigProperty
+      .key("hoodie.clustering.rollback.pending.replacecommit.on.conflict")
+      .defaultValue(false)
+      .sinceVersion("0.10.0")
+      .withDocumentation("If updates are allowed to file groups pending clustering, set this config to roll back failed or pending clustering instants. "
+          + "Pending clustering will be rolled back ONLY IF there is a conflict between the incoming upsert and a file group to be clustered. "
+          + "Exercise caution when setting this config, especially when clustering runs very frequently: in rare scenarios it can lead to a race "
+          + "condition, for example when clustering completes after the instants are fetched but before the rollback completes.");
+
   /**
    * @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead
    */
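For reference, the new flag can be passed like any other Hudi write config. A minimal sketch, assuming the standard HoodieWriteConfig.Builder with withPath/withProps (the base path is hypothetical; the property key is the one defined above):

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;

// pass the new key as a raw property
Properties props = new Properties();
props.setProperty("hoodie.clustering.rollback.pending.replacecommit.on.conflict", "true");

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table")  // hypothetical base path
    .withProps(props)
    .build();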
@@ -404,6 +413,11 @@ public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMetadata) {
       return this;
     }
 
+    public Builder withRollbackPendingClustering(Boolean rollbackPendingClustering) {
+      clusteringConfig.setValue(ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT, String.valueOf(rollbackPendingClustering));
+      return this;
+    }
+
     public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) {
       clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, String.valueOf(enable));
       return this;
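The builder hook above makes the same flag settable programmatically. A sketch, assuming HoodieClusteringConfig.newBuilder() and withClusteringConfig(...) on the write-config builder:

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table")  // hypothetical base path
    .withClusteringConfig(HoodieClusteringConfig.newBuilder()
        .withRollbackPendingClustering(true)
        .build())
    .build();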
HoodieWriteConfig.java

@@ -1156,6 +1156,10 @@ public boolean isClusteringEnabled() {
     return inlineClusteringEnabled() || isAsyncClusteringEnabled();
   }
 
+  public boolean isRollbackPendingClustering() {
+    return getBoolean(HoodieClusteringConfig.ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT);
+  }
+
   public int getInlineClusterMaxCommits() {
     return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS);
   }
UpdateStrategy.java

@@ -21,6 +21,7 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.Set;
 
@@ -41,8 +42,8 @@ protected UpdateStrategy(HoodieEngineContext engineContext, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
    * Check the update records going to file groups that are in pending clustering.
    * @param taggedRecordsRDD the records to write, tagged with target file id;
    *                         implementations may relocate tagged records to a different fileId.
-   * @return the recordsRDD strategy updated
+   * @return the records after the strategy is applied, plus the set of file groups receiving updates while pending clustering.
    */
-  public abstract I handleUpdate(I taggedRecordsRDD);
+  public abstract Pair<I, Set<HoodieFileGroupId>> handleUpdate(I taggedRecordsRDD);
 
 }
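Every strategy implementation now has to honor the Pair contract. A minimal no-op sketch against the Spark engine types (the class name NoOpUpdateStrategy is hypothetical, not part of this PR):

import java.util.Collections;
import java.util.Set;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;

import org.apache.spark.api.java.JavaRDD;

// hypothetical strategy: let every update through and report no conflicting
// file groups, so the executor never schedules a rollback
public class NoOpUpdateStrategy<T extends HoodieRecordPayload<T>>
    extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {

  public NoOpUpdateStrategy(HoodieEngineContext engineContext,
                            Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
    super(engineContext, fileGroupsInPendingClustering);
  }

  @Override
  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
    return Pair.of(taggedRecordsRDD, Collections.emptySet());
  }
}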
SparkAllowUpdateStrategy.java

@@ -22,10 +22,15 @@
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 
 import org.apache.spark.api.java.JavaRDD;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Allow ingestion commits during clustering job.
@@ -37,8 +42,19 @@ public SparkAllowUpdateStrategy(
     super(engineContext, fileGroupsInPendingClustering);
   }
 
+  private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+    List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
+        .filter(record -> record.getCurrentLocation() != null)
+        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
+    return fileGroupIdsWithUpdates;
+  }
+
   @Override
-  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
-    return taggedRecordsRDD;
+  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
+    Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
+        .filter(f -> fileGroupsInPendingClustering.contains(f))
+        .collect(Collectors.toSet());
+    return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering);
   }
 }
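To exercise this path, the updates strategy has to be switched away from the default reject strategy. A sketch, assuming the hoodie.clustering.updates.strategy key and the fully-qualified class name below (verify both against your Hudi version):

Properties props = new Properties();
// assumed key and FQCN for the allow-update strategy
props.setProperty("hoodie.clustering.updates.strategy",
    "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy");
// roll back conflicting pending clustering instead of failing the upsert
props.setProperty("hoodie.clustering.rollback.pending.replacecommit.on.conflict", "true");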
SparkRejectUpdateStrategy.java

@@ -22,14 +22,18 @@
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieClusteringUpdateException;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Update strategy based on following.
@@ -50,7 +54,7 @@ private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
   }
 
   @Override
-  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
       if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
@@ -61,7 +65,7 @@ public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
         throw new HoodieClusteringUpdateException(msg);
       }
     });
-    return taggedRecordsRDD;
+    return Pair.of(taggedRecordsRDD, Collections.emptySet());
   }
 
 }
BaseSparkCommitActionExecutor.java

@@ -77,6 +77,8 @@
 import java.util.Set;
 import java.util.Map;
 
+import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
+
 public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
     BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
 
@@ -118,7 +120,27 @@ private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
           table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
       UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
           .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
-      return (JavaRDD<HoodieRecord<T>>) updateStrategy.handleUpdate(inputRecordsRDD);
+      Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
+          (Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>) updateStrategy.handleUpdate(inputRecordsRDD);
+      Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
+      if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+        return recordsAndPendingClusteringFileGroups.getLeft();
+      }
+      // some file groups are pending clustering and also receiving updates, so roll back those pending clustering instants.
+      // a race condition is still possible, for example if clustering completes after the instants are fetched but before the rollback completes.
+      if (config.isRollbackPendingClustering()) {
+        Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+            .filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toSet());
+        pendingClusteringInstantsToRollback.forEach(instant -> {
+          String commitTime = HoodieActiveTimeline.createNewInstantTime();
+          table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
+          table.rollback(context, commitTime, instant, true, true);
+        });
+        table.getMetaClient().reloadActiveTimeline();
+      }
+      return recordsAndPendingClusteringFileGroups.getLeft();
     } else {
       return inputRecordsRDD;
     }
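End to end, this path only fires on upsert when a tagged record lands in a file group with pending clustering. A sketch of driving it from a write client, assuming the writeConfig from the earlier sketches and illustrative engineContext/taggedRecords variables:

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieAvroPayload;

// illustrative driver: upsert records that hit file groups pending clustering;
// with the allow-update strategy plus the rollback flag, the conflicting
// pending replacecommit is rolled back instead of the upsert failing
SparkRDDWriteClient<HoodieAvroPayload> client =
    new SparkRDDWriteClient<>(engineContext, writeConfig);
String instantTime = client.startCommit();
client.upsert(taggedRecords, instantTime);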