diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 70e3cebce4a15..0ad944966be52 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -795,7 +795,7 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime(); Timer.Context timerContext = metrics.getRollbackCtx(); try { - HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary); + HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary); Option restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime); if (restorePlanOption.isPresent()) { HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime); @@ -1035,7 +1035,8 @@ public Option index(String indexInstantTime) { public void dropIndex(List partitionTypes) { HoodieTable table = createTable(config, hadoopConf); String dropInstant = HoodieActiveTimeline.createNewInstantTime(); - this.txnManager.beginTransaction(); + HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant); + this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty()); try { context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table"); table.getMetadataWriter(dropInstant).ifPresent(w -> { @@ -1046,7 +1047,7 @@ public void dropIndex(List partitionTypes) { } }); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(Option.of(ownerInstant)); } } @@ -1432,13 +1433,16 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option ownerInstant = Option.empty(); + if (instantTime.isPresent()) { + ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get())); + } + this.txnManager.beginTransaction(ownerInstant, Option.empty()); try { tryUpgrade(metaClient, instantTime); table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary); } finally { - this.txnManager.endTransaction(); + this.txnManager.endTransaction(ownerInstant); } // Validate table properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index b0d473be04e6d..fc78d933fa6c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -157,7 +157,8 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException { try { if (acquireLock) { - txnManager.beginTransaction(); + // there is no owner or instant time per se for archival. + txnManager.beginTransaction(Option.empty(), Option.empty()); } List instantsToArchive = getInstantsToArchive().collect(Collectors.toList()); verifyLastMergeArchiveFilesIfNecessary(context); @@ -179,7 +180,7 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc } finally { close(); if (acquireLock) { - txnManager.endTransaction(); + txnManager.endTransaction(Option.empty()); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index d9b9d3d269bf7..aef1fee5e0794 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -45,14 +45,6 @@ public TransactionManager(HoodieWriteConfig config, FileSystem fs) { this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); } - public void beginTransaction() { - if (isOptimisticConcurrencyControlEnabled) { - LOG.info("Transaction starting without a transaction owner"); - lockManager.lock(); - LOG.info("Transaction started without a transaction owner"); - } - } - public void beginTransaction(Option newTxnOwnerInstant, Option lastCompletedTxnOwnerInstant) { if (isOptimisticConcurrencyControlEnabled) { @@ -65,30 +57,25 @@ public void beginTransaction(Option newTxnOwnerInstant, } } - public void endTransaction() { - if (isOptimisticConcurrencyControlEnabled) { - LOG.info("Transaction ending without a transaction owner"); - lockManager.unlock(); - LOG.info("Transaction ended without a transaction owner"); - } - } - public void endTransaction(Option currentTxnOwnerInstant) { if (isOptimisticConcurrencyControlEnabled) { LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant); - reset(currentTxnOwnerInstant, Option.empty(), Option.empty()); - lockManager.unlock(); - LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant); + if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) { + lockManager.unlock(); + LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant); + } } } - private synchronized void reset(Option callerInstant, + private synchronized boolean reset(Option callerInstant, Option newTxnOwnerInstant, Option lastCompletedTxnOwnerInstant) { if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) { this.currentTxnOwnerInstant = newTxnOwnerInstant; this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant; + return true; } + return false; } public void close() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java index 8c86a298f8a4b..339e95b9e08d4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java @@ -232,14 +232,14 @@ private void updateTableConfigAndTimeline(HoodieInstant indexInstant, HoodieIndexCommitMetadata indexCommitMetadata) throws IOException { try { // update the table config and timeline in a lock as there could be another indexer running - txnManager.beginTransaction(); + txnManager.beginTransaction(Option.of(indexInstant), Option.empty()); updateMetadataPartitionsTableConfig(table.getMetaClient(), finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet())); table.getActiveTimeline().saveAsComplete( new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()), TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata)); } finally { - txnManager.endTransaction(); + txnManager.endTransaction(Option.of(indexInstant)); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java index 22f8017841a83..6573560e752eb 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java @@ -69,20 +69,28 @@ private HoodieWriteConfig getWriteConfig() { @Test public void testSingleWriterTransaction() { - transactionManager.beginTransaction(); - transactionManager.endTransaction(); + Option lastCompletedInstant = getInstant("0000001"); + Option newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); } @Test public void testSingleWriterNestedTransaction() { - transactionManager.beginTransaction(); + Option lastCompletedInstant = getInstant("0000001"); + Option newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + + Option lastCompletedInstant1 = getInstant("0000003"); + Option newTxnOwnerInstant1 = getInstant("0000004"); + assertThrows(HoodieLockException.class, () -> { - transactionManager.beginTransaction(); + transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1); }); - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant); assertDoesNotThrow(() -> { - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant1); }); } @@ -94,11 +102,16 @@ public void testMultiWriterTransactions() { final AtomicBoolean writer1Completed = new AtomicBoolean(false); final AtomicBoolean writer2Completed = new AtomicBoolean(false); + Option lastCompletedInstant1 = getInstant("0000001"); + Option newTxnOwnerInstant1 = getInstant("0000002"); + Option lastCompletedInstant2 = getInstant("0000003"); + Option newTxnOwnerInstant2 = getInstant("0000004"); + // Let writer1 get the lock first, then wait for others // to join the sync up point. Thread writer1 = new Thread(() -> { assertDoesNotThrow(() -> { - transactionManager.beginTransaction(); + transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1); }); latch.countDown(); try { @@ -111,7 +124,7 @@ public void testMultiWriterTransactions() { // } assertDoesNotThrow(() -> { - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant1); }); writer1Completed.set(true); }); @@ -127,10 +140,10 @@ public void testMultiWriterTransactions() { // } assertDoesNotThrow(() -> { - transactionManager.beginTransaction(); + transactionManager.beginTransaction(newTxnOwnerInstant2, lastCompletedInstant2); }); assertDoesNotThrow(() -> { - transactionManager.endTransaction(); + transactionManager.endTransaction(newTxnOwnerInstant2); }); writer2Completed.set(true); }); @@ -152,6 +165,32 @@ public void testMultiWriterTransactions() { Assertions.assertTrue(writer2Completed.get()); } + @Test + public void testEndTransactionByDiffOwner() throws InterruptedException { + // 1. Begin and end by the same transaction owner + Option lastCompletedInstant = getInstant("0000001"); + Option newTxnOwnerInstant = getInstant("0000002"); + transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); + + CountDownLatch countDownLatch = new CountDownLatch(1); + // Another writer thread + Thread writer2 = new Thread(() -> { + Option newTxnOwnerInstant1 = getInstant("0000003"); + transactionManager.endTransaction(newTxnOwnerInstant1); + countDownLatch.countDown(); + }); + + writer2.start(); + countDownLatch.await(30, TimeUnit.SECONDS); + // should not have reset the state within transaction manager since the owner is different. + Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent()); + + transactionManager.endTransaction(newTxnOwnerInstant); + Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); + Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); + } + @Test public void testTransactionsWithInstantTime() { // 1. Begin and end by the same transaction owner @@ -164,14 +203,15 @@ public void testTransactionsWithInstantTime() { Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - // 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner + // 2. Begin transaction with a new txn owner, but end transaction with wrong owner lastCompletedInstant = getInstant("0000002"); newTxnOwnerInstant = getInstant("0000003"); transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant); - transactionManager.endTransaction(); + transactionManager.endTransaction(getInstant("0000004")); // Owner reset would not happen as the end txn was invoked with an incorrect current txn owner Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant); Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant); + transactionManager.endTransaction(newTxnOwnerInstant); // 3. But, we should be able to begin a new transaction for a new owner lastCompletedInstant = getInstant("0000003"); @@ -183,15 +223,7 @@ public void testTransactionsWithInstantTime() { Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - // 4. Transactions with no owners should also go through - transactionManager.beginTransaction(); - Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); - Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - transactionManager.endTransaction(); - Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent()); - Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent()); - - // 5. Transactions with new instants but with same timestamps should properly reset owners + // 4. Transactions with new instants but with same timestamps should properly reset owners transactionManager.beginTransaction(getInstant("0000005"), Option.empty()); Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent()); Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());