> getPendingRollbackInfos
return infoMap;
}
+ /**
+ * Rolls back the failed delta commits corresponding to the indexing action.
+ * Such delta commits are identified based on the suffix `METADATA_INDEXER_TIME_SUFFIX` ("004").
+ *
+ * TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks
+ * in the metadata table is landed.
+ *
+ * @return {@code true} if rollback happens; {@code false} otherwise.
+ */
+ protected boolean rollbackFailedIndexingCommits() {
+ HoodieTable table = createTable(config, hadoopConf);
+ List instantsToRollback = getFailedIndexingCommitsToRollback(table.getMetaClient());
+ Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
+ instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
+ rollbackFailedWrites(pendingRollbacks);
+ return !pendingRollbacks.isEmpty();
+ }
+
+ protected List getFailedIndexingCommitsToRollback(HoodieTableMetaClient metaClient) {
+ Stream inflightInstantsStream = metaClient.getCommitsTimeline()
+ .filter(instant -> !instant.isCompleted()
+ && isIndexingCommit(instant.getTimestamp()))
+ .getInstantsAsStream();
+ return inflightInstantsStream.filter(instant -> {
+ try {
+ return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+ } catch (IOException io) {
+ throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+ }
+ }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ }
+
/**
* Rollback all failed writes.
+ *
* @return true if rollback was triggered. false otherwise.
*/
protected Boolean rollbackFailedWrites() {
@@ -699,6 +737,19 @@ protected List getInstantsToRollback(HoodieTableMetaClient metaClient, H
Stream inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
.getReverseOrderedInstants();
if (cleaningPolicy.isEager()) {
+ // Metadata table uses eager cleaning policy, but we need to exclude inflight delta commits
+ // from the async indexer (`HoodieIndexer`).
+ // TODO(HUDI-5733): This should be cleaned up once the proper fix of rollbacks in the
+ // metadata table is landed.
+ if (HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())) {
+ return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
+ if (curInstantTime.isPresent()) {
+ return !entry.equals(curInstantTime.get());
+ } else {
+ return !isIndexingCommit(entry);
+ }
+ }).collect(Collectors.toList());
+ }
return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
if (curInstantTime.isPresent()) {
return !entry.equals(curInstantTime.get());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c3260914bd51b..3fe0fdd330f0b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -491,7 +491,7 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
public void preWrite(String instantTime, WriteOperationType writeOperationType,
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
- this.lastCompletedTxnAndMetadata = txnManager.isOptimisticConcurrencyControlEnabled()
+ this.lastCompletedTxnAndMetadata = txnManager.isLockRequired()
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
this.pendingInflightAndRequestedInstants.remove(instantTime);
@@ -519,6 +519,8 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,
*/
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option