Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary);
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
Expand Down Expand Up @@ -1035,7 +1035,8 @@ public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
public void dropIndex(List<MetadataPartitionType> partitionTypes) {
HoodieTable table = createTable(config, hadoopConf);
String dropInstant = HoodieActiveTimeline.createNewInstantTime();
this.txnManager.beginTransaction();
HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
try {
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
table.getMetadataWriter(dropInstant).ifPresent(w -> {
Expand All @@ -1046,7 +1047,7 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
}
});
} finally {
this.txnManager.endTransaction();
this.txnManager.endTransaction(Option.of(ownerInstant));
}
}

Expand Down Expand Up @@ -1432,13 +1433,16 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
}

HoodieTable table;

this.txnManager.beginTransaction();
Option<HoodieInstant> ownerInstant = Option.empty();
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
this.txnManager.endTransaction();
this.txnManager.endTransaction(ownerInstant);
}

// Validate table properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
try {
if (acquireLock) {
txnManager.beginTransaction();
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
}
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
verifyLastMergeArchiveFilesIfNecessary(context);
Expand All @@ -179,7 +180,7 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
} finally {
close();
if (acquireLock) {
txnManager.endTransaction();
txnManager.endTransaction(Option.empty());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
}

public void beginTransaction() {
if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction starting without a transaction owner");
lockManager.lock();
LOG.info("Transaction started without a transaction owner");
}
}

public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
Expand All @@ -65,30 +57,25 @@ public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
}
}

public void endTransaction() {
if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction ending without a transaction owner");
lockManager.unlock();
LOG.info("Transaction ended without a transaction owner");
}
}

public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
lockManager.unlock();
LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
lockManager.unlock();
LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
}
}
}

private synchronized void reset(Option<HoodieInstant> callerInstant,
private synchronized boolean reset(Option<HoodieInstant> callerInstant,
Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
this.currentTxnOwnerInstant = newTxnOwnerInstant;
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
return true;
}
return false;
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,14 @@ private void updateTableConfigAndTimeline(HoodieInstant indexInstant,
HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
try {
// update the table config and timeline in a lock as there could be another indexer running
txnManager.beginTransaction();
txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
updateMetadataPartitionsTableConfig(table.getMetaClient(),
finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
table.getActiveTimeline().saveAsComplete(
new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()),
TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
} finally {
txnManager.endTransaction();
txnManager.endTransaction(Option.of(indexInstant));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,28 @@ private HoodieWriteConfig getWriteConfig() {

@Test
public void testSingleWriterTransaction() {
transactionManager.beginTransaction();
transactionManager.endTransaction();
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
transactionManager.endTransaction(newTxnOwnerInstant);
}

@Test
public void testSingleWriterNestedTransaction() {
transactionManager.beginTransaction();
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);

Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000003");
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000004");

assertThrows(HoodieLockException.class, () -> {
transactionManager.beginTransaction();
transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
});

transactionManager.endTransaction();
transactionManager.endTransaction(newTxnOwnerInstant);
assertDoesNotThrow(() -> {
transactionManager.endTransaction();
transactionManager.endTransaction(newTxnOwnerInstant1);
});
}

Expand All @@ -94,11 +102,16 @@ public void testMultiWriterTransactions() {
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);

Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000002");
Option<HoodieInstant> lastCompletedInstant2 = getInstant("0000003");
Option<HoodieInstant> newTxnOwnerInstant2 = getInstant("0000004");

// Let writer1 get the lock first, then wait for others
// to join the sync up point.
Thread writer1 = new Thread(() -> {
assertDoesNotThrow(() -> {
transactionManager.beginTransaction();
transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
});
latch.countDown();
try {
Expand All @@ -111,7 +124,7 @@ public void testMultiWriterTransactions() {
//
}
assertDoesNotThrow(() -> {
transactionManager.endTransaction();
transactionManager.endTransaction(newTxnOwnerInstant1);
});
writer1Completed.set(true);
});
Expand All @@ -127,10 +140,10 @@ public void testMultiWriterTransactions() {
//
}
assertDoesNotThrow(() -> {
transactionManager.beginTransaction();
transactionManager.beginTransaction(newTxnOwnerInstant2, lastCompletedInstant2);
});
assertDoesNotThrow(() -> {
transactionManager.endTransaction();
transactionManager.endTransaction(newTxnOwnerInstant2);
});
writer2Completed.set(true);
});
Expand All @@ -152,6 +165,32 @@ public void testMultiWriterTransactions() {
Assertions.assertTrue(writer2Completed.get());
}

@Test
public void testEndTransactionByDiffOwner() throws InterruptedException {
// 1. Begin and end by the same transaction owner
Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);

CountDownLatch countDownLatch = new CountDownLatch(1);
// Another writer thread
Thread writer2 = new Thread(() -> {
Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000003");
transactionManager.endTransaction(newTxnOwnerInstant1);
countDownLatch.countDown();
});

writer2.start();
countDownLatch.await(30, TimeUnit.SECONDS);
// should not have reset the state within transaction manager since the owner is different.
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iiuc, even in this scenario, without this patch, reset would be called and it would return w/o resetting but then unlock would also be called which we don't want. And this patch fixes this behavior. So, can we verify that not only state is not reset but also unlock is not called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe make reset method just visible for testing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we usually don't use VisibleForTesting annotation. lets sync up on this. we might have tests in InProcessLockProvider to assert for the condition you are talking about

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok found it: TestInProcessLockProvider#testLockReAcquisitionByDifferentThread. That's good enough.
Since we already use Mocito, maybe we can use its verify API to just verify calls.

Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent());

transactionManager.endTransaction(newTxnOwnerInstant);
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
}

@Test
public void testTransactionsWithInstantTime() {
// 1. Begin and end by the same transaction owner
Expand All @@ -164,14 +203,15 @@ public void testTransactionsWithInstantTime() {
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());

// 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner
// 2. Begin transaction with a new txn owner, but end transaction with wrong owner
lastCompletedInstant = getInstant("0000002");
newTxnOwnerInstant = getInstant("0000003");
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
transactionManager.endTransaction();
transactionManager.endTransaction(getInstant("0000004"));
// Owner reset would not happen as the end txn was invoked with an incorrect current txn owner
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
transactionManager.endTransaction(newTxnOwnerInstant);

// 3. But, we should be able to begin a new transaction for a new owner
lastCompletedInstant = getInstant("0000003");
Expand All @@ -183,15 +223,7 @@ public void testTransactionsWithInstantTime() {
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());

// 4. Transactions with no owners should also go through
transactionManager.beginTransaction();
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
transactionManager.endTransaction();
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());

// 5. Transactions with new instants but with same timestamps should properly reset owners
// 4. Transactions with new instants but with same timestamps should properly reset owners
transactionManager.beginTransaction(getInstant("0000005"), Option.empty());
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
Expand Down