Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
} finally {
this.txnManager.endTransaction();
this.txnManager.endTransaction(Option.of(inflightInstant));
}
// do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
runTableServicesInline(table, metadata, extraMetadata);
Expand Down Expand Up @@ -1063,13 +1063,14 @@ public Option<String> scheduleTableService(Option<Map<String, String>> extraMeta
public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata,
TableServiceType tableServiceType) {
// A lock is required to guard against race conditions between an on-going writer and scheduling a table service.
final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
tableServiceType.getAction(), instantTime));
try {
this.txnManager.beginTransaction(Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
tableServiceType.getAction(), instantTime)), Option.empty());
this.txnManager.beginTransaction(inflightInstant, Option.empty());
LOG.info("Scheduling table service " + tableServiceType);
return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
} finally {
this.txnManager.endTransaction();
this.txnManager.endTransaction(inflightInstant);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,49 +35,64 @@
public class TransactionManager implements Serializable {

private static final Logger LOG = LogManager.getLogger(TransactionManager.class);

private final LockManager lockManager;
private Option<HoodieInstant> currentTxnOwnerInstant;
private Option<HoodieInstant> lastCompletedTxnOwnerInstant;
private boolean supportsOptimisticConcurrency;
private final boolean isOptimisticConcurrencyControlEnabled;
private Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();

public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
this.lockManager = new LockManager(config, fs);
this.supportsOptimisticConcurrency = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
}

public synchronized void beginTransaction() {
if (supportsOptimisticConcurrency) {
public void beginTransaction() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write actions seem to call this overload, while the table services seem to call the one below. Can we check whether these should be streamlined into a single API?

if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction starting without a transaction owner");
lockManager.lock();
LOG.info("Transaction started");
LOG.info("Transaction started without a transaction owner");
}
}

public synchronized void beginTransaction(Option<HoodieInstant> currentTxnOwnerInstant, Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (supportsOptimisticConcurrency) {
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
lockManager.setLatestCompletedWriteInstant(lastCompletedTxnOwnerInstant);
LOG.info("Latest completed transaction instant " + lastCompletedTxnOwnerInstant);
this.currentTxnOwnerInstant = currentTxnOwnerInstant;
LOG.info("Transaction starting with transaction owner " + currentTxnOwnerInstant);
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction starting for " + newTxnOwnerInstant
+ " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
lockManager.lock();
LOG.info("Transaction started");
reset(currentTxnOwnerInstant, newTxnOwnerInstant, lastCompletedTxnOwnerInstant);
LOG.info("Transaction started for " + newTxnOwnerInstant
+ " with latest completed transaction instant " + lastCompletedTxnOwnerInstant);
}
}

/**
 * Ends a transaction that has no transaction owner.
 *
 * <p>This is a no-op unless optimistic concurrency control is enabled. When it
 * is enabled, the lock is released through the lock manager, with a log line
 * emitted immediately before and after the release.
 */
public void endTransaction() {
  // Nothing was locked when optimistic concurrency control is disabled.
  if (!isOptimisticConcurrencyControlEnabled) {
    return;
  }
  LOG.info("Transaction ending without a transaction owner");
  lockManager.unlock();
  LOG.info("Transaction ended without a transaction owner");
}

public synchronized void endTransaction() {
if (supportsOptimisticConcurrency) {
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
lockManager.unlock();
LOG.info("Transaction ended");
this.lastCompletedTxnOwnerInstant = Option.empty();
lockManager.resetLatestCompletedWriteInstant();
LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
}
}

private synchronized void reset(Option<HoodieInstant> callerInstant,
Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant == callerInstant) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan We need to fix this as well. Usually callers pass in the same instant object, so this is not an issue. But for callers that pass in a different object representing the same instant, we also need to reset; the `==` check above compares object references, not instant values.

Copy link
Contributor Author

@manojpec manojpec Dec 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsivabalan Filed https://issues.apache.org/jira/browse/HUDI-3064 to track the flakiness of the failing CI test TestHoodieClientMultiWriter. The test is buggy, and FileSystemBasedLockProviderTestClass doesn't implement tryLock correctly. Even with the above TransactionManager fix, the test can still fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.currentTxnOwnerInstant = newTxnOwnerInstant;
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
}
}

public void close() {
if (supportsOptimisticConcurrency) {
if (isOptimisticConcurrencyControlEnabled) {
lockManager.close();
LOG.info("Transaction manager closed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void close() {
}

private String getLogMessage(LockState state) {
return StringUtils.join(String.valueOf(Thread.currentThread().getId()),
state.name(), " local process lock.");
return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ",
state.name(), " in-process lock.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
Expand All @@ -46,11 +43,8 @@ public class LockManager implements Serializable, AutoCloseable {
private final LockConfiguration lockConfiguration;
private final SerializableConfiguration hadoopConf;
private volatile LockProvider lockProvider;
// Holds the latest completed write instant to know which ones to check conflict against
private final AtomicReference<Option<HoodieInstant>> latestCompletedWriteInstant;

public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
this.latestCompletedWriteInstant = new AtomicReference<>(Option.empty());
this.writeConfig = writeConfig;
this.hadoopConf = new SerializableConfiguration(fs.getConf());
this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
Expand Down Expand Up @@ -100,22 +94,6 @@ public synchronized LockProvider getLockProvider() {
return lockProvider;
}

public void setLatestCompletedWriteInstant(Option<HoodieInstant> instant) {
this.latestCompletedWriteInstant.set(instant);
}

public void compareAndSetLatestCompletedWriteInstant(Option<HoodieInstant> expected, Option<HoodieInstant> newValue) {
this.latestCompletedWriteInstant.compareAndSet(expected, newValue);
}

public AtomicReference<Option<HoodieInstant>> getLatestCompletedWriteInstant() {
return latestCompletedWriteInstant;
}

public void resetLatestCompletedWriteInstant() {
this.latestCompletedWriteInstant.set(Option.empty());
}

@Override
public void close() {
closeQuietly();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
throw new HoodieIOException("Failed to clean up after commit", e);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction();
this.txnManager.endTransaction(Option.empty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try to set the current transaction owner at L209? The last completed transaction can be empty.

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,16 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
}

protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime));
this.txnManager.beginTransaction(inflightInstant,
lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try {
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
commit(extraMetadata, result);
} finally {
this.txnManager.endTransaction();
this.txnManager.endTransaction(inflightInstant);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void writeToMetadata(HoodieRestoreMetadata restoreMetadata) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
writeTableMetadata(restoreMetadata);
} finally {
this.txnManager.endTransaction();
this.txnManager.endTransaction(Option.empty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we see whether we can set at least the current transaction owner at L112?

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction();
this.txnManager.endTransaction(Option.empty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can fix L259 to send at least the current transaction owner. Maybe the last completed transaction can be empty.

}
}
}
Expand Down
Loading