Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieEarlyConflictException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers;
import org.apache.hudi.table.marker.WriteMarkers;

import java.io.IOException;
import java.util.ConcurrentModificationException;

/**
 * Early conflict detection strategy for writers that use timeline server based markers.
 *
 * <p>The marker conflict check is delegated to the supplied {@link WriteMarkers}; for timeline server based
 * markers this goes through the timeline server /v1/hoodie/marker/check-marker-conflict API.
 */
public class AsyncTimelineMarkerConflictResolutionStrategy extends SimpleConcurrentFileWritesConflictResolutionStrategy {

  /**
   * Checks whether there is a marker conflict for the given partition path and file id.
   *
   * @param writeMarkers  markers used for the check; expected to be {@link TimelineServerBasedWriteMarkers}
   * @param config        write config handed through to the marker check
   * @param fs            file system; not used by this strategy
   * @param partitionPath partition path handed through to the marker check
   * @param fileId        file id handed through to the marker check
   * @return the result of {@code writeMarkers.hasMarkerConflict(config, partitionPath, fileId)}
   * @throws HoodieIOException if the marker check fails with an {@link IOException}
   */
  @Override
  public boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
    try {
      // Sanity check only: this is enforced solely when JVM assertions are enabled (-ea).
      assert writeMarkers instanceof TimelineServerBasedWriteMarkers;
      return writeMarkers.hasMarkerConflict(config, partitionPath, fileId);
    } catch (IOException e) {
      // Chain the original IOException so the root cause is not lost.
      throw new HoodieIOException("IOException occurs during checking markers conflict", e);
    }
  }

  /**
   * Marker conflicts are never resolved by this strategy: it always fails fast.
   *
   * @return never returns normally
   * @throws HoodieEarlyConflictException always, wrapping a {@link ConcurrentModificationException}
   */
  @Override
  public Option<HoodieCommitMetadata> resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName) {
    throw new HoodieEarlyConflictException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes"));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

package org.apache.hudi.client.transaction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;

import java.util.stream.Stream;

Expand All @@ -43,13 +46,25 @@ public interface ConflictResolutionStrategy {
Stream<HoodieInstant> getCandidateInstants(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option<HoodieInstant> lastSuccessfulInstant);

/**
* Implementations of this method will determine whether a conflict exists between 2 commits.
* Implementations of this method will determine whether a marker conflict exists between multi-writers.
* @param writeMarkers
* @param config
* @param fs
* @param partitionPath
* @param fileId
* @return
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId);

/**
* Implementations of this method will determine whether a commit conflict exists between 2 commits.
* @param thisOperation
* @param otherOperation
* @return
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation);
boolean hasCommitConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation);

/**
* Implementations of this method will determine how to resolve a conflict between 2 commits.
Expand All @@ -58,7 +73,13 @@ public interface ConflictResolutionStrategy {
* @return
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
Option<HoodieCommitMetadata> resolveCommitConflict(HoodieTable table,
ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) throws HoodieWriteConflictException;

/**
* Implementations of this method determine how to resolve a marker/inflight conflict between multiple writers.
* @return
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
Option<HoodieCommitMetadata> resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction;


/**
 * Early conflict detection strategy for direct marker writers that is additionally tagged with
 * {@link TransactionConflictResolutionStrategy}.
 *
 * <p>The class declares no members of its own: all behavior is inherited from
 * {@link SimpleDirectMarkerConflictResolutionStrategy}, which uses file system APIs such as list and exists
 * directly to check whether there is any marker file conflict.
 */
public class DirectMarkerWithTransactionConflictResolutionStrategy extends SimpleDirectMarkerConflictResolutionStrategy
implements TransactionConflictResolutionStrategy {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

package org.apache.hudi.client.transaction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieEarlyConflictException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -70,7 +74,12 @@ public Stream<HoodieInstant> getCandidateInstants(HoodieActiveTimeline activeTim
}

@Override
public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
public boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String dataFileName) {
// This strategy performs no marker based check: every argument is ignored and "no conflict" is reported.
// NOTE(review): the last parameter is named dataFileName here but fileId in the interface declaration - confirm which is intended.
return false;
}

@Override
public boolean hasCommitConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
// TODO : UUID's can clash even for insert/insert, handle that case.
Set<String> fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds();
Set<String> fileIdsSetForSecondInstant = otherOperation.getMutatedFileIds();
Expand All @@ -85,7 +94,7 @@ public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperatio
}

@Override
public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
public Option<HoodieCommitMetadata> resolveCommitConflict(HoodieTable table,
ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
// A completed COMPACTION action eventually shows up as a COMMIT action on the timeline.
// We need to ensure we handle this during conflict resolution and not treat the commit from a
Expand All @@ -102,4 +111,9 @@ public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
throw new HoodieWriteConflictException(new ConcurrentModificationException("Cannot resolve conflicts for overlapping writes"));
}

@Override
public Option<HoodieCommitMetadata> resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName) {
  // A marker conflict is never resolved here: the method always fails with an early conflict exception.
  ConcurrentModificationException overlappingWrites =
      new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes");
  throw new HoodieEarlyConflictException(overlappingWrites);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieEarlyConflictException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ConcurrentModificationException;

/**
 * Early conflict detection strategy for direct marker writers.
 *
 * <p>The check is delegated to the supplied {@link WriteMarkers}, which uses file system APIs such as list and
 * exists directly to find out whether there is any marker file conflict.
 */
public class SimpleDirectMarkerConflictResolutionStrategy extends SimpleConcurrentFileWritesConflictResolutionStrategy {
  private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerConflictResolutionStrategy.class);

  /**
   * Asks the given markers whether there is a conflict for the partition path and file id.
   *
   * @param fs file system; not used by this strategy
   * @return the result of {@code writeMarkers.hasMarkerConflict(config, partitionPath, fileId)}
   * @throws HoodieIOException if the marker check fails with an {@link IOException}; a warning is logged first
   */
  @Override
  public boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
    boolean conflictFound;
    try {
      conflictFound = writeMarkers.hasMarkerConflict(config, partitionPath, fileId);
    } catch (IOException ioe) {
      String failureMessage = "Exception occurs during create marker file in eager conflict detection mode.";
      LOG.warn(failureMessage);
      throw new HoodieIOException(failureMessage, ioe);
    }
    return conflictFound;
  }

  /**
   * Marker conflicts are never resolved by this strategy: it always fails fast.
   *
   * @return never returns normally
   * @throws HoodieEarlyConflictException always, wrapping a {@link ConcurrentModificationException}
   */
  @Override
  public Option<HoodieCommitMetadata> resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName) {
    ConcurrentModificationException overlappingWrites =
        new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes");
    throw new HoodieEarlyConflictException(overlappingWrites);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction;

/**
 * Marker interface for early conflict detection strategies that work inside a transaction.
 *
 * <p>A strategy implementing this interface is expected to check for marker conflicts, try to resolve them
 * and create the marker within a transaction. The interface itself declares no methods.
 */
public interface TransactionConflictResolutionStrategy {
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
}

/**
 * Creates a transaction manager whose {@link LockManager} is built for one partition path and file id.
 *
 * @param config        write config; its write concurrency mode decides whether optimistic concurrency control is enabled
 * @param fs            file system handed to the lock manager
 * @param partitionPath partition path handed to the lock manager
 * @param fileId        file id handed to the lock manager
 */
public TransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
this.lockManager = new LockManager(config, fs, partitionPath, fileId);
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
}

public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
Expand All @@ -57,6 +62,14 @@ public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
}
}

/**
 * Starts a transaction for the given partition path and file id by taking the lock of the lock manager.
 * Does nothing when optimistic concurrency control is not enabled.
 *
 * @param partitionPath partition path, used for logging only
 * @param fileId        file id, used for logging only
 */
public void beginTransaction(String partitionPath, String fileId) {
  if (!isOptimisticConcurrencyControlEnabled) {
    return;
  }
  String lockTarget = partitionPath + "/" + fileId;
  LOG.info("Transaction starting for " + lockTarget);
  lockManager.lock();
  LOG.info("Transaction started for " + lockTarget);
}

public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
Expand All @@ -67,6 +80,14 @@ public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
}
}

/**
 * Ends the transaction for the given file path by releasing the lock of the lock manager.
 * Does nothing when optimistic concurrency control is not enabled.
 *
 * @param filePath path of the file the transaction was held for, used for logging only
 */
public void endTransaction(String filePath) {
  if (!isOptimisticConcurrencyControlEnabled) {
    return;
  }
  LOG.info("Transaction ending with transaction for " + filePath);
  lockManager.unlock();
  LOG.info("Transaction ended with transaction for " + filePath);
}

private synchronized boolean reset(Option<HoodieInstant> callerInstant,
Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieLockConfig;
Expand Down Expand Up @@ -57,6 +58,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
}

/**
 * Creates a lock manager whose lock key is {@code partitionPath + "/" + fileId}, so that a lock can be taken
 * at partition path + file id level for different write handles.
 *
 * @param writeConfig   write config the lock properties are derived from
 * @param fs            file system whose configuration is kept as the hadoop configuration
 * @param partitionPath first part of the lock key
 * @param fileId        second part of the lock key
 */
public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
  this.writeConfig = writeConfig;
  this.hadoopConf = new SerializableConfiguration(fs.getConf());

  String lockKey = partitionPath + "/" + fileId;
  this.lockConfiguration = new LockConfiguration(refreshLockConfig(writeConfig, lockKey));

  // Fall back to the defaults declared in HoodieLockConfig when the keys are not configured.
  int defaultNumRetries = Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue());
  long defaultRetryWaitMs = Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue());
  this.maxRetries = this.lockConfiguration.getConfig()
      .getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, defaultNumRetries);
  this.maxWaitTimeInMs = this.lockConfiguration.getConfig()
      .getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, defaultRetryWaitMs);
}

/**
 * Rebuilds the lock related properties for the given lock key.
 * Only the ZK lock key property is overridden, so only ZK related locks are supported for now.
 *
 * @param writeConfig write config whose properties are the starting point
 * @param key         value to set for {@code LockConfiguration.ZK_LOCK_KEY_PROP_KEY}
 * @return a new {@link TypedProperties} built from the write config properties with the lock key set
 */
private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {
  TypedProperties lockProps = new TypedProperties(writeConfig.getProps());
  lockProps.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);
  return lockProps;
}

public void lock() {
if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
LockProvider lockProvider = getLockProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
instantStream.forEach(instant -> {
try {
ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient());
if (resolutionStrategy.hasConflict(thisOperation, otherOperation)) {
if (resolutionStrategy.hasCommitConflict(thisOperation, otherOperation)) {
LOG.info("Conflict encountered between current instant = " + thisOperation + " and instant = "
+ otherOperation + ", attempting to resolve it...");
resolutionStrategy.resolveConflict(table, thisOperation, otherOperation);
resolutionStrategy.resolveCommitConflict(table, thisOperation, otherOperation);
}
} catch (IOException io) {
throw new HoodieWriteConflictException("Unable to resolve conflict, if present", io);
Expand Down
Loading