diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/AsyncTimelineMarkerConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/AsyncTimelineMarkerConflictResolutionStrategy.java new file mode 100644 index 0000000000000..3ef6000b1292f --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/AsyncTimelineMarkerConflictResolutionStrategy.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieEarlyConflictException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers; +import org.apache.hudi.table.marker.WriteMarkers; + +import java.io.IOException; +import java.util.ConcurrentModificationException; + +/** + * This strategy is used for timeline server based marker writers, trying to do early conflict detection. 
+ * It will call timeline server /v1/hoodie/marker/check-marker-conflict API to check if there is any marker conflict. + */ +public class AsyncTimelineMarkerConflictResolutionStrategy extends SimpleConcurrentFileWritesConflictResolutionStrategy { + + @Override + public boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) { + try { + assert writeMarkers instanceof TimelineServerBasedWriteMarkers; + return writeMarkers.hasMarkerConflict(config, partitionPath, fileId); + } catch (IOException e) { + throw new HoodieIOException("IOException occurs during checking markers conflict", e); // keep the cause so the underlying I/O failure is not lost + } + } + + @Override + public Option resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName) { + throw new HoodieEarlyConflictException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes")); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java index d1e988adb59ae..3d061f448072b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConflictResolutionStrategy.java @@ -18,14 +18,17 @@ package org.apache.hudi.client.transaction; +import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import
org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.WriteMarkers; import java.util.stream.Stream; @@ -43,13 +46,25 @@ public interface ConflictResolutionStrategy { Stream getCandidateInstants(HoodieActiveTimeline activeTimeline, HoodieInstant currentInstant, Option lastSuccessfulInstant); /** - * Implementations of this method will determine whether a conflict exists between 2 commits. + * Implementations of this method will determine whether a marker conflict exists between multi-writers. + * @param writeMarkers + * @param config + * @param fs + * @param partitionPath + * @param fileId + * @return + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId); + + /** + * Implementations of this method will determine whether a commit conflict exists between 2 commits. * @param thisOperation * @param otherOperation * @return */ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation); + boolean hasCommitConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation); /** * Implementations of this method will determine how to resolve a conflict between 2 commits. @@ -58,7 +73,13 @@ public interface ConflictResolutionStrategy { * @return */ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - Option resolveConflict(HoodieTable table, + Option resolveCommitConflict(HoodieTable table, ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) throws HoodieWriteConflictException; + /** + * Implementations of this method will determine how to resolve a marker/inflight conflict between multi writers. 
+ * @return + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + Option resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerWithTransactionConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerWithTransactionConflictResolutionStrategy.java new file mode 100644 index 0000000000000..f52664f77b512 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerWithTransactionConflictResolutionStrategy.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + + +/** + * This strategy is used for direct marker writers, trying to do early conflict detection with Transaction. + * It will use fileSystem api like list and exist directly to check if there is any marker file conflict. 
+ */ +public class DirectMarkerWithTransactionConflictResolutionStrategy extends SimpleDirectMarkerConflictResolutionStrategy + implements TransactionConflictResolutionStrategy { +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java index 938a40684a092..613b0f6334c65 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -18,6 +18,7 @@ package org.apache.hudi.client.transaction; +import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -25,8 +26,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieEarlyConflictException; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.WriteMarkers; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -70,7 +74,12 @@ public Stream getCandidateInstants(HoodieActiveTimeline activeTim } @Override - public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { + public boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String dataFileName) { + return false; + } + + @Override + public boolean 
hasCommitConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { // TODO : UUID's can clash even for insert/insert, handle that case. Set fileIdsSetForFirstInstant = thisOperation.getMutatedFileIds(); Set fileIdsSetForSecondInstant = otherOperation.getMutatedFileIds(); @@ -85,7 +94,7 @@ public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperatio } @Override - public Option resolveConflict(HoodieTable table, + public Option resolveCommitConflict(HoodieTable table, ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) { // A completed COMPACTION action eventually shows up as a COMMIT action on the timeline. // We need to ensure we handle this during conflict resolution and not treat the commit from a @@ -102,4 +111,9 @@ public Option resolveConflict(HoodieTable table, throw new HoodieWriteConflictException(new ConcurrentModificationException("Cannot resolve conflicts for overlapping writes")); } + @Override + public Option resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName) { + throw new HoodieEarlyConflictException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes")); + } + } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleDirectMarkerConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleDirectMarkerConflictResolutionStrategy.java new file mode 100644 index 0000000000000..4f38e9c36b01f --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleDirectMarkerConflictResolutionStrategy.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieEarlyConflictException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.marker.WriteMarkers; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ConcurrentModificationException; + +/** + * This strategy is used for direct marker writers, trying to do early conflict detection. + * It will use fileSystem api like list and exist directly to check if there is any marker file conflict. 
+ */ +public class SimpleDirectMarkerConflictResolutionStrategy extends SimpleConcurrentFileWritesConflictResolutionStrategy { + private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerConflictResolutionStrategy.class); + + @Override + public boolean hasMarkerConflict(WriteMarkers writeMarkers, HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) { + try { + return writeMarkers.hasMarkerConflict(config, partitionPath, fileId); + } catch (IOException e) { + LOG.warn("Exception occurs during create marker file in eager conflict detection mode."); + throw new HoodieIOException("Exception occurs during create marker file in eager conflict detection mode.", e); + } + } + + @Override + public Option resolveMarkerConflict(WriteMarkers writeMarkers, String partitionPath, String dataFileName) { + throw new HoodieEarlyConflictException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes")); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionConflictResolutionStrategy.java new file mode 100644 index 0000000000000..7a10bcc102e9d --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionConflictResolutionStrategy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction; + +/** + * The conflict resolution strategy for early conflict detection. + * Any strategy that implements this interface will check for marker conflicts, + * try to resolve them and create the marker inside a transaction. + */ +public interface TransactionConflictResolutionStrategy { +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index aef1fee5e0794..d43d4cf419eca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -45,6 +45,11 @@ public TransactionManager(HoodieWriteConfig config, FileSystem fs) { this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); } + public TransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) { + this.lockManager = new LockManager(config, fs, partitionPath, fileId); + this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); + } + public void beginTransaction(Option newTxnOwnerInstant, Option lastCompletedTxnOwnerInstant) { if (isOptimisticConcurrencyControlEnabled) { @@ -57,6 +62,14 @@ public void beginTransaction(Option newTxnOwnerInstant, } } + public void
beginTransaction(String partitionPath, String fileId) { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction starting for " + partitionPath + "/" + fileId); + lockManager.lock(); + LOG.info("Transaction started for " + partitionPath + "/" + fileId); + } + } + public void endTransaction(Option currentTxnOwnerInstant) { if (isOptimisticConcurrencyControlEnabled) { LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant); @@ -67,6 +80,14 @@ public void endTransaction(Option currentTxnOwnerInstant) { } } + public void endTransaction(String filePath) { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction ending with transaction for " + filePath); + lockManager.unlock(); + LOG.info("Transaction ended with transaction for " + filePath); + } + } + private synchronized boolean reset(Option callerInstant, Option newTxnOwnerInstant, Option lastCompletedTxnOwnerInstant) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index ca15c4fdc2a13..4a20d08bd49a7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.lock.LockProvider; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieLockConfig; @@ -57,6 +58,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) { Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())); } + /** + * Try to have a 
lock at partitionPath + fileID level for different write handler. + * @param writeConfig + * @param fs + * @param partitionPath + * @param fileId + */ + public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) { + this.writeConfig = writeConfig; + this.hadoopConf = new SerializableConfiguration(fs.getConf()); + TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId); + this.lockConfiguration = new LockConfiguration(props); + maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, + Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())); + maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, + Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())); + } + + /** + * rebuild lock related configs, only support ZK related lock for now. + */ + private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) { + TypedProperties props = new TypedProperties(writeConfig.getProps()); + props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key); + return props; + } + public void lock() { if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { LockProvider lockProvider = getLockProvider(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index ec15effdc4663..cc51afcf0bf0b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -79,10 +79,10 @@ public static Option resolveWriteConflictIfAny( instantStream.forEach(instant -> { try { ConcurrentOperation otherOperation = new 
ConcurrentOperation(instant, table.getMetaClient()); - if (resolutionStrategy.hasConflict(thisOperation, otherOperation)) { + if (resolutionStrategy.hasCommitConflict(thisOperation, otherOperation)) { LOG.info("Conflict encountered between current instant = " + thisOperation + " and instant = " + otherOperation + ", attempting to resolve it..."); - resolutionStrategy.resolveConflict(table, thisOperation, otherOperation); + resolutionStrategy.resolveCommitConflict(table, thisOperation, otherOperation); } } catch (IOException io) { throw new HoodieWriteConflictException("Unable to resolve conflict, if present", io); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index 9ea28fbbd42e7..ed13bc5447540 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -17,7 +17,6 @@ package org.apache.hudi.config; -import org.apache.hudi.client.transaction.ConflictResolutionStrategy; import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy; import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; import org.apache.hudi.common.config.ConfigClassProperty; @@ -182,6 +181,25 @@ public class HoodieLockConfig extends HoodieConfig { .withDocumentation("Lock provider class name, this should be subclass of " + "org.apache.hudi.client.transaction.ConflictResolutionStrategy"); + public static final ConfigProperty EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty + .key(LOCK_PREFIX + "early.conflict.detection.enable") + .defaultValue(false) + .sinceVersion("0.12.0") + .withDocumentation("Enable early conflict detection based on markers. 
It will try to detect writing conflict before create markers and fast fail" + + " which will release cluster resources as soon as possible."); + + public static final ConfigProperty MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty + .key(LOCK_PREFIX + "early.conflict.async.checker.batch.interval") + .defaultValue(30000L) + .sinceVersion("0.12.0") + .withDocumentation("Used for timeline based marker AsyncTimelineMarkerConflictResolutionStrategy. The time to delay first async marker conflict checking."); + + public static final ConfigProperty MARKER_CONFLICT_CHECKER_PERIOD = ConfigProperty + .key(LOCK_PREFIX + "early.conflict.async.checker.period") + .defaultValue(30000L) + .sinceVersion("0.12.0") + .withDocumentation("Used for timeline based marker AsyncTimelineMarkerConflictResolutionStrategy. The period between each marker conflict checking."); + /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead */ @Deprecated public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key(); @@ -299,8 +317,23 @@ public HoodieLockConfig.Builder withLockWaitTimeInMillis(Long waitTimeInMillis) return this; } - public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutionStrategy conflictResolutionStrategy) { - lockConfig.setValue(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME, conflictResolutionStrategy.getClass().getName()); + public HoodieLockConfig.Builder withConflictResolutionStrategy(String conflictResolutionStrategy) { + lockConfig.setValue(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME, conflictResolutionStrategy); + return this; + } + + public HoodieLockConfig.Builder withEarlyConflictDetectionEnable(boolean enable) { + lockConfig.setValue(EARLY_CONFLICT_DETECTION_ENABLE, String.valueOf(enable)); + return this; + } + + public HoodieLockConfig.Builder withMarkerConflictCheckerBatchInterval(long interval) { + 
lockConfig.setValue(MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, String.valueOf(interval)); + return this; + } + + public HoodieLockConfig.Builder withMarkerConflictCheckerPeriod(long period) { + lockConfig.setValue(MARKER_CONFLICT_CHECKER_PERIOD, String.valueOf(period)); return this; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4d07097c07c88..6939f35e07f23 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2027,6 +2027,14 @@ public ConflictResolutionStrategy getWriteConflictResolutionStrategy() { return ReflectionUtils.loadClass(getString(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME)); } + public String getMarkerConflictCheckerBatchInterval() { + return String.valueOf(getLong(HoodieLockConfig.MARKER_CONFLICT_CHECKER_BATCH_INTERVAL)); + } + + public String getMarkerConflictCheckerPeriod() { + return String.valueOf(getLong(HoodieLockConfig.MARKER_CONFLICT_CHECKER_PERIOD)); + } + public Long getLockAcquireWaitTimeoutInMs() { return getLong(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS); } @@ -2035,6 +2043,10 @@ public WriteConcurrencyMode getWriteConcurrencyMode() { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } + public boolean isEarlyConflictDetectionEnable() { + return getBoolean(HoodieLockConfig.EARLY_CONFLICT_DETECTION_ENABLE); + } + /** * Are any table services configured to run inline for both scheduling and execution? 
* diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 426e20f83b034..9cf032e3676b5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -180,7 +180,19 @@ private void init(HoodieRecord record) { // base file to denote some log appends happened on a slice. writeToken will still fence concurrent // writers. // https://issues.apache.org/jira/browse/HUDI-1517 - createMarkerFile(partitionPath, FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension())); + Option finalFileSlice = fileSlice; + createMarkerFile(partitionPath, FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()), (table) -> { + table.getMetaClient().reloadActiveTimeline(); + table.getHoodieView().sync(); + SliceView currentSliceView = table.getSliceView(); + Option currentFileSlice = currentSliceView.getLatestFileSlice(partitionPath, fileId); + if (currentFileSlice.isPresent()) { + return !currentFileSlice.get().equals(finalFileSlice.get()); + } else { + FileSlice current = new FileSlice(partitionPath, baseInstantTime, this.fileId); + return !current.equals(finalFileSlice.get()); + } + }); this.writer = createLogWriter(fileSlice, baseInstantTime); } catch (Exception e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 82c6de576149f..1be68d529a976 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -198,7 +198,16 @@ private void init(String fileId, String 
partitionPath, HoodieBaseFile baseFileTo // Create Marker file, // uses name of `newFilePath` instead of `newFileName` // in case the sub-class may roll over the file handle name. - createMarkerFile(partitionPath, newFilePath.getName()); + createMarkerFile(partitionPath, newFilePath.getName(), (table) -> { + table.getMetaClient().reloadActiveTimeline(); + table.getHoodieView().sync(); + HoodieBaseFile currentBaseFileToMerge = table.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get(); + if (!currentBaseFileToMerge.equals(baseFileToMerge)) { + LOG.warn("Base file to merge is changed because of multi-writer"); + return true; + } + return false; + }); // Create the writer for writing the new version file fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index b7fdbecfd56d1..a30cc03cffddf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -20,6 +20,9 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.transaction.ConflictResolutionStrategy; +import org.apache.hudi.client.transaction.TransactionConflictResolutionStrategy; +import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; @@ -33,6 +36,7 @@ import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.marker.WriteMarkers; import org.apache.hudi.table.marker.WriteMarkersFactory; import 
org.apache.avro.Schema; @@ -47,6 +51,7 @@ import java.util.Collections; import java.util.List; import java.util.HashMap; +import java.util.function.Function; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; @@ -177,14 +182,62 @@ protected Path makeNewFilePath(String partitionPath, String fileName) { return new Path(config.getBasePath(), relativePath); } + protected void createMarkerFile(String partitionPath, String dataFileName) { + createMarkerFile(partitionPath, dataFileName, (table) -> false); + } + /** * Creates an empty marker file corresponding to storage writer path. * * @param partitionPath Partition path */ - protected void createMarkerFile(String partitionPath, String dataFileName) { - WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime) - .create(partitionPath, dataFileName, getIOType()); + protected void createMarkerFile(String partitionPath, String dataFileName, + Function conflictChecker) { + WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime); + + // do early conflict detection before create markers. 
+ if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() + && config.isEarlyConflictDetectionEnable()) { + + ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); + if (resolutionStrategy instanceof TransactionConflictResolutionStrategy) { + createMarkerFileWithTransaction(resolutionStrategy, writeMarkers, partitionPath, dataFileName, conflictChecker); + } else { + createMarkerFileWithEarlyConflictDetection(resolutionStrategy, writeMarkers, partitionPath, dataFileName, conflictChecker); + } + + } else { + // create marker directly + writeMarkers.create(partitionPath, dataFileName, getIOType()); + } + } + + private Option createMarkerFileWithEarlyConflictDetection(ConflictResolutionStrategy resolutionStrategy, WriteMarkers writeMarkers, + String partitionPath, String dataFileName, Function conflictChecker) { + if (resolutionStrategy.hasMarkerConflict(writeMarkers, config, fs, partitionPath, fileId) + || conflictChecker.apply(hoodieTable)) { + resolutionStrategy.resolveMarkerConflict(writeMarkers, partitionPath, dataFileName); // the strategy API takes the data file name here, not the file id + } + return writeMarkers.create(partitionPath, dataFileName, getIOType()); + } + + private Option createMarkerFileWithTransaction(ConflictResolutionStrategy resolutionStrategy, + WriteMarkers writeMarkers, String partitionPath, + String dataFileName, Function conflictChecker) { + TransactionManager txnManager = new TransactionManager(config, fs, partitionPath, fileId); + try { + // Need to do transaction before create marker file when using early conflict detection + txnManager.beginTransaction(partitionPath, fileId); + return createMarkerFileWithEarlyConflictDetection(resolutionStrategy, writeMarkers, partitionPath, dataFileName, conflictChecker); + + } catch (Exception e) { + LOG.warn("Exception occurs during create marker file in early conflict detection mode."); + throw e; + } finally { + // End transaction after created marker file.
+ txnManager.endTransaction(partitionPath + "/" + fileId); + txnManager.close(); + } } public Schema getWriterSchemaWithMetaFields() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index e813382079634..cf0ba6a68ec20 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -26,6 +26,8 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; @@ -40,9 +42,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Stream; /** * Marker operations of directly accessing the file system to create and delete @@ -140,6 +144,56 @@ public Set allMarkerFilePaths() throws IOException { return markerFiles; } + /** + * We need to do list operation here. + * In order to reduce the list pressure as much as possible, first we build path prefix in advance: '$base_path/.temp/instant_time/partition_path', + * and only list these specific partition_paths we need instead of list all the '$base_path/.temp/' + * @param config + * @param partitionPath + * @param fileId + * @return true if current fileID is already existed under .temp/instant_time/partition_path/.. 
+ * @throws IOException + */ + @Override + public boolean hasMarkerConflict(HoodieWriteConfig config, String partitionPath, String fileId) throws IOException { + String tempFolderPath = getTempFolderPath(); + long res = Arrays.stream(fs.listStatus(new Path(tempFolderPath))) + .parallel() + .map(FileStatus::getPath) + .filter(markerPath -> { + return !markerPath.getName().equalsIgnoreCase(instantTime); + }) + .flatMap(currentMarkerDirPath -> { + try { + Path markerPartitionPath; + if (StringUtils.isNullOrEmpty(partitionPath)) { + markerPartitionPath = currentMarkerDirPath; + } else { + markerPartitionPath = new Path(currentMarkerDirPath, partitionPath); + } + + if (!StringUtils.isNullOrEmpty(partitionPath) && !fs.exists(markerPartitionPath)) { + return Stream.empty(); + } else { + return Arrays.stream(fs.listStatus(markerPartitionPath)).parallel() + .filter((path) -> path.toString().contains(fileId)); + } + } catch (IOException e) { + throw new HoodieIOException("IOException occurs during checking marker file conflict"); + } + }).count(); + + if (res != 0L) { + LOG.warn("Detected conflict marker files: " + partitionPath + "/" + fileId + " for " + instantTime); + return true; + } + return false; + } + + private String getTempFolderPath() { + return basePath + Path.SEPARATOR + HoodieTableMetaClient.TEMPFOLDER_NAME; + } + /** * Creates a marker file based on the full marker name excluding the base path and instant. 
* diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java index 4879e0bc60c94..b89593dd1a399 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.table.HoodieTable; @@ -44,10 +45,15 @@ import java.util.stream.Collectors; import static org.apache.hudi.common.table.marker.MarkerOperation.ALL_MARKERS_URL; +import static org.apache.hudi.common.table.marker.MarkerOperation.CHECK_MARKER_CONFLICT_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.CREATE_AND_MERGE_MARKERS_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.CREATE_MARKER_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.DELETE_MARKER_DIR_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKERS_DIR_EXISTS_URL; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_BASEPATH_PARAM; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_CONFLICT_CHECKER_BATCH_INTERVAL; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_CONFLICT_CHECKER_PERIOD; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM; @@ 
-157,6 +163,22 @@ protected Option create(String partitionPath, String dataFileName, IOType } } + @Override + public boolean hasMarkerConflict(HoodieWriteConfig config, String partitionPath, String fileId) throws IOException { + Map paramsMap = new HashMap<>(); + paramsMap.put(MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, config.getMarkerConflictCheckerBatchInterval()); + paramsMap.put(MARKER_CONFLICT_CHECKER_PERIOD, config.getMarkerConflictCheckerPeriod()); + paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); + paramsMap.put(MARKER_BASEPATH_PARAM, basePath); + paramsMap.put(MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL, String.valueOf(config.getHoodieClientHeartbeatIntervalInMs())); + try { + return executeRequestToTimelineServer( + CHECK_MARKER_CONFLICT_URL, paramsMap, new TypeReference() {}, RequestMethod.GET); + } catch (IOException e) { + throw new HoodieRemoteException("Failed to check marker file conflict " + fileId, e); + } + } + private T executeRequestToTimelineServer(String requestPath, Map queryParameters, TypeReference reference, RequestMethod method) throws IOException { URIBuilder builder = diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java index 07428dd936469..266558b796a13 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hadoop.fs.Path; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -165,4 +166,14 @@ protected Path getMarkerPath(String partitionPath, String dataFileName, IOType t * @return the marker path or empty option if already exists and {@code checkIfExists} is true */ abstract Option 
create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists); + + /** + * check if there is any marker conflict during early conflict detection. + * @param config + * @param partitionPath + * @param fileId + * @return + * @throws IOException + */ + public abstract boolean hasMarkerConflict(HoodieWriteConfig config, String partitionPath, String fileId) throws IOException; } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java index e7cc296ff6ae4..cf96543340dae 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java @@ -114,9 +114,9 @@ public void testConcurrentWritesWithInterleavingSuccesssfulCommit() throws Excep Assertions.assertTrue(candidateInstants.size() == 1); ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + Assertions.assertTrue(strategy.hasCommitConflict(thisCommitOperation, thatCommitOperation)); try { - strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + strategy.resolveCommitConflict(null, thisCommitOperation, thatCommitOperation); Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict"); } catch (HoodieWriteConflictException e) { // expected @@ -146,9 +146,9 @@ public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exc 
Assertions.assertTrue(candidateInstants.size() == 1); ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + Assertions.assertTrue(strategy.hasCommitConflict(thisCommitOperation, thatCommitOperation)); try { - strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + strategy.resolveCommitConflict(null, thisCommitOperation, thatCommitOperation); Assertions.fail("Cannot reach here, should have thrown a conflict"); } catch (HoodieWriteConflictException e) { // expected @@ -178,9 +178,9 @@ public void testConcurrentWritesWithInterleavingSuccessfulCompaction() throws Ex Assertions.assertTrue(candidateInstants.size() == 1); ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + Assertions.assertTrue(strategy.hasCommitConflict(thisCommitOperation, thatCommitOperation)); try { - strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + strategy.resolveCommitConflict(null, thisCommitOperation, thatCommitOperation); Assertions.fail("Cannot reach here, should have thrown a conflict"); } catch (HoodieWriteConflictException e) { // expected @@ -233,9 +233,9 @@ public void testConcurrentWritesWithInterleavingScheduledCluster() throws Except Assertions.assertTrue(candidateInstants.size() == 1); ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - 
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + Assertions.assertTrue(strategy.hasCommitConflict(thisCommitOperation, thatCommitOperation)); try { - strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + strategy.resolveCommitConflict(null, thisCommitOperation, thatCommitOperation); Assertions.fail("Cannot reach here, should have thrown a conflict"); } catch (HoodieWriteConflictException e) { // expected @@ -265,9 +265,9 @@ public void testConcurrentWritesWithInterleavingSuccessfulCluster() throws Excep Assertions.assertTrue(candidateInstants.size() == 1); ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + Assertions.assertTrue(strategy.hasCommitConflict(thisCommitOperation, thatCommitOperation)); try { - strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + strategy.resolveCommitConflict(null, thisCommitOperation, thatCommitOperation); Assertions.fail("Cannot reach here, should have thrown a conflict"); } catch (HoodieWriteConflictException e) { // expected @@ -297,9 +297,9 @@ public void testConcurrentWritesWithInterleavingSuccessfulReplace() throws Excep Assertions.assertTrue(candidateInstants.size() == 1); ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient); ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata); - Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + Assertions.assertTrue(strategy.hasCommitConflict(thisCommitOperation, thatCommitOperation)); try { - strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + strategy.resolveCommitConflict(null, thisCommitOperation, 
thatCommitOperation); Assertions.fail("Cannot reach here, should have thrown a conflict"); } catch (HoodieWriteConflictException e) { // expected @@ -472,9 +472,9 @@ public void testConcurrentWritesWithPendingInstants() throws Exception { // check C3 has conflict with C1,C11,C12,C4 for (HoodieInstant instant : completedInstantsDuringCurrentWriteOperation) { ConcurrentOperation thatCommitOperation = new ConcurrentOperation(instant, metaClient); - Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation)); + Assertions.assertTrue(strategy.hasCommitConflict(thisCommitOperation, thatCommitOperation)); try { - strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation); + strategy.resolveCommitConflict(null, thisCommitOperation, thatCommitOperation); } catch (HoodieWriteConflictException e) { // expected } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index b514896aa1e3a..24a7673aa035b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -33,6 +33,7 @@ import java.util.Iterator; import java.util.List; +import java.util.function.Function; /** * A {@link HoodieAppendHandle} that supports APPEND write incrementally(mini-batches). 
@@ -74,6 +75,12 @@ protected void createMarkerFile(String partitionPath, String dataFileName) { writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType()); } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName, + Function conflictChecker) { + createMarkerFile(partitionPath, dataFileName); + } + @Override public boolean canWrite(HoodieRecord record) { return true; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 777e228c9510d..bce9e269a3234 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.List; +import java.util.function.Function; /** * A {@link HoodieCreateHandle} that supports CREATE write incrementally(mini-batches). 
@@ -107,6 +108,12 @@ protected void createMarkerFile(String partitionPath, String dataFileName) { writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType()); } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName, + Function conflictChecker) { + createMarkerFile(partitionPath, dataFileName); + } + @Override public Path makeNewPath(String partitionPath) { Path path = super.makeNewPath(partitionPath); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java index cf912f620a568..8d295e0412243 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.function.Function; /** * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers). @@ -109,6 +110,12 @@ protected void createMarkerFile(String partitionPath, String dataFileName) { writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType()); } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName, + Function conflictChecker) { + createMarkerFile(partitionPath, dataFileName); + } + @Override protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { // old and new file name expects to be the same. 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 1bff89713b7f2..7bf0d022cc402 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.function.Function; /** * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers). @@ -121,6 +122,12 @@ protected void createMarkerFile(String partitionPath, String dataFileName) { writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType()); } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName, + Function conflictChecker) { + createMarkerFile(partitionPath, dataFileName); + } + @Override protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { // If the data file already exists, it means the write task write merge data bucket multiple times diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 268674e78d87a..9d1c550066d10 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import org.apache.hudi.client.transaction.AsyncTimelineMarkerConflictResolutionStrategy; +import org.apache.hudi.client.transaction.SimpleDirectMarkerConflictResolutionStrategy; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import 
org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -31,24 +33,30 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieLockConfig; +import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkException; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -65,10 +73,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; 
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -92,6 +102,68 @@ public void clean() throws IOException { cleanupResources(); } + /** + * Test multi-writers with early conflict detect enable, including + * 1. MOR + Direct marker + * 2. COW + Direct marker + * 3. MOR + Timeline server based marker + * 4. COW + Timeline server based marker + * + * ---|---------|--------------------|--------------------------------------|-------------------------> time + * init 001 + * 002 start writing + * 003 start which has conflict with 002 + * and failed soon + * 002 commit successfully + * @param tableType + * @param markerType + * @throws Exception + */ + @ParameterizedTest + @MethodSource("configParams") + public void testHoodieClientBasicMultiWriterWithEarlyConflictDetection(String tableType, String markerType) throws Exception { + if (tableType.equalsIgnoreCase(HoodieTableType.MERGE_ON_READ.name())) { + setUpMORTestTable(); + } + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + + HoodieWriteConfig writeConfig = buildWriteConfigForEarlyConflictDetect(markerType, properties); + // Create the first commit + final String nextCommitTime1 = "001"; + createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", nextCommitTime1, 2000, true); + + final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig); + final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig); + + final String nextCommitTime2 = "002"; + + // start to write commit 002 + final JavaRDD writeStatusList2 = startCommitForUpdate(writeConfig, client2, nextCommitTime2, 1000); + final String nextCommitTime3 = "003"; + + // start to write commit 003 + // this commit 003 will failed quickly because early conflict detection before create 
marker. + assertThrows(SparkException.class, () -> { + final JavaRDD writeStatusList3 = startCommitForUpdate(writeConfig, client3, nextCommitTime3, 1000); + client3.commit(nextCommitTime3, writeStatusList3); + }, "Early conflict detected but cannot resolve conflicts for overlapping writes"); + + // start to commit 002 and success + assertDoesNotThrow(() -> { + client2.commit(nextCommitTime2, writeStatusList2); + }); + + List completedInstant = metaClient.reloadActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + + assertEquals(2, completedInstant.size()); + assertTrue(completedInstant.contains(nextCommitTime1)); + assertTrue(completedInstant.contains(nextCommitTime2)); + FileIOUtils.deleteDirectory(new File(basePath)); + } + @ParameterizedTest @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception { @@ -114,7 +186,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E .build()).withAutoCommit(false).withProperties(properties).build(); // Create the first commit - createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true); + createCommitWithBulkInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true); final int threadCount = 2; final ExecutorService executors = Executors.newFixedThreadPool(2); @@ -290,7 +362,7 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t // Create the first commit with inserts HoodieWriteConfig cfg = writeConfigBuilder.build(); SparkRDDWriteClient client = getHoodieWriteClient(cfg); - createCommitWithInserts(cfg, client, "000", "001", 200, true); + createCommitWithBulkInserts(cfg, client, "000", "001", 200, true); validInstants.add("001"); // Create 2 commits with upserts createCommitWithUpserts(cfg, client, 
"001", "000", "002", 100); @@ -365,7 +437,7 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t final int numRecords = 100; latchCountDownAndWait(runCountDownLatch, 30000); assertDoesNotThrow(() -> { - createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords, true); + createCommitWithBulkInserts(cfg, client1, "003", newCommitTime, numRecords, true); validInstants.add("007"); }); }); @@ -426,7 +498,7 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) .build(); // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true); + createCommitWithBulkInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true); // Start another inflight commit String newCommitTime = "003"; int numRecords = 100; @@ -475,7 +547,7 @@ public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception HoodieWriteConfig cfg2 = writeConfigBuilder.build(); // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false); + createCommitWithBulkInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false); // Start another inflight commit String newCommitTime1 = "003"; String newCommitTime2 = "004"; @@ -559,7 +631,7 @@ public void testHoodieClientMultiWriterAutoCommitNonConflict() throws Exception HoodieWriteConfig cfg2 = writeConfigBuilder.build(); // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false); + createCommitWithBulkInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false); // Start another inflight commit String newCommitTime1 = "003"; String newCommitTime2 = "004"; @@ -593,7 +665,7 @@ private void createCommitWithInsertsForPartition(HoodieWriteConfig cfg, SparkRDD assertTrue(client.commit(newCommitTime, result), "Commit should succeed"); } - private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, + 
private void createCommitWithBulkInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommitTime, String newCommitTime, int numRecords, boolean doCommit) throws Exception { // Finish first base commit @@ -604,6 +676,17 @@ private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient } } + private void createCommitWithInserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, + String prevCommitTime, String newCommitTime, int numRecords, + boolean doCommit) throws Exception { + // Finish first base commit + JavaRDD result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, SparkRDDWriteClient::insert, + false, false, numRecords); + if (doCommit) { + assertTrue(client.commit(newCommitTime, result), "Commit should succeed"); + } + } + private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit, String commitTimeBetweenPrevAndNew, String newCommitTime, int numRecords) throws Exception { @@ -641,4 +724,56 @@ private JavaRDD startCommitForUpdate(HoodieWriteConfig writeConfig, assertNoWriteErrors(statuses); return result; } + + public static Stream configParams() { + Object[][] data = + new Object[][] {{"COPY_ON_WRITE", MarkerType.TIMELINE_SERVER_BASED.name()}, {"MERGE_ON_READ", MarkerType.TIMELINE_SERVER_BASED.name()}, + {"MERGE_ON_READ", MarkerType.DIRECT.name()}, {"COPY_ON_WRITE", MarkerType.DIRECT.name()}}; + return Stream.of(data).map(Arguments::of); + } + + private HoodieWriteConfig buildWriteConfigForEarlyConflictDetect(String markerType, Properties properties) { + if (markerType.equalsIgnoreCase(MarkerType.DIRECT.name())) { + return getConfigBuilder() + .withHeartbeatIntervalInMs(3600 * 1000) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.MEMORY) + .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + 
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoClean(false).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .withAutoArchive(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withMarkersType(MarkerType.DIRECT.name()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .withEarlyConflictDetectionEnable(true) + .withConflictResolutionStrategy(SimpleDirectMarkerConflictResolutionStrategy.class.getName()) + .withMarkerConflictCheckerBatchInterval(0) + .withMarkerConflictCheckerPeriod(100) + .build()) + .withAutoCommit(false).withProperties(properties).build(); + } else { + return getConfigBuilder() + .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(20 * 1024).build()) + .withHeartbeatIntervalInMs(3600 * 1000) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.MEMORY) + .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoClean(false).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .withAutoArchive(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) + .withEarlyConflictDetectionEnable(true) + .withConflictResolutionStrategy(AsyncTimelineMarkerConflictResolutionStrategy.class.getName()) + .withMarkerConflictCheckerBatchInterval(0) + .withMarkerConflictCheckerPeriod(100) + .build()) + .withAutoCommit(false).withProperties(properties).build(); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java index 94da60c39c1ab..74d6a1d1273f5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java @@ -29,9 +29,14 @@ public class MarkerOperation implements Serializable { public static final String MARKER_DIR_PATH_PARAM = "markerdirpath"; public static final String MARKER_NAME_PARAM = "markername"; + public static final String MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = "batchinterval"; + public static final String MARKER_CONFLICT_CHECKER_PERIOD = "period"; + public static final String MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL = "heartbeatinterval"; + public static final String MARKER_BASEPATH_PARAM = "basepath"; // GET requests public static final String ALL_MARKERS_URL = String.format("%s/%s", BASE_URL, "all"); + public static final String CHECK_MARKER_CONFLICT_URL = String.format("%s/%s", BASE_URL, "check-marker-conflict"); public static final String CREATE_AND_MERGE_MARKERS_URL = String.format("%s/%s", BASE_URL, "create-and-merge"); public static final String MARKERS_DIR_EXISTS_URL = String.format("%s/%s", BASE_URL, "dir/exists"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java index 6a9e2e1b35f50..c9c0f1dff5108 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java @@ -90,6 +90,7 @@ public static String readAsUTFString(InputStream input, int length) throws IOExc public static List readAsUTFStringLines(InputStream input) { List lines = new ArrayList<>(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); + lines = bufferedReader.lines().collect(Collectors.toList()); 
closeQuietly(bufferedReader); return lines; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java index 555a036b9f834..2dc61d5b14a77 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java @@ -39,11 +39,15 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.apache.hudi.common.util.FileIOUtils.closeQuietly; @@ -199,6 +203,10 @@ public static Map> readTimelineServerBasedMarkersFromFileSys * @return markers in a {@code Set} of String. */ public static Set readMarkersFromFile(Path markersFilePath, SerializableConfiguration conf) { + return readMarkersFromFile(markersFilePath, conf, false); + } + + public static Set readMarkersFromFile(Path markersFilePath, SerializableConfiguration conf, boolean ignoreException) { FSDataInputStream fsDataInputStream = null; Set markers = new HashSet<>(); try { @@ -207,10 +215,56 @@ public static Set readMarkersFromFile(Path markersFilePath, Serializable fsDataInputStream = fs.open(markersFilePath); markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream)); } catch (IOException e) { - throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e); + if (ignoreException) { + LOG.warn("IOException occurs during read MARKERS file, ", e); + } else { + throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e); + } } finally { closeQuietly(fsDataInputStream); } return markers; } + + /** + * Reads files containing the markers written by 
timeline-server-based marker mechanism locally instead of using cluster Context. + * + * @param markerDir marker directory. + * @param fileSystem file system to use. + * @return A {@code Map} of file name to the set of markers stored in the file. + */ + public static Set readTimelineServerBasedMarkersFromFileSystemLocally(String markerDir, FileSystem fileSystem) { + Path dirPath = new Path(markerDir); + try { + if (fileSystem.exists(dirPath)) { + Predicate prefixFilter = fileStatus -> + fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX); + Predicate markerTypeFilter = fileStatus -> + !fileStatus.getPath().getName().equals(MARKER_TYPE_FILENAME); + + CopyOnWriteArraySet result = new CopyOnWriteArraySet<>(); + FileStatus[] fileStatuses = fileSystem.listStatus(dirPath); + List subPaths = Arrays.stream(fileStatuses) + .filter(prefixFilter.and(markerTypeFilter)) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + + if (subPaths.size() > 0) { + SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf()); + subPaths.stream().parallel().forEach(subPath -> { + result.addAll(readMarkersFromFile(new Path(subPath), conf, true)); + }); + } + return result; + } + return new HashSet<>(); + } catch (Exception ioe) { + LOG.warn("IOException occurs during read TimelineServer based markers from fileSystem", ioe); + return new HashSet<>(); + } + } + + public static List getAllMarkerDir(Path tempPath, FileSystem fs) throws IOException { + return Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList()); + } } \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieEarlyConflictException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieEarlyConflictException.java new file mode 100644 index 0000000000000..5b00c878812e1 --- /dev/null +++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieEarlyConflictException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +/** + *

+ * Exception thrown when an early write conflict is detected between concurrent writers. A subclass of {@code HoodieException}.

+ *

+ * Hoodie write clients throw this exception before creating the corresponding marker and before doing any actual writing work. This is a runtime (unchecked) + * exception. + *

+ */ +public class HoodieEarlyConflictException extends HoodieException { + + public HoodieEarlyConflictException(String msg) { + super(msg); + } + + public HoodieEarlyConflictException(Throwable e) { + super(e); + } + + public HoodieEarlyConflictException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 08dadae74d252..2d15ad07db6bf 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -463,6 +464,17 @@ private void registerMarkerAPI() { writeValueAsString(ctx, exist); }, false)); + app.get(MarkerOperation.CHECK_MARKER_CONFLICT_URL, new ViewHandler(ctx -> { + metricsRegistry.add("CHECK_MARKER_CONFLICT", 1); + boolean hasConflict = markerHandler.checkMarkerConflict( + Long.parseLong(Objects.requireNonNull(ctx.queryParam(MarkerOperation.MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, "30000"))), + Long.parseLong(Objects.requireNonNull(ctx.queryParam(MarkerOperation.MARKER_CONFLICT_CHECKER_PERIOD, "30000"))), + ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, ""), + ctx.queryParam(MarkerOperation.MARKER_BASEPATH_PARAM, ""), + Long.parseLong(Objects.requireNonNull(ctx.queryParam(MarkerOperation.MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL, "60000")))); + writeValueAsString(ctx, hasConflict); + }, false)); + app.post(MarkerOperation.CREATE_MARKER_URL, new ViewHandler(ctx -> { metricsRegistry.add("CREATE_MARKER", 1); ctx.result(markerHandler.createMarker( diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index e793c20432f92..c388f061b731e 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.timeline.service.TimelineService; +import org.apache.hudi.timeline.service.handlers.marker.MarkerCheckerRunnable; import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable; import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture; import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState; @@ -38,11 +39,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -81,6 +84,8 @@ public class MarkerHandler extends Handler { private transient HoodieEngineContext hoodieEngineContext; private ScheduledFuture dispatchingThreadFuture; private boolean firstCreationRequestSeen; + private AtomicBoolean hasConflict = new AtomicBoolean(false); + private final ConcurrentHashMap checkers; public MarkerHandler(Configuration conf, TimelineService.Config timelineServiceConfig, HoodieEngineContext hoodieEngineContext, FileSystem fileSystem, @@ -97,6 +102,7 @@ public MarkerHandler(Configuration conf, TimelineService.Config timelineServiceC 
this.markerCreationDispatchingRunnable = new MarkerCreationDispatchingRunnable(markerDirStateMap, batchingExecutorService); this.firstCreationRequestSeen = false; + this.checkers = new ConcurrentHashMap<>(); } /** @@ -108,6 +114,9 @@ public void stop() { } dispatchingExecutorService.shutdown(); batchingExecutorService.shutdown(); + synchronized (checkers) { + checkers.values().forEach(ExecutorService::shutdown); + } } /** @@ -138,6 +147,30 @@ public boolean doesMarkerDirExist(String markerDir) { return markerDirState.exists(); } + /** + * Check if there is any marker conflict for current markerDir + * For new markerDir init and create MarkerCheckerRunnable which will trigger at fixed rate + * @param batchInterval + * @param period + * @param markerDir + * @param basePath + * @param maxAllowableHeartbeatIntervalInMs + * @return + */ + public boolean checkMarkerConflict(long batchInterval, long period, String markerDir, String basePath, long maxAllowableHeartbeatIntervalInMs) { + synchronized (checkers) { + if (checkers.containsKey(markerDir)) { + return hasConflict.get(); + } else { + ScheduledExecutorService markerChecker = Executors.newSingleThreadScheduledExecutor(); + markerChecker.scheduleAtFixedRate(new MarkerCheckerRunnable(hasConflict, this, markerDir, basePath, + hoodieEngineContext, parallelism, fileSystem, maxAllowableHeartbeatIntervalInMs), batchInterval, period, TimeUnit.MILLISECONDS); + checkers.put(markerDir, markerChecker); + return false; + } + } + } + /** * Generates a future for an async marker creation request * diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java new file mode 100644 index 0000000000000..6f2b40bfdd029 --- /dev/null +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java @@ -0,0 +1,169 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.timeline.service.handlers.marker;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
 * Task that compares the markers of one marker directory against the markers of earlier,
 * still-alive instants found under the table's temp folder, and raises the shared
 * {@code hasConflict} flag when both sides touch the same partition/fileID.
 *
 * <p>The task is meant to be run repeatedly by a scheduled executor. A run never lets an
 * exception escape: {@code ScheduledExecutorService#scheduleAtFixedRate} suppresses all
 * subsequent executions once a run throws, which would silently switch conflict checking off.
 */
public class MarkerCheckerRunnable implements Runnable {
  private static final Logger LOG = LogManager.getLogger(MarkerCheckerRunnable.class);

  private final MarkerHandler markerHandler;
  private final String markerDir;
  private final String basePath;
  // Accepted by the constructor for signature compatibility; not read by this class.
  private final HoodieEngineContext hoodieEngineContext;
  // Accepted by the constructor for signature compatibility; not read by this class.
  private final int parallelism;
  private final FileSystem fs;
  private final AtomicBoolean hasConflict;
  private final long maxAllowableHeartbeatIntervalInMs;

  /**
   * @param hasConflict                       flag set to {@code true} when a conflict is found; never reset here
   * @param markerHandler                     handler used to read the markers of {@code markerDir}
   * @param markerDir                         marker directory of the instant being checked
   * @param basePath                          table base path
   * @param hoodieEngineContext               engine context (stored, not used by this class)
   * @param parallelism                       parallelism (stored, not used by this class)
   * @param fileSystem                        file system used for all existence, listing and status calls
   * @param maxAllowableHeartbeatIntervalInMs heartbeat age in milliseconds above which an instant is considered dead
   */
  public MarkerCheckerRunnable(AtomicBoolean hasConflict, MarkerHandler markerHandler, String markerDir, String basePath,
                               HoodieEngineContext hoodieEngineContext, int parallelism, FileSystem fileSystem,
                               long maxAllowableHeartbeatIntervalInMs) {
    this.markerHandler = markerHandler;
    this.markerDir = markerDir;
    this.basePath = basePath;
    this.hoodieEngineContext = hoodieEngineContext;
    this.parallelism = parallelism;
    this.fs = fileSystem;
    this.hasConflict = hasConflict;
    this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
  }

  /**
   * Runs one conflict check. Does nothing when the marker directory does not exist.
   * Failures are logged (with their cause) instead of rethrown so that later runs still happen.
   */
  @Override
  public void run() {
    try {
      if (!fs.exists(new Path(markerDir))) {
        return;
      }

      HoodieTimer timer = new HoodieTimer().startTimer();
      Set<String> currentInstantAllMarkers = markerHandler.getAllMarkers(markerDir);
      Path tempPath = new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.TEMPFOLDER_NAME);

      List<Path> instants = MarkerUtils.getAllMarkerDir(tempPath, fs);
      List<String> candidates = getCandidateInstants(instants, markerDirToInstantTime(markerDir));
      Set<String> tableMarkers = candidates.stream()
          .flatMap(instant -> MarkerUtils.readTimelineServerBasedMarkersFromFileSystemLocally(instant, fs).stream())
          .collect(Collectors.toSet());

      Set<String> tableFileIds = tableMarkers.stream()
          .map(MarkerCheckerRunnable::markerToPartitionAndFileId)
          .collect(Collectors.toSet());
      // Partition/fileIDs written by both the current instant and some earlier live instant.
      Set<String> conflictingFileIds = currentInstantAllMarkers.stream()
          .map(MarkerCheckerRunnable::markerToPartitionAndFileId)
          .filter(tableFileIds::contains)
          .collect(Collectors.toSet());

      if (!conflictingFileIds.isEmpty()) {
        LOG.warn("Conflict writing detected based on markers!\n"
            + "Conflicting partition/fileIDs: " + conflictingFileIds + "\n"
            + "Marker dir: " + markerDir);
        hasConflict.set(true);
      }
      LOG.info("Finish batch marker checker in " + timer.endTimer() + " ms");
    } catch (IOException | RuntimeException e) {
      // Rethrowing would cancel every future run of a fixed-rate schedule; log and try again next period.
      LOG.error("Exception occurs during checking marker conflict for marker dir " + markerDir, e);
    }
  }

  /**
   * Selects the marker directories to compare against:
   * 1. Skip the current writer's own instant (currentInstantTime)
   * 2. Skip all instants after currentInstantTime
   * 3. Skip instants of dead writers, based on heartbeat
   *
   * @param instants           marker directories found under the temp folder
   * @param currentInstantTime instant time of the marker directory being checked
   * @return paths (as strings) of the marker directories that pass all three filters
   */
  private List<String> getCandidateInstants(List<Path> instants, String currentInstantTime) {
    return instants.stream()
        .map(Path::toString)
        .filter(instantPath -> markerDirToInstantTime(instantPath).compareToIgnoreCase(currentInstantTime) < 0)
        .filter(this::hasLiveHeartbeat)
        .collect(Collectors.toList());
  }

  /**
   * Returns whether the instant behind {@code instantPath} still has a non-expired heartbeat.
   * An instant whose heartbeat cannot be read is treated as not alive and is left out of the check.
   */
  private boolean hasLiveHeartbeat(String instantPath) {
    String instantTime = markerDirToInstantTime(instantPath);
    try {
      return !isHeartbeatExpired(instantTime);
    } catch (IOException e) {
      LOG.warn("Failed to read heartbeat for instant " + instantTime + ", skipping it in conflict check", e);
      return false;
    }
  }

  /**
   * Get partition path plus fileID from a full marker path, for example:
   * 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0_85-15-1390_20220620181735781.parquet.marker.MERGE
   * ==> 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0
   *
   * <p>Only the file-name part is searched for the {@code _} separator, so an underscore inside
   * the partition path does not truncate the result.
   *
   * @param marker marker path
   * @return the marker up to (excluding) the first {@code _} of its file name, or the whole
   *         marker when the file name contains no {@code _}
   */
  private static String markerToPartitionAndFileId(String marker) {
    int fileNameStart = marker.lastIndexOf('/') + 1;
    int fileIdEnd = marker.indexOf('_', fileNameStart);
    return fileIdEnd < 0 ? marker : marker.substring(0, fileIdEnd);
  }

  /**
   * Get instantTime from a marker directory path, for example:
   * /var/folders/t3/th1dw75d0yz2x2k2qt6ys9zh0000gp/T/junit6502909693741900820/dataset/.hoodie/.temp/003
   * ==> 003
   *
   * @param marker marker directory path
   * @return the last {@code /}-separated segment of the path
   */
  private static String markerDirToInstantTime(String marker) {
    String[] segments = marker.split("/");
    return segments[segments.length - 1];
  }

  /**
   * Use the modification time of the instant's heartbeat file as its last heartbeat time.
   *
   * @param fs          file system to query
   * @param basePath    table base path
   * @param instantTime instant whose heartbeat file is looked up
   * @return modification time of the heartbeat file, or {@code 0} when the file does not exist
   * @throws IOException if the file system calls fail
   */
  public Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException {
    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime);
    if (fs.exists(heartbeatFilePath)) {
      return fs.getFileStatus(heartbeatFilePath).getModificationTime();
    } else {
      // NOTE : This can happen when a writer is upgraded to use lazy cleaning and the last write had failed
      return 0L;
    }
  }

  /**
   * Checks whether the last heartbeat of {@code instantTime} is older than the allowed interval.
   *
   * @param instantTime instant to check
   * @return {@code true} when the heartbeat age exceeds {@code maxAllowableHeartbeatIntervalInMs}
   * @throws IOException if the heartbeat file cannot be inspected
   */
  public boolean isHeartbeatExpired(String instantTime) throws IOException {
    long currentTime = System.currentTimeMillis();
    long lastHeartbeatTime = getLastHeartbeatTime(fs, basePath, instantTime);
    if (currentTime - lastHeartbeatTime > maxAllowableHeartbeatIntervalInMs) {
      LOG.warn("Heartbeat expired, for instant: " + instantTime);
      return true;
    }
    return false;
  }
}