Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,15 @@ public HoodieHeartbeatClient getHeartbeatClient() {
* @param metadata Current committing instant's metadata
* @param pendingInflightAndRequestedInstants Pending instants on the timeline
*
* @see {@link BaseHoodieWriteClient#preCommit}
* @see {@link BaseHoodieTableServiceClient#preCommit}
* @see BaseHoodieWriteClient#preCommit
* @see BaseHoodieTableServiceClient#preCommit
*/
protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata metadata, Set<String> pendingInflightAndRequestedInstants) {
Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
try {
// Because HoodieTable is newly initialized, no need to reload active timeline here
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants);
Option.of(metadata), config, pendingInflightAndRequestedInstants);
metrics.emitConflictResolutionSuccessful();
} catch (HoodieWriteConflictException e) {
metrics.emitConflictResolutionFailed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public Option<String> scheduleTableService(String instantTime, Option<Map<String
final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED,
tableServiceType.getAction(), instantTime));
try {
this.txnManager.beginTransaction(inflightInstant, Option.empty());
this.txnManager.beginTransaction(inflightInstant);
LOG.info("Scheduling table service " + tableServiceType);
return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
} finally {
Expand Down Expand Up @@ -809,7 +809,7 @@ protected void removeInflightFilesAlreadyRolledBack(List<String> instantsToRollb
.collect(Collectors.toList());
LOG.info("Rollback completed instants {}", rollbackCompletedInstants);
try {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
this.txnManager.beginTransaction(Option.empty());
rollbackCompletedInstants.forEach(instant -> {
// remove pending commit files.
HoodieInstant hoodieInstant = activeTimeline
Expand Down Expand Up @@ -922,17 +922,6 @@ public void rollbackFailedBootstrap() {
}
}

/**
* Some writers use SparkAllowUpdateStrategy and treat replacecommit plan as revocable plan.
* In those cases, their ConflictResolutionStrategy implementation should run conflict resolution
* even for clustering operations.
*
* @return boolean
*/
protected boolean isPreCommitRequired() {
return this.config.getWriteConflictResolutionStrategy().isPreCommitRequired();
}

private Option<String> delegateToTableServiceManager(TableServiceType tableServiceType, HoodieTable table) {
if (!config.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)) {
return Option.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
private final SupportsUpgradeDowngrade upgradeDowngradeHelper;
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;

protected transient Timer.Context writeTimer = null;

protected Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata = Option.empty();
protected Set<String> pendingInflightAndRequestedInstants = Collections.emptySet();

protected BaseHoodieTableServiceClient<O> tableServiceClient;

/**
Expand Down Expand Up @@ -227,8 +223,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStat
extraMetadata, operationType, config.getWriteSchema(), commitActionType);
HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, commitActionType, instantTime);
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
this.txnManager.beginTransaction(Option.of(inflightInstant),
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
this.txnManager.beginTransaction(Option.of(inflightInstant));
try {
preCommit(inflightInstant, metadata);
if (extraPreCommitFunc.isPresent()) {
Expand Down Expand Up @@ -507,11 +502,10 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
public void preWrite(String instantTime, WriteOperationType writeOperationType,
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = txnManager.isLockRequired()
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
this.pendingInflightAndRequestedInstants.remove(instantTime);
tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants);
if (txnManager.isLockRequired() && config.getWriteConflictResolutionStrategy().isPendingInstantsBeforeWriteRequired()) {
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstantsWithoutCurrent(metaClient, instantTime);
tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants);
}
tableServiceClient.startAsyncCleanerService(this);
tableServiceClient.startAsyncArchiveService(this);
}
Expand Down Expand Up @@ -970,7 +964,7 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
HoodieTable table = createTable(config, hadoopConf);
String dropInstant = HoodieActiveTimeline.createNewInstantTime();
HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
this.txnManager.beginTransaction(Option.of(ownerInstant));
try {
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table: " + config.getTableName());
table.getMetadataWriter(dropInstant).ifPresent(w -> {
Expand Down Expand Up @@ -1181,7 +1175,7 @@ protected void doInitTable(WriteOperationType operationType, HoodieTableMetaClie
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
this.txnManager.beginTransaction(ownerInstant);
try {
tryUpgrade(metaClient, instantTime);
initMetadataTable(instantTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
try {
if (acquireLock) {
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
txnManager.beginTransaction(Option.empty());
}
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
verifyLastMergeArchiveFilesIfNecessary(context);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ private void init(HoodieInstant instant) {
if (instant.isCompleted()) {
this.mutatedPartitionAndFileIds = getPartitionAndFileIdWithoutSuffixFromSpecificRecord(
this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats());
// for replacecommit, FG before and after replace need to be considered
Map<String, List<String>> partitionToReplaceFileIds = this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToReplaceFileIds();
this.mutatedPartitionAndFileIds.addAll(CommitUtils.flattenPartitionToReplaceFileIds(partitionToReplaceFileIds));
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType());
Expand Down Expand Up @@ -163,6 +164,7 @@ private void init(HoodieInstant instant) {
case LOG_COMPACTION_ACTION:
this.mutatedPartitionAndFileIds = CommitUtils.getPartitionAndFileIdWithoutSuffix(this.metadataWrapper.getCommitMetadata().getPartitionToWriteStats());
this.operationType = this.metadataWrapper.getCommitMetadata().getOperationType();
// for replacecommit, FG before and after replace need to be considered
if (this.operationType.equals(WriteOperationType.CLUSTER) || WriteOperationType.isOverwrite(this.operationType)) {
HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) this.metadataWrapper.getCommitMetadata();
mutatedPartitionAndFileIds.addAll(CommitUtils.flattenPartitionToReplaceFileIds(replaceCommitMetadata.getPartitionToReplaceFileIds()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;

import java.util.Set;
import java.util.stream.Stream;

/**
Expand All @@ -40,7 +40,9 @@ public interface ConflictResolutionStrategy {
* Stream of instants to check conflicts against.
* @return
*/
Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient metaClient, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient metaClient, HoodieInstant currentInstant,
Option<Set<String>> pendingInstantsBeforeWritten);

/**
* Implementations of this method will determine whether a conflict exists between 2 commits.
Expand All @@ -58,14 +60,17 @@ public interface ConflictResolutionStrategy {
* @return
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) throws HoodieWriteConflictException;
void resolveConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) throws HoodieWriteConflictException;

/**
* Whether to record the pending instants before write.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
boolean isPendingInstantsBeforeWriteRequired();

/**
* Write clients uses their preCommit API to run conflict resolution.
* This method determines whether to execute preCommit for table services like clustering.
* @return boolean
* Whether the write operation needs to do conflict detection.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
boolean isPreCommitRequired();
boolean isConflictResolveRequired(WriteOperationType operationType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void beginTransaction(String newTxnOwnerInstantTime) {
LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath);
lockManager.lock();

reset(currentTxnOwnerInstant, Option.of(getInstant(newTxnOwnerInstantTime)), Option.empty());
reset(currentTxnOwnerInstant, Option.of(getInstant(newTxnOwnerInstantTime)));
LOG.info("Transaction started for " + newTxnOwnerInstantTime + " and " + filePath);
}
}
Expand All @@ -60,7 +60,7 @@ public void endTransaction(String currentTxnOwnerInstantTime) {
if (isLockRequired) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime
+ " for " + filePath);
if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) {
if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty())) {
lockManager.unlock();
LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstantTime
+ " for " + filePath);
Expand Down
Loading