diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 3fe28c64bc5cc..293c1d94d389a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -201,7 +201,7 @@ public boolean commitStats(String instantTime, List stats, Opti } catch (IOException e) { throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.of(inflightInstant)); } // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period runTableServicesInline(table, metadata, extraMetadata); @@ -1063,13 +1063,14 @@ public Option scheduleTableService(Option> extraMeta public Option scheduleTableService(String instantTime, Option> extraMetadata, TableServiceType tableServiceType) { // A lock is required to guard against race conditions between an on-going writer and scheduling a table service. + final Option inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, + tableServiceType.getAction(), instantTime)); try { - this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, - tableServiceType.getAction(), instantTime)), Option.empty()); + this.txnManager.beginTransaction(inflightInstant, Option.empty()); LOG.info("Scheduling table service " + tableServiceType); return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(inflightInstant); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index a6753aaa3a2ed..3a3552e74db90 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -35,49 +35,64 @@ public class TransactionManager implements Serializable { private static final Logger LOG = LogManager.getLogger(TransactionManager.class); - private final LockManager lockManager; - private Option currentTxnOwnerInstant; - private Option lastCompletedTxnOwnerInstant; - private boolean supportsOptimisticConcurrency; + private final boolean isOptimisticConcurrencyControlEnabled; + private Option currentTxnOwnerInstant = Option.empty(); + private Option lastCompletedTxnOwnerInstant = Option.empty(); public TransactionManager(HoodieWriteConfig config, FileSystem fs) { this.lockManager = new LockManager(config, fs); - this.supportsOptimisticConcurrency = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); + this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); } - public synchronized void beginTransaction() { - if (supportsOptimisticConcurrency) { + public void beginTransaction() { + if (isOptimisticConcurrencyControlEnabled) { LOG.info("Transaction starting without a transaction owner"); lockManager.lock(); - LOG.info("Transaction started"); + LOG.info("Transaction started without a transaction owner"); } } - public synchronized void beginTransaction(Option currentTxnOwnerInstant, Option lastCompletedTxnOwnerInstant) { - if (supportsOptimisticConcurrency) { - this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant; - lockManager.setLatestCompletedWriteInstant(lastCompletedTxnOwnerInstant); - LOG.info("Latest completed transaction instant " + lastCompletedTxnOwnerInstant); - this.currentTxnOwnerInstant = currentTxnOwnerInstant; - LOG.info("Transaction starting with transaction owner " + currentTxnOwnerInstant); + public void beginTransaction(Option newTxnOwnerInstant, + Option lastCompletedTxnOwnerInstant) { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction starting for " + newTxnOwnerInstant + + " with latest completed transaction instant " + lastCompletedTxnOwnerInstant); lockManager.lock(); - LOG.info("Transaction started"); + reset(currentTxnOwnerInstant, newTxnOwnerInstant, lastCompletedTxnOwnerInstant); + LOG.info("Transaction started for " + newTxnOwnerInstant + + " with latest completed transaction instant " + lastCompletedTxnOwnerInstant); + } + } + + public void endTransaction() { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction ending without a transaction owner"); + lockManager.unlock(); + LOG.info("Transaction ended without a transaction owner"); } } - public synchronized void endTransaction() { - if (supportsOptimisticConcurrency) { + public void endTransaction(Option currentTxnOwnerInstant) { + if (isOptimisticConcurrencyControlEnabled) { LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant); + reset(currentTxnOwnerInstant, Option.empty(), Option.empty()); lockManager.unlock(); - LOG.info("Transaction ended"); - this.lastCompletedTxnOwnerInstant = Option.empty(); - lockManager.resetLatestCompletedWriteInstant(); + LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant); + } + } + + private synchronized void reset(Option callerInstant, + Option newTxnOwnerInstant, + Option lastCompletedTxnOwnerInstant) { + if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant == callerInstant) { + this.currentTxnOwnerInstant = newTxnOwnerInstant; + this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant; } } public void close() { - if (supportsOptimisticConcurrency) { + if (isOptimisticConcurrencyControlEnabled) { lockManager.close(); LOG.info("Transaction manager closed"); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java index b151879d69e1c..cab9d95df4bed 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java @@ -111,7 +111,7 @@ public void close() { } private String getLogMessage(LockState state) { - return StringUtils.join(String.valueOf(Thread.currentThread().getId()), - state.name(), " local process lock."); + return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ", + state.name(), " in-process lock."); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index d5e87ec202392..976205f3592e8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -20,13 +20,10 @@ import java.io.Serializable; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.lock.LockProvider; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieLockException; @@ -46,11 +43,8 @@ public class LockManager implements Serializable, AutoCloseable { private final LockConfiguration lockConfiguration; private final SerializableConfiguration hadoopConf; private volatile LockProvider lockProvider; - // Holds the latest completed write instant to know which ones to check conflict against - private final AtomicReference> latestCompletedWriteInstant; public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) { - this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty()); this.writeConfig = writeConfig; this.hadoopConf = new SerializableConfiguration(fs.getConf()); this.lockConfiguration = new LockConfiguration(writeConfig.getProps()); @@ -100,22 +94,6 @@ public synchronized LockProvider getLockProvider() { return lockProvider; } - public void setLatestCompletedWriteInstant(Option instant) { - this.latestCompletedWriteInstant.set(instant); - } - - public void compareAndSetLatestCompletedWriteInstant(Option expected, Option newValue) { - this.latestCompletedWriteInstant.compareAndSet(expected, newValue); - } - - public AtomicReference> getLatestCompletedWriteInstant() { - return latestCompletedWriteInstant; - } - - public void resetLatestCompletedWriteInstant() { - this.latestCompletedWriteInstant.set(Option.empty()); - } - @Override public void close() { closeQuietly(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 173010e86672e..f1f2e53ec0f1f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -217,7 +217,7 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstan throw new HoodieIOException("Failed to clean up after commit", e); } finally { if (!skipLocking) { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.empty()); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index ce6ed5db303c7..684db03174f03 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -147,14 +147,16 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) { } protected void autoCommit(Option> extraMetadata, HoodieWriteMetadata result) { - this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)), + final Option inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT, + HoodieTimeline.COMMIT_ACTION, instantTime)); + this.txnManager.beginTransaction(inflightInstant, lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty()); try { TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner()); commit(extraMetadata, result); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(inflightInstant); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java index ac8f9940d4b36..93713401496d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java @@ -112,7 +112,7 @@ private void writeToMetadata(HoodieRestoreMetadata restoreMetadata) { this.txnManager.beginTransaction(Option.empty(), Option.empty()); writeTableMetadata(restoreMetadata); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.empty()); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 54cb51f03d300..7d2c366966bf4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -266,7 +266,7 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad throw new HoodieIOException("Error executing rollback at instant " + instantTime, e); } finally { if (!skipLocking) { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.empty()); } } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java new file mode 100644 index 0000000000000..5589dff2c4eb0 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieLockConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieLockException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestTransactionManager extends HoodieCommonTestHarness { + HoodieWriteConfig writeConfig; + TransactionManager transactionManager; + + @BeforeEach + private void init() throws IOException { + initPath(); + initMetaClient(); + this.writeConfig = getWriteConfig(); + this.transactionManager = new TransactionManager(this.writeConfig, this.metaClient.getFs()); + } + + private HoodieWriteConfig getWriteConfig() { + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(InProcessLockProvider.class) + .build()) + .build(); + } + + @Test + public void testSingleWriterTransaction() { + transactionManager.beginTransaction(); + transactionManager.endTransaction(); + } + + @Test + public void testSingleWriterNestedTransaction() { + transactionManager.beginTransaction(); + assertThrows(HoodieLockException.class, () -> { + transactionManager.beginTransaction(); + }); + + transactionManager.endTransaction(); + assertThrows(HoodieLockException.class, () -> { + transactionManager.endTransaction(); + }); + } + + @Test + public void testMultiWriterTransactions() { + final int threadCount = 3; + final long awaitMaxTimeoutMs = 2000L; + final CountDownLatch latch = new CountDownLatch(threadCount); + final AtomicBoolean writer1Completed = new AtomicBoolean(false); + final AtomicBoolean writer2Completed = new AtomicBoolean(false); + + // Let writer1 get the lock first, then wait for others + // to join the sync up point. + Thread writer1 = new Thread(() -> { + assertDoesNotThrow(() -> { + transactionManager.beginTransaction(); + }); + latch.countDown(); + try { + latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS); + // Following sleep is to make sure writer2 attempts + // to try lock and to get blocked on the lock which + // this thread is currently holding. + Thread.sleep(50); + } catch (InterruptedException e) { + // + } + assertDoesNotThrow(() -> { + transactionManager.endTransaction(); + }); + writer1Completed.set(true); + }); + writer1.start(); + + // Writer2 will block on trying to acquire the lock + // and will eventually get the lock before the timeout. + Thread writer2 = new Thread(() -> { + latch.countDown(); + try { + latch.await(awaitMaxTimeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // + } + assertDoesNotThrow(() -> { + transactionManager.beginTransaction(); + }); + assertDoesNotThrow(() -> { + transactionManager.endTransaction(); + }); + writer2Completed.set(true); + }); + writer2.start(); + + // Let writer1 and writer2 wait at the sync up + // point to make sure they run in parallel and + // one get blocked by the other. + latch.countDown(); + try { + writer1.join(); + writer2.join(); + } catch (InterruptedException e) { + // + } + + // Make sure both writers actually completed good + Assertions.assertTrue(writer1Completed.get()); + Assertions.assertTrue(writer2Completed.get()); + } + + @Test + public void testTransactionsWithInstantTime() { + // 1. Begin and end by the same transaction owner + Option lastCompletedInstant = getInstant("0000001"); + Option newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + + // 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner + lastCompletedInstant = getInstant("0000002"); + newTxnOwnerInstant = getInstant("0000003"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + transactionManager.endTransaction(); + // Owner reset would not happen as the end txn was invoked with an incorrect current txn owner + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + + // 3. But, we should be able to begin a new transaction for a new owner + lastCompletedInstant = getInstant("0000003"); + newTxnOwnerInstant = getInstant("0000004"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + + // 4. Transactions with no owners should also go through + transactionManager.beginTransaction(); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + transactionManager.endTransaction(); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + } + + private Option getInstant(String timestamp) { + return Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, timestamp)); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 374dd1226ca25..2ed2536c2db7a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -359,8 +359,8 @@ public void completeCompaction( String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { - HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); // commit to data table after committing to metadata table. @@ -371,7 +371,7 @@ public void completeCompaction( LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.of(compactionInstant)); } if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index dcd241618e41d..9b2aad3ebafa1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -300,8 +300,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD writeStats = writeStatuses.map(WriteStatus::getStat).collect(); + final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); try { - HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime); this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); // commit to data table after committing to metadata table. @@ -309,7 +309,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } + final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); try { - HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); finalizeWrite(table, clusteringCommitTime, writeStats); writeTableMetadataForTableServices(table, metadata,clusteringInstant); @@ -395,7 +395,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD