Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ public void unlock() {
try {
if (LOCK.isWriteLockedByCurrentThread()) {
LOCK.writeLock().unlock();
LOG.info(getLogMessage(LockState.RELEASED));
} else {
LOG.warn("Cannot unlock because the current thread does not hold the lock.");
}
} catch (Exception e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_RELEASE), e);
}
LOG.info(getLogMessage(LockState.RELEASED));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
|| cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));

HoodieInstant inflightInstant = null;
try {
final HoodieInstant inflightInstant;
final HoodieTimer timer = new HoodieTimer();
timer.startTimer();
if (cleanInstant.isRequested()) {
Expand All @@ -218,7 +218,7 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
cleanStats
);
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
}
writeTableMetadata(metadata, inflightInstant.getTimestamp());
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
Expand All @@ -229,7 +229,7 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
throw new HoodieIOException("Failed to clean up after commit", e);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction(Option.empty());
this.txnManager.endTransaction(Option.of(inflightInstant));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetad

HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.convertRestoreMetadata(
instantTime, durationInMs, instantsRolledBack, instantToMetadata);
writeToMetadata(restoreMetadata);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
HoodieInstant restoreInflightInstant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime);
writeToMetadata(restoreMetadata, restoreInflightInstant);
table.getActiveTimeline().saveAsComplete(restoreInflightInstant, TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
// get all pending rollbacks instants after restore instant time and delete them.
// if not, rollbacks will be considered not completed and might hinder metadata table compaction.
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
Expand All @@ -151,12 +151,12 @@ private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetad
*
* @param restoreMetadata instance of {@link HoodieRestoreMetadata} to be applied to metadata.
*/
private void writeToMetadata(HoodieRestoreMetadata restoreMetadata) {
private void writeToMetadata(HoodieRestoreMetadata restoreMetadata, HoodieInstant restoreInflightInstant) {
try {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
this.txnManager.beginTransaction(Option.of(restoreInflightInstant), Option.empty());
writeTableMetadata(restoreMetadata);
} finally {
this.txnManager.endTransaction(Option.empty());
this.txnManager.endTransaction(Option.of(restoreInflightInstant));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
boolean enableLocking = (!skipLocking && !skipTimelinePublish);
try {
if (enableLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
}

// If publish the rollback to the timeline, we first write the rollback metadata
Expand All @@ -261,7 +261,7 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
} finally {
if (enableLocking) {
this.txnManager.endTransaction(Option.empty());
this.txnManager.endTransaction(Option.of(inflightInstant));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,47 @@ public void testTryLockReAcquisitionByDifferentThread() {
});
}

@Test
public void testTryUnLockByDifferentThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
final AtomicBoolean writer3Completed = new AtomicBoolean(false);

// Main test thread
Assertions.assertTrue(inProcessLockProvider.tryLock());

// Another writer thread
Thread writer2 = new Thread(() -> {
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
});
writer2.start();
try {
writer2.join();
} catch (InterruptedException e) {
//
}

// try acquiring by diff thread. should fail. since main thread still have acquired the lock. if previous unblock by a different thread would have succeeded, this lock
// acquisition would succeed.
Thread writer3 = new Thread(() -> {
Assertions.assertFalse(inProcessLockProvider.tryLock(50, TimeUnit.MILLISECONDS));
writer3Completed.set(true);
});
writer3.start();
try {
writer3.join();
} catch (InterruptedException e) {
//
}

Assertions.assertTrue(writer3Completed.get());
assertDoesNotThrow(() -> {
// unlock by main thread should succeed.
inProcessLockProvider.unlock();
});
}

@Test
public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
Expand Down