diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 4d5375894d7e3..7051cb344ff98 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -89,6 +89,15 @@ public void startServer() throws IOException { .markerParallelism(writeConfig.getMarkersDeleteParallelism()); } + if (writeConfig.isEarlyConflictDetectionEnable()) { + timelineServiceConfBuilder.earlyConflictDetectionEnable(true) + .earlyConflictDetectionStrategy(writeConfig.getEarlyConflictDetectionStrategyClassName()) + .earlyConflictDetectionCheckCommitConflict(writeConfig.earlyConflictDetectionCheckCommitConflict()) + .asyncConflictDetectorBatchIntervalMs(writeConfig.getAsyncConflictDetectorBatchIntervalMs()) + .asyncConflictDetectorBatchPeriodMs(writeConfig.getAsyncConflictDetectorPeriodMs()) + /* Max allowable heartbeat gap = heartbeat interval x tolerable misses, the same bound SimpleDirectMarkerBasedDetectionStrategy uses. */ .earlyConflictDetectionMaxAllowableHeartbeatIntervalInMs(writeConfig.getHoodieClientHeartbeatIntervalInMs() * writeConfig.getHoodieClientHeartbeatTolerableMisses()); + } + server = new TimelineService(context, hadoopConf.newCopy(), timelineServiceConfBuilder.build(), FSUtils.getFs(basePath, hadoopConf.newCopy()), viewManager); serverPort = server.startService(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java index a20469429030a..eb12ab634fdac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java @@ -18,18 +18,21 @@ package org.apache.hudi.client.heartbeat; -import org.apache.hadoop.fs.FileSystem; -import
org.apache.hadoop.fs.Path; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; +import static org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime; + /** * Helper class to delete heartbeat for completed or failed instants with expired heartbeats. */ @@ -89,7 +92,7 @@ public static void abortIfHeartbeatExpired(String instantTime, HoodieTable table try { if (config.getFailedWritesCleanPolicy().isLazy() && heartbeatClient.isHeartbeatExpired(instantTime)) { throw new HoodieException("Heartbeat for instant " + instantTime + " has expired, last heartbeat " - + HoodieHeartbeatClient.getLastHeartbeatTime(table.getMetaClient().getFs(), config.getBasePath(), instantTime)); + + getLastHeartbeatTime(table.getMetaClient().getFs(), config.getBasePath(), instantTime)); } } catch (IOException io) { throw new HoodieException("Unable to read heartbeat", io); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java index 341d72c754a95..50d83bb13980b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java @@ -18,17 +18,19 @@ package org.apache.hudi.client.heartbeat; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.table.HoodieTableMetaClient; import 
org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieHeartbeatException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import javax.annotation.concurrent.NotThreadSafe; + import java.io.IOException; import java.io.OutputStream; import java.io.Serializable; @@ -37,9 +39,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.Timer; import java.util.TimerTask; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime; /** * This class creates heartbeat for hudi client. This heartbeat is used to ascertain whether the running job is or not. @@ -205,16 +209,6 @@ public void stop() throws HoodieException { instantToHeartbeatMap.values().forEach(heartbeat -> stop(heartbeat.getInstantTime())); } - public static Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException { - Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime); - if (fs.exists(heartbeatFilePath)) { - return fs.getFileStatus(heartbeatFilePath).getModificationTime(); - } else { - // NOTE : This can happen when a writer is upgraded to use lazy cleaning and the last write had failed - return 0L; - } - } - public static Boolean heartbeatExists(FileSystem fs, String basePath, String instantTime) throws IOException { Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime); if (fs.exists(heartbeatFilePath)) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java new file mode 100644 index 0000000000000..84fac2db004a8 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/DirectMarkerTransactionManager.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hudi.client.transaction.lock.LockManager; +import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; + +import org.apache.hadoop.fs.FileSystem; + +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; + +/** + * This class allows clients to start and end transactions for creating direct markers, used by + * `SimpleTransactionDirectMarkerBasedDetectionStrategy`, when early conflict + * detection is enabled. The lock is scoped to one file: the ZooKeeper lock key is `partitionPath/fileId`.
When optimistic concurrency control is enabled, work done between a begin and end transaction runs while holding + * that lock; otherwise both calls are no-ops. + */ +public class DirectMarkerTransactionManager extends TransactionManager { + /** Logical path `partitionPath/fileId`; only used in log messages. */ private final String filePath; + + /** Builds a lock manager whose lock key is `partitionPath/fileId`; throws HoodieNotSupportedException unless the ZooKeeper-based lock provider is configured. */ public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) { + super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), + config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()); + this.filePath = partitionPath + "/" + fileId; + } + + /** Acquires the lock and records an INFLIGHT instant for the given time as the transaction owner; does nothing unless optimistic concurrency control is enabled. */ public void beginTransaction(String newTxnOwnerInstantTime) { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath); + lockManager.lock(); + + reset(currentTxnOwnerInstant, Option.of(getInstant(newTxnOwnerInstantTime)), Option.empty()); + LOG.info("Transaction started for " + newTxnOwnerInstantTime + " and " + filePath); + } + } + + /** Clears the transaction owner and releases the lock, but only when {@code reset} returns true for the given instant; does nothing unless optimistic concurrency control is enabled. */ public void endTransaction(String currentTxnOwnerInstantTime) { + if (isOptimisticConcurrencyControlEnabled) { + LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime + + " for " + filePath); + if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) { + lockManager.unlock(); + LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstantTime + + " for " + filePath); + } + } + } + + /** + * Copies the write config properties and overrides the ZooKeeper lock key with `partitionPath/fileId`. Only the ZooKeeper-based lock provider is supported for now; any other provider results in a HoodieNotSupportedException. + * + * @param writeConfig Hudi write configs. + * @param partitionPath Relative partition path. + * @param fileId File ID. + * @return Updated lock related configs.
+ */ + private static TypedProperties createUpdatedLockProps( + HoodieWriteConfig writeConfig, String partitionPath, String fileId) { + if (!ZookeeperBasedLockProvider.class.getName().equals(writeConfig.getLockProviderClass())) { + throw new HoodieNotSupportedException("Only Support ZK-based lock for DirectMarkerTransactionManager now."); + } + TypedProperties props = new TypedProperties(writeConfig.getProps()); + props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, partitionPath + "/" + fileId); + return props; + } + + /** Builds an INFLIGHT {@code HoodieInstant} for the given instant time. */ private HoodieInstant getInstant(String instantTime) { + return new HoodieInstant(HoodieInstant.State.INFLIGHT, EMPTY_STRING, instantTime); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index bcf8ef6ea5045..7fddf8a944b06 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -18,11 +18,12 @@ package org.apache.hudi.client.transaction; -import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.client.transaction.lock.LockManager; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -34,15 +35,20 @@ */ public class TransactionManager implements Serializable { - private static final Logger LOG = LogManager.getLogger(TransactionManager.class); - private final LockManager lockManager; - private final boolean isOptimisticConcurrencyControlEnabled; - private Option currentTxnOwnerInstant = Option.empty(); + protected static final Logger LOG =
LogManager.getLogger(TransactionManager.class); + protected final LockManager lockManager; + protected final boolean isOptimisticConcurrencyControlEnabled; + protected Option currentTxnOwnerInstant = Option.empty(); private Option lastCompletedTxnOwnerInstant = Option.empty(); public TransactionManager(HoodieWriteConfig config, FileSystem fs) { - this.lockManager = new LockManager(config, fs); - this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl(); + this(new LockManager(config, fs), + config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()); + } + + protected TransactionManager(LockManager lockManager, boolean isOptimisticConcurrencyControlEnabled) { + this.lockManager = lockManager; + this.isOptimisticConcurrencyControlEnabled = isOptimisticConcurrencyControlEnabled; } public void beginTransaction(Option newTxnOwnerInstant, @@ -67,9 +73,9 @@ public void endTransaction(Option currentTxnOwnerInstant) { } } - private synchronized boolean reset(Option callerInstant, - Option newTxnOwnerInstant, - Option lastCompletedTxnOwnerInstant) { + protected synchronized boolean reset(Option callerInstant, + Option newTxnOwnerInstant, + Option lastCompletedTxnOwnerInstant) { if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) { this.currentTxnOwnerInstant = newTxnOwnerInstant; this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index 2697700036de6..effcdf091ae2f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -21,6 +21,7 @@ import 
org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.lock.LockProvider; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieLockConfig; @@ -52,9 +53,13 @@ public class LockManager implements Serializable, AutoCloseable { private volatile LockProvider lockProvider; public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) { + this(writeConfig, fs, writeConfig.getProps()); + } + + public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, TypedProperties lockProps) { this.writeConfig = writeConfig; this.hadoopConf = new SerializableConfiguration(fs.getConf()); - this.lockConfiguration = new LockConfiguration(writeConfig.getProps()); + this.lockConfiguration = new LockConfiguration(lockProps); maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())); maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 7367826e50df4..d114cc3d77881 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -100,6 +100,7 @@ import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY; import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR; import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY; +import static 
org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy; /** * Class storing configs for the HoodieWriteClient. @@ -118,6 +119,8 @@ public class HoodieWriteConfig extends HoodieConfig { // It is here so that both the client and deltastreamer use the same reference public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + public static final String CONCURRENCY_PREFIX = "hoodie.write.concurrency."; + public static final ConfigProperty TBL_NAME = ConfigProperty .key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY) .noDefaultValue() @@ -550,6 +553,50 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. " + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation"); + public static final ConfigProperty EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty + .key(CONCURRENCY_PREFIX + "early.conflict.detection.strategy") + .noDefaultValue() + .sinceVersion("0.13.0") + .withInferFunction(cfg -> { + MarkerType markerType = MarkerType.valueOf(cfg.getStringOrDefault(MARKERS_TYPE).toUpperCase()); + return Option.of(getDefaultEarlyConflictDetectionStrategy(markerType)); + }) + .withDocumentation("The class name of the early conflict detection strategy to use. " + + "This should be a subclass of " + + "`org.apache.hudi.common.conflict.detection.EarlyConflictDetectionStrategy`."); + + public static final ConfigProperty EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty + .key(CONCURRENCY_PREFIX + "early.conflict.detection.enable") + .defaultValue(false) + .sinceVersion("0.13.0") + .withDocumentation("Whether to enable early conflict detection based on markers. 
" + + "It eagerly detects writing conflict before create markers and fails fast if a " + + "conflict is detected, to release cluster compute resources as soon as possible."); + + public static final ConfigProperty ASYNC_CONFLICT_DETECTOR_BATCH_INTERVAL_MS = ConfigProperty + .key(CONCURRENCY_PREFIX + "early.conflict.async.detector.batch.interval_ms") + .defaultValue(30000L) + .sinceVersion("0.13.0") + .withDocumentation("Used for timeline-server-based markers with " + + "`AsyncTimelineServerBasedDetectionStrategy`. " + + "The time in milliseconds to delay first async marker conflict detection."); + + public static final ConfigProperty ASYNC_CONFLICT_DETECTOR_PERIOD_MS = ConfigProperty + .key(CONCURRENCY_PREFIX + "early.conflict.async.detector.period_ms") + .defaultValue(30000L) + .sinceVersion("0.13.0") + .withDocumentation("Used for timeline-server-based markers with " + + "`AsyncTimelineServerBasedDetectionStrategy`. " + + "The period in milliseconds between consecutive runs of async marker conflict detection."); + + public static final ConfigProperty EARLY_CONFLICT_DETECTION_CHECK_COMMIT_CONFLICT = ConfigProperty + .key(CONCURRENCY_PREFIX + "early.conflict.check.commit.conflict") + .defaultValue(false) + .sinceVersion("0.13.0") + .withDocumentation("Whether to enable commit conflict checking or not during early " + + "conflict detection."); + + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; @@ -2196,6 +2243,14 @@ public ConflictResolutionStrategy getWriteConflictResolutionStrategy() { return ReflectionUtils.loadClass(getString(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME)); } + public Long getAsyncConflictDetectorBatchIntervalMs() { + return getLong(ASYNC_CONFLICT_DETECTOR_BATCH_INTERVAL_MS); + } + + public Long getAsyncConflictDetectorPeriodMs() { + return getLong(ASYNC_CONFLICT_DETECTOR_PERIOD_MS); + } + public Long getLockAcquireWaitTimeoutInMs() { return 
getLong(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS); } @@ -2204,6 +2259,18 @@ public WriteConcurrencyMode getWriteConcurrencyMode() { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } + public boolean isEarlyConflictDetectionEnable() { + return getBoolean(EARLY_CONFLICT_DETECTION_ENABLE); + } + + public String getEarlyConflictDetectionStrategyClassName() { + return getString(EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME); + } + + public boolean earlyConflictDetectionCheckCommitConflict() { + return getBoolean(EARLY_CONFLICT_DETECTION_CHECK_COMMIT_CONFLICT); + } + // misc configs public Boolean doSkipDefaultPartitionValidation() { return getBoolean(SKIP_DEFAULT_PARTITION_VALIDATION); @@ -2738,6 +2805,31 @@ public Builder doSkipDefaultPartitionValidation(boolean skipDefaultPartitionVali return this; } + public Builder withEarlyConflictDetectionEnable(boolean enable) { + writeConfig.setValue(EARLY_CONFLICT_DETECTION_ENABLE, String.valueOf(enable)); + return this; + } + + public Builder withAsyncConflictDetectorBatchIntervalMs(long intervalMs) { + writeConfig.setValue(ASYNC_CONFLICT_DETECTOR_BATCH_INTERVAL_MS, String.valueOf(intervalMs)); + return this; + } + + public Builder withAsyncConflictDetectorPeriodMs(long periodMs) { + writeConfig.setValue(ASYNC_CONFLICT_DETECTOR_PERIOD_MS, String.valueOf(periodMs)); + return this; + } + + public Builder withEarlyConflictDetectionCheckCommitConflict(boolean enable) { + writeConfig.setValue(EARLY_CONFLICT_DETECTION_CHECK_COMMIT_CONFLICT, String.valueOf(enable)); + return this; + } + + public Builder withEarlyConflictDetectionStrategy(String className) { + writeConfig.setValue(EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME, className); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java index a8b7965d8007a..d108a3f5de8ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieIOHandle.java @@ -39,5 +39,5 @@ public abstract class HoodieIOHandle { this.fs = getFileSystem(); } - protected abstract FileSystem getFileSystem(); + public abstract FileSystem getFileSystem(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java index ebe6361fdf55f..0d41dd49b1243 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java @@ -44,7 +44,7 @@ public HoodieReadHandle(HoodieWriteConfig config, HoodieTable hoodie } @Override - protected FileSystem getFileSystem() { + public FileSystem getFileSystem() { return hoodieTable.getMetaClient().getFs(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 151f2e6f99fc6..8e470471db91a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -18,10 +18,6 @@ package org.apache.hudi.io; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; @@ -33,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord; import 
org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; @@ -42,6 +39,11 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.WriteMarkersFactory; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -134,7 +136,7 @@ protected Path makeNewFilePath(String partitionPath, String fileName) { */ protected void createMarkerFile(String partitionPath, String dataFileName) { WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime) - .create(partitionPath, dataFileName, getIOType()); + .create(partitionPath, dataFileName, getIOType(), config, fileId, hoodieTable.getMetaClient().getActiveTimeline()); } public Schema getWriterSchemaWithMetaFields() { @@ -186,10 +188,22 @@ public String getPartitionPath() { public abstract IOType getIOType(); @Override - protected FileSystem getFileSystem() { + public FileSystem getFileSystem() { return hoodieTable.getMetaClient().getFs(); } + public HoodieWriteConfig getConfig() { + return this.config; + } + + public HoodieTableMetaClient getHoodieTableMetaClient() { + return hoodieTable.getMetaClient(); + } + + public String getFileId() { + return this.fileId; + } + protected int getPartitionId() { return taskContextSupplier.getPartitionIdSupplier().get(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/ConflictDetectionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/ConflictDetectionUtils.java new file mode 100644 index 0000000000000..26d8c5b54a242 --- 
/dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/ConflictDetectionUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.marker; + +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy; + +/** + * Utilities for early conflict detection. + */ +public class ConflictDetectionUtils { + /** Resolves the default early conflict detection strategy for a marker type. + * @param markerType Marker type. + * @return The class name of the default strategy for early conflict detection: `SimpleDirectMarkerBasedDetectionStrategy` for DIRECT markers, `AsyncTimelineServerBasedDetectionStrategy` for TIMELINE_SERVER_BASED and any other marker type.
+ */ + public static String getDefaultEarlyConflictDetectionStrategy(MarkerType markerType) { + switch (markerType) { + case DIRECT: + return SimpleDirectMarkerBasedDetectionStrategy.class.getName(); + case TIMELINE_SERVER_BASED: + default: + return AsyncTimelineServerBasedDetectionStrategy.class.getName(); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index 27136dd1d2d95..6c97e063323c8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -19,13 +19,18 @@ package org.apache.hudi.table.marker; import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.conflict.detection.DirectMarkerBasedDetectionStrategy; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; @@ -44,6 +49,8 @@ import java.util.List; import java.util.Set; +import static org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy; + /** * Marker operations of directly accessing the file system to create and delete * marker files.
Each data file has a corresponding marker file. @@ -155,6 +162,23 @@ protected Option create(String partitionPath, String dataFileName, IOType return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists); } + /** Runs early conflict detection for the target file and then creates the marker. If the configured strategy class is not a DirectMarkerBasedDetectionStrategy, logs a warning and falls back to the default strategy for DIRECT markers. The strategy is instantiated reflectively on every call, and the marker is created only after detectAndResolveConflictIfNecessary() returns. */ @Override + public Option createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, + HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline) { + String strategyClassName = config.getEarlyConflictDetectionStrategyClassName(); + if (!ReflectionUtils.isSubClass(strategyClassName, DirectMarkerBasedDetectionStrategy.class)) { + LOG.warn("Cannot use " + strategyClassName + " for direct markers."); + strategyClassName = getDefaultEarlyConflictDetectionStrategy(MarkerType.DIRECT); + LOG.warn("Falling back to " + strategyClassName); + } + DirectMarkerBasedDetectionStrategy strategy = + (DirectMarkerBasedDetectionStrategy) ReflectionUtils.loadClass(strategyClassName, + fs, partitionPath, fileId, instantTime, activeTimeline, config); + + strategy.detectAndResolveConflictIfNecessary(); + return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists); + } + private Option create(Path markerPath, boolean checkIfExists) { HoodieTimer timer = HoodieTimer.start(); Path dirPath = markerPath.getParent(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedDetectionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedDetectionStrategy.java new file mode 100644 index 0000000000000..954755da35ae6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedDetectionStrategy.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.marker; + +import org.apache.hudi.common.conflict.detection.DirectMarkerBasedDetectionStrategy; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.MarkerUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieEarlyConflictDetectionException; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ConcurrentModificationException; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This strategy is used for direct marker writers, trying to do early conflict detection. + * It uses file system APIs such as list and exists directly to check for marker file + * conflicts, without any locking. It never resolves a conflict: a detected conflict fails the write with HoodieEarlyConflictDetectionException.
+ */ +public class SimpleDirectMarkerBasedDetectionStrategy extends DirectMarkerBasedDetectionStrategy { + + private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerBasedDetectionStrategy.class); + private final String basePath; + private final boolean checkCommitConflict; + /** Completed commit instants captured from the active timeline at construction time. */ private final Set<HoodieInstant> completedCommitInstants; + /** Heartbeat interval multiplied by the tolerable number of missed heartbeats. */ private final long maxAllowableHeartbeatIntervalInMs; + + /** Reads the base path, the commit-conflict flag and the heartbeat settings from the write config, and snapshots the completed commit instants from the given timeline. */ public SimpleDirectMarkerBasedDetectionStrategy(HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime, + HoodieActiveTimeline activeTimeline, HoodieWriteConfig config) { + super(fs, partitionPath, fileId, instantTime, activeTimeline, config); + this.basePath = config.getBasePath(); + this.checkCommitConflict = config.earlyConflictDetectionCheckCommitConflict(); + this.completedCommitInstants = new HashSet<>(activeTimeline.getCommitsTimeline().filterCompletedInstants().getInstants()); + this.maxAllowableHeartbeatIntervalInMs = config.getHoodieClientHeartbeatIntervalInMs() * config.getHoodieClientHeartbeatTolerableMisses(); + + } + + /** Reports a conflict if {@code checkMarkerConflict} finds one or, when commit-conflict checking is enabled, if {@code MarkerUtils.hasCommitConflict} finds one for this file ID. An IOException is logged and rethrown as {@code HoodieIOException}. */ @Override + public boolean hasMarkerConflict() { + try { + return checkMarkerConflict(basePath, maxAllowableHeartbeatIntervalInMs) + || (checkCommitConflict && MarkerUtils.hasCommitConflict(activeTimeline, Stream.of(fileId).collect(Collectors.toSet()), completedCommitInstants)); + } catch (IOException e) { + LOG.warn("Exception occurs during create marker file in eager conflict detection mode."); + throw new HoodieIOException("Exception occurs during create marker file in eager conflict detection mode.", e); + } + } + + /** Always throws {@code HoodieEarlyConflictDetectionException}; this strategy does not attempt to resolve conflicts and ignores its arguments. */ @Override + public void resolveMarkerConflict(String basePath, String partitionPath, String dataFileName) { + throw new HoodieEarlyConflictDetectionException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes")); + } + + /** Checks for a conflict and, if one exists, fails via {@code resolveMarkerConflict}. */ @Override + public void detectAndResolveConflictIfNecessary() throws HoodieEarlyConflictDetectionException { + if
(hasMarkerConflict()) { + resolveMarkerConflict(basePath, partitionPath, fileId); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java new file mode 100644 index 0000000000000..d10d74abb01b9 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedDetectionStrategy.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.marker; + +import org.apache.hudi.client.transaction.DirectMarkerTransactionManager; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieEarlyConflictDetectionException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * This strategy is used for direct marker writers, trying to do early conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there is any marker file + * conflict, with transaction locks using {@link DirectMarkerTransactionManager}. + */ +public class SimpleTransactionDirectMarkerBasedDetectionStrategy + extends SimpleDirectMarkerBasedDetectionStrategy { + + private static final Logger LOG = LogManager.getLogger( + SimpleTransactionDirectMarkerBasedDetectionStrategy.class); + + public SimpleTransactionDirectMarkerBasedDetectionStrategy( + HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime, + HoodieActiveTimeline activeTimeline, HoodieWriteConfig config) { + super(fs, partitionPath, fileId, instantTime, activeTimeline, config); + } + + @Override + public void detectAndResolveConflictIfNecessary() throws HoodieEarlyConflictDetectionException { + DirectMarkerTransactionManager txnManager = + new DirectMarkerTransactionManager((HoodieWriteConfig) config, fs, partitionPath, fileId); + try { + // Need to do transaction before create marker file when using early conflict detection + txnManager.beginTransaction(instantTime); + super.detectAndResolveConflictIfNecessary(); + + } catch (Exception e) { + LOG.warn("Exception occurs during create marker file in early conflict detection mode within transaction."); + throw e; + } finally { + // End transaction after created marker file. 
+ txnManager.endTransaction(instantTime); + txnManager.close(); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java index 2de9c9fdb8df5..76eb26b035522 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java @@ -21,9 +21,12 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieEarlyConflictDetectionException; import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.table.HoodieTable; @@ -38,6 +41,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -48,6 +52,7 @@ import static org.apache.hudi.common.table.marker.MarkerOperation.CREATE_MARKER_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.DELETE_MARKER_DIR_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKERS_DIR_EXISTS_URL; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_BASEPATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM; @@ -132,14 +137,46 @@ protected Option create(String partitionPath, String 
dataFileName, IOType HoodieTimer timer = HoodieTimer.start(); String markerFileName = getMarkerFileName(dataFileName, type); - Map paramsMap = new HashMap<>(); - paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); - if (StringUtils.isNullOrEmpty(partitionPath)) { - paramsMap.put(MARKER_NAME_PARAM, markerFileName); + Map paramsMap = getConfigMap(partitionPath, markerFileName, false); + boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, markerFileName); + LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName + + " in " + timer.endTimer() + " ms"); + if (success) { + return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName)); } else { - paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName); + return Option.empty(); + } + } + + @Override + public Option createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, + HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline) { + HoodieTimer timer = new HoodieTimer().startTimer(); + String markerFileName = getMarkerFileName(dataFileName, type); + Map paramsMap = getConfigMap(partitionPath, markerFileName, true); + + boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, markerFileName); + + LOG.info("[timeline-server-based] Created marker file with early conflict detection " + partitionPath + "/" + markerFileName + + " in " + timer.endTimer() + " ms"); + + if (success) { + return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName)); + } else { + // this failed may due to early conflict detection, so we need to throw out. + throw new HoodieEarlyConflictDetectionException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes")); } + } + /** + * Executes marker creation request with specific parameters. 
+ * + * @param paramsMap Parameters to be included in the marker request. + * @param partitionPath Relative partition path. + * @param markerFileName Marker file name. + * @return {@code true} if successful; {@code false} otherwise. + */ + private boolean executeCreateMarkerRequest(Map paramsMap, String partitionPath, String markerFileName) { boolean success; try { success = executeRequestToTimelineServer( @@ -148,13 +185,31 @@ protected Option create(String partitionPath, String dataFileName, IOType } catch (IOException e) { throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e); } - LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName - + " in " + timer.endTimer() + " ms"); - if (success) { - return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName)); + return success; + } + + /** + * Gets parameter map for marker creation request. + * + * @param partitionPath Relative partition path. + * @param markerFileName Marker file name. + * @return parameter map. 
+ */ + private Map getConfigMap( + String partitionPath, String markerFileName, boolean initEarlyConflictDetectionConfigs) { + Map paramsMap = new HashMap<>(); + paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); + if (StringUtils.isNullOrEmpty(partitionPath)) { + paramsMap.put(MARKER_NAME_PARAM, markerFileName); } else { - return Option.empty(); + paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName); + } + + if (initEarlyConflictDetectionConfigs) { + paramsMap.put(MARKER_BASEPATH_PARAM, basePath); } + + return paramsMap; } private T executeRequestToTimelineServer(String requestPath, Map queryParameters, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java index 07428dd936469..17751ad80de15 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java @@ -22,7 +22,10 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -55,15 +58,43 @@ public WriteMarkers(String basePath, String markerFolderPath, String instantTime /** * Creates a marker without checking if the marker already exists. * - * @param partitionPath partition path in the table - * @param dataFileName data file name - * @param type write IO type - * @return the marker path + * @param partitionPath partition path in the table. + * @param dataFileName data file name. + * @param type write IO type. 
+ * @return the marker path. */ public Option create(String partitionPath, String dataFileName, IOType type) { return create(partitionPath, dataFileName, type, false); } + /** + * Creates a marker without checking if the marker already exists. + * This can invoke marker-based early conflict detection when enabled for multi-writers. + * + * @param partitionPath partition path in the table + * @param dataFileName data file name + * @param type write IO type + * @param writeConfig Hudi write configs. + * @param fileId File ID. + * @param activeTimeline Active timeline for the write operation. + * @return the marker path. + */ + public Option create(String partitionPath, String dataFileName, IOType type, HoodieWriteConfig writeConfig, + String fileId, HoodieActiveTimeline activeTimeline) { + if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() + && writeConfig.isEarlyConflictDetectionEnable()) { + HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline(); + HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline(); + // TODO If current is compact or clustering then create marker directly without early conflict detection. + // Need to support early conflict detection between table service and common writers. + if (pendingCompactionTimeline.containsInstant(instantTime) || pendingReplaceTimeline.containsInstant(instantTime)) { + return create(partitionPath, dataFileName, type, false); + } + return createWithEarlyConflictDetection(partitionPath, dataFileName, type, false, writeConfig, fileId, activeTimeline); + } + return create(partitionPath, dataFileName, type, false); + } + /** * Creates a marker if the marker does not exist. 
* @@ -165,4 +196,20 @@ protected Path getMarkerPath(String partitionPath, String dataFileName, IOType t * @return the marker path or empty option if already exists and {@code checkIfExists} is true */ abstract Option create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists); + + /** + * Creates a marker with early conflict detection for multi-writers. If conflict is detected, + * an exception is thrown to fail the write operation. + * + * @param partitionPath partition path in the table. + * @param dataFileName data file name. + * @param type write IO type. + * @param checkIfExists whether to check if the marker already exists. + * @param config Hudi write configs. + * @param fileId File ID. + * @param activeTimeline Active timeline for the write operation. + * @return the marker path or empty option if already exists and {@code checkIfExists} is true. + */ + public abstract Option createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, + HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline); } diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml index de51e4480761c..5075bc687367c 100644 --- a/hudi-client/hudi-spark-client/pom.xml +++ b/hudi-client/hudi-spark-client/pom.xml @@ -215,6 +215,13 @@ awaitility test + + + org.apache.curator + curator-test + ${zk-curator.version} + test + diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 73fefc70ca663..7fa342916e46e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -20,6 +20,8 @@ import 
org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -27,31 +29,40 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.marker.SimpleDirectMarkerBasedDetectionStrategy; +import org.apache.hudi.table.marker.SimpleTransactionDirectMarkerBasedDetectionStrategy; import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy; +import 
org.apache.curator.test.TestingServer; import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkException; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -68,14 +79,21 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static 
org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -115,8 +133,8 @@ public void clean() throws IOException { } private static final List LOCK_PROVIDER_CLASSES = Arrays.asList( - InProcessLockProvider.class, - FileSystemBasedLockProvider.class); + InProcessLockProvider.class, + FileSystemBasedLockProvider.class); private static Iterable providerClassAndTableType() { List opts = new ArrayList<>(); @@ -127,6 +145,112 @@ private static Iterable providerClassAndTableType() { return opts; } + /** + * Test multi-writers with early conflict detect enable, including + * 1. MOR + Direct marker + * 2. COW + Direct marker + * 3. MOR + Timeline server based marker + * 4. COW + Timeline server based marker + * + * |---------------------- 003 heartBeat expired -------------------| + * + * ---|---------|--------------------|--------------------------------------|-------------------------|-------------------------> time + * init 001 + * 002 start writing + * 003 start which has conflict with 002 + * and failed soon + * 002 commit successfully 004 write successfully + * @param tableType + * @param markerType + * @throws Exception + */ + @ParameterizedTest + @MethodSource("configParams") + public void testHoodieClientBasicMultiWriterWithEarlyConflictDetection(String tableType, String markerType, String earlyConflictDetectionStrategy) throws Exception { + if (tableType.equalsIgnoreCase(HoodieTableType.MERGE_ON_READ.name())) { + setUpMORTestTable(); + } + + int heartBeatIntervalForCommit4 = 10 * 1000; + + HoodieWriteConfig writeConfig; + TestingServer server = null; + if (earlyConflictDetectionStrategy.equalsIgnoreCase(SimpleTransactionDirectMarkerBasedDetectionStrategy.class.getName())) { + // need to setup zk related env there. Bcz SimpleTransactionDirectMarkerBasedDetectionStrategy is only support zk lock for now. 
+ server = new TestingServer(); + Properties properties = new Properties(); + properties.setProperty(ZK_BASE_PATH_PROP_KEY, basePath); + properties.setProperty(ZK_CONNECT_URL_PROP_KEY, server.getConnectString()); + properties.setProperty(ZK_BASE_PATH_PROP_KEY, server.getTempDirectory().getAbsolutePath()); + properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "10000"); + properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "10000"); + properties.setProperty(ZK_LOCK_KEY_PROP_KEY, "key"); + properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); + + writeConfig = buildWriteConfigForEarlyConflictDetect(markerType, properties, ZookeeperBasedLockProvider.class, earlyConflictDetectionStrategy); + } else { + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + writeConfig = buildWriteConfigForEarlyConflictDetect(markerType, properties, InProcessLockProvider.class, earlyConflictDetectionStrategy); + } + + // Create the first commit + final String nextCommitTime1 = "001"; + createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", nextCommitTime1, 200, true); + + final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig); + final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig); + + final String nextCommitTime2 = "002"; + + // start to write commit 002 + final JavaRDD writeStatusList2 = startCommitForUpdate(writeConfig, client2, nextCommitTime2, 100); + + // start to write commit 003 + // this commit 003 will failed quickly because early conflict detection before create marker. 
+ final String nextCommitTime3 = "003"; + assertThrows(SparkException.class, () -> { + final JavaRDD writeStatusList3 = startCommitForUpdate(writeConfig, client3, nextCommitTime3, 100); + client3.commit(nextCommitTime3, writeStatusList3); + }, "Early conflict detected but cannot resolve conflicts for overlapping writes"); + + // start to commit 002 and success + assertDoesNotThrow(() -> { + client2.commit(nextCommitTime2, writeStatusList2); + }); + + HoodieWriteConfig config4 = HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).withHeartbeatIntervalInMs(heartBeatIntervalForCommit4).build(); + final SparkRDDWriteClient client4 = getHoodieWriteClient(config4); + + Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + nextCommitTime3); + fs.create(heartbeatFilePath, true); + + // Wait for heart beat expired for failed commitTime3 "003" + // Otherwise commit4 still can see conflict between failed write 003. + Thread.sleep(heartBeatIntervalForCommit4 * 2); + + final String nextCommitTime4 = "004"; + assertDoesNotThrow(() -> { + final JavaRDD writeStatusList4 = startCommitForUpdate(writeConfig, client4, nextCommitTime4, 100); + client4.commit(nextCommitTime4, writeStatusList4); + }); + + List completedInstant = metaClient.reloadActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants().stream() + .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + + assertEquals(3, completedInstant.size()); + assertTrue(completedInstant.contains(nextCommitTime1)); + assertTrue(completedInstant.contains(nextCommitTime2)); + assertTrue(completedInstant.contains(nextCommitTime4)); + + FileIOUtils.deleteDirectory(new File(basePath)); + if (server != null) { + server.close(); + } + } + @ParameterizedTest @MethodSource("providerClassAndTableType") public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass) throws Exception { @@ -658,4 +782,65 @@ private 
JavaRDD startCommitForUpdate(HoodieWriteConfig writeConfig, assertNoWriteErrors(statuses); return result; } + + public static Stream configParams() { + Object[][] data = + new Object[][] { + {"COPY_ON_WRITE", MarkerType.TIMELINE_SERVER_BASED.name(), AsyncTimelineServerBasedDetectionStrategy.class.getName()}, + {"MERGE_ON_READ", MarkerType.TIMELINE_SERVER_BASED.name(), AsyncTimelineServerBasedDetectionStrategy.class.getName()}, + {"MERGE_ON_READ", MarkerType.DIRECT.name(), SimpleDirectMarkerBasedDetectionStrategy.class.getName()}, + {"COPY_ON_WRITE", MarkerType.DIRECT.name(), SimpleDirectMarkerBasedDetectionStrategy.class.getName()}, + {"COPY_ON_WRITE", MarkerType.DIRECT.name(), SimpleTransactionDirectMarkerBasedDetectionStrategy.class.getName()} + }; + return Stream.of(data).map(Arguments::of); + } + + private HoodieWriteConfig buildWriteConfigForEarlyConflictDetect(String markerType, Properties properties, + Class lockProvider, String earlyConflictDetectionStrategy) { + if (markerType.equalsIgnoreCase(MarkerType.DIRECT.name())) { + return getConfigBuilder() + .withHeartbeatIntervalInMs(60 * 1000) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.MEMORY) + .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoClean(false).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .withAutoArchive(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withMarkersType(MarkerType.DIRECT.name()) + .withEarlyConflictDetectionEnable(true) + .withEarlyConflictDetectionStrategy(earlyConflictDetectionStrategy) + .withAsyncConflictDetectorBatchIntervalMs(0) + .withAsyncConflictDetectorPeriodMs(100) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(lockProvider).build()) + 
.withAutoCommit(false).withProperties(properties).build(); + } else { + return getConfigBuilder() + .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(20 * 1024).build()) + .withHeartbeatIntervalInMs(60 * 1000) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.MEMORY) + .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withAutoClean(false).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .withAutoArchive(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name()) + // Set the batch processing interval for marker requests to be larger than + // the running interval of the async conflict detector so that the conflict can + // be detected before the marker requests are processed at the timeline server + // in the test. 
+ .withMarkersTimelineServerBasedBatchIntervalMs(1000) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(lockProvider).build()) + .withEarlyConflictDetectionEnable(true) + .withEarlyConflictDetectionStrategy(earlyConflictDetectionStrategy) + .withAsyncConflictDetectorBatchIntervalMs(0) + .withAsyncConflictDetectorPeriodMs(100) + .withAutoCommit(false).withProperties(properties).build(); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/DirectMarkerBasedDetectionStrategy.java b/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/DirectMarkerBasedDetectionStrategy.java new file mode 100644 index 0000000000000..67286d4f0b3a8 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/DirectMarkerBasedDetectionStrategy.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Abstract early conflict detection strategy for direct marker writers; subclasses reuse {@link #checkMarkerConflict}.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class DirectMarkerBasedDetectionStrategy implements EarlyConflictDetectionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(DirectMarkerBasedDetectionStrategy.class);
+
+  protected final FileSystem fs;
+  protected final String partitionPath;
+  protected final String fileId;
+  protected final String instantTime;
+  protected final HoodieActiveTimeline activeTimeline;
+  protected final HoodieConfig config;
+
+  public DirectMarkerBasedDetectionStrategy(HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime,
+                                            HoodieActiveTimeline activeTimeline, HoodieConfig config) {
+    this.fs = fs;
+    this.partitionPath = partitionPath;
+    this.fileId = fileId;
+    this.instantTime = instantTime;
+    this.activeTimeline = activeTimeline;
+    this.config = config;
+  }
+
+  /**
+   * Checks whether a candidate instant already has a marker whose path contains this strategy's file ID.
+   * To reduce the list pressure as much as possible, the temp folder is listed once to find the
+   * candidate marker directories (see {@link MarkerUtils#getCandidateInstants}), and then only
+   * '$marker_dir/partition_path' of each candidate is listed instead of every marker under the temp folder.
+   *
+   * @param basePath Base path of the table.
+   * @param maxAllowableHeartbeatIntervalInMs Heartbeat timeout.
+   * @return {@code true} if any listed entry's path contains {@code fileId}; {@code false} otherwise.
+   * @throws IOException upon errors listing the temp folder; failures while listing a candidate surface as {@link HoodieIOException}.
+   */
+  public boolean checkMarkerConflict(String basePath, long maxAllowableHeartbeatIntervalInMs) throws IOException {
+    String tempFolderPath = basePath + Path.SEPARATOR + HoodieTableMetaClient.TEMPFOLDER_NAME;
+
+    List<String> candidateInstants = MarkerUtils.getCandidateInstants(activeTimeline, Arrays.stream(fs.listStatus(new Path(tempFolderPath))).map(FileStatus::getPath).collect(Collectors.toList()),
+        instantTime, maxAllowableHeartbeatIntervalInMs, fs, basePath);
+
+    boolean conflictFound = candidateInstants.stream().flatMap(currentMarkerDirPath -> {
+      try {
+        Path markerPartitionPath;
+        if (StringUtils.isNullOrEmpty(partitionPath)) {
+          markerPartitionPath = new Path(currentMarkerDirPath);
+        } else {
+          markerPartitionPath = new Path(currentMarkerDirPath, partitionPath);
+        }
+
+        if (!StringUtils.isNullOrEmpty(partitionPath) && !fs.exists(markerPartitionPath)) {
+          return Stream.empty();
+        } else {
+          return Arrays.stream(fs.listStatus(markerPartitionPath)).parallel()
+              .filter(status -> status.toString().contains(fileId));
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("IOException occurs during checking marker file conflict", e);
+      }
+    }).findAny().isPresent();
+
+    if (conflictFound) {
+      LOG.warn("Detected conflict marker files: " + partitionPath + "/" + fileId + " for " + instantTime);
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/EarlyConflictDetectionStrategy.java
b/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/EarlyConflictDetectionStrategy.java
new file mode 100644
index 0000000000000..ccb97e1e8bf70
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/EarlyConflictDetectionStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+
+/**
+ * Interface for pluggable strategies of early conflict detection among multiple writers.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface EarlyConflictDetectionStrategy {
+  /**
+   * Detects a write conflict and resolves it if necessary; declared to throw {@link HoodieEarlyConflictDetectionException}.
+   */
+  void detectAndResolveConflictIfNecessary() throws HoodieEarlyConflictDetectionException;
+
+  /**
+   * @return {@code true} if there is a write conflict based on markers; {@code false} otherwise.
+   */
+  boolean hasMarkerConflict();
+
+  /**
+   * Resolves a write conflict.
+   *
+   * @param basePath Base path of the table.
+   * @param partitionPath Relative partition path.
+   * @param dataFileName Data file name.
+   */
+  void resolveMarkerConflict(String basePath, String partitionPath, String dataFileName);
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/TimelineServerBasedDetectionStrategy.java b/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/TimelineServerBasedDetectionStrategy.java
new file mode 100644
index 0000000000000..51df6216a2dc0
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/TimelineServerBasedDetectionStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Set;
+
+/**
+ * This abstract strategy is used for writers using timeline-server-based markers,
+ * trying to do early conflict detection.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class TimelineServerBasedDetectionStrategy implements EarlyConflictDetectionStrategy {
+
+  protected final String basePath;
+  protected final String markerDir;
+  protected final String markerName;
+  protected final boolean checkCommitConflict;
+
+  public TimelineServerBasedDetectionStrategy(String basePath, String markerDir, String markerName, Boolean checkCommitConflict) {
+    this.basePath = basePath;
+    this.markerDir = markerDir;
+    this.markerName = markerName;
+    this.checkCommitConflict = checkCommitConflict; // auto-unboxing: a null Boolean would throw NullPointerException here
+  }
+
+  /**
+   * Starts the async conflict detection thread.
+   *
+   * @param batchIntervalMs Batch interval in milliseconds.
+   * @param periodMs Scheduling period in milliseconds.
+   * @param markerDir Marker directory.
+   * @param basePath Base path of the table.
+   * @param maxAllowableHeartbeatIntervalInMs Heartbeat timeout.
+   * @param fileSystem {@link FileSystem} instance.
+   * @param markerHandler Marker handler (typed as {@code Object}; implementations presumably cast it - confirm against the subclass).
+   * @param completedCommits Completed Hudi commits.
+   */
+  public abstract void startAsyncDetection(Long batchIntervalMs, Long periodMs, String markerDir,
+                                           String basePath, Long maxAllowableHeartbeatIntervalInMs,
+                                           FileSystem fileSystem, Object markerHandler,
+                                           Set completedCommits);
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
new file mode 100644
index 0000000000000..2ef3f4d54dd2b
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/heartbeat/HoodieHeartbeatUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.heartbeat;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Common utils for Hudi heartbeat
+ */
+public class HoodieHeartbeatUtils {
+  private static final Logger LOG = LogManager.getLogger(HoodieHeartbeatUtils.class);
+
+  /**
+   * Uses the modification time of the instant's heartbeat file as the last heartbeat time.
+   *
+   * @param fs {@link FileSystem} instance.
+   * @param basePath Base path of the table.
+   * @param instantTime Instant time.
+   * @return Last heartbeat timestamp, or {@code 0} if the heartbeat file does not exist.
+   * @throws IOException upon file system errors other than a missing heartbeat file.
+   */
+  public static Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException {
+    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime);
+    try {
+      return fs.getFileStatus(heartbeatFilePath).getModificationTime();
+    } catch (java.io.FileNotFoundException e) {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning and the last write had failed
+      return 0L;
+    }
+  }
+
+  /**
+   * Whether a heartbeat is expired, i.e. the last heartbeat is older than the allowed interval.
+   *
+   * @param instantTime Instant time.
+   * @param maxAllowableHeartbeatIntervalInMs Heartbeat timeout in milliseconds.
+   * @param fs {@link FileSystem} instance.
+   * @param basePath Base path of the table.
+   * @return {@code true} if expired; {@code false} otherwise.
+   * @throws IOException upon errors.
+   */
+  public static boolean isHeartbeatExpired(String instantTime, long maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) throws IOException {
+    long currentTime = System.currentTimeMillis();
+    long lastHeartbeatTime = getLastHeartbeatTime(fs, basePath, instantTime);
+    if (currentTime - lastHeartbeatTime > maxAllowableHeartbeatIntervalInMs) {
+      LOG.warn("Heartbeat expired, for instant: " + instantTime);
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java
index 94da60c39c1ab..81836bdb85238 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java
@@ -29,6 +29,7 @@ public class MarkerOperation implements Serializable {
 
   public static final String MARKER_DIR_PATH_PARAM = "markerdirpath";
   public static final String MARKER_NAME_PARAM = "markername";
+  public static final String MARKER_BASEPATH_PARAM = "basepath";
 
   // GET requests
   public static final String ALL_MARKERS_URL = String.format("%s/%s", BASE_URL, "all");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
index 0aff8f594a5df..0ac8590f0afcf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
@@ -22,8 +22,11 @@
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import
org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -39,12 +42,17 @@
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.isHeartbeatExpired;
 import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
 
 /**
@@ -200,6 +208,18 @@ public static Map> readTimelineServerBasedMarkersFromFileSys
    * @return markers in a {@code Set} of String.
    */
   public static Set readMarkersFromFile(Path markersFilePath, SerializableConfiguration conf) {
+    return readMarkersFromFile(markersFilePath, conf, false);
+  }
+
+  /**
+   * Reads the markers stored in the underlying file.
+   *
+   * @param markersFilePath File path for the markers.
+   * @param conf Serializable config.
+   * @param ignoreException Whether to ignore IOException.
+   * @return Markers in a {@code Set} of String.
+   */
+  public static Set readMarkersFromFile(Path markersFilePath, SerializableConfiguration conf, boolean ignoreException) {
     FSDataInputStream fsDataInputStream = null;
     Set markers = new HashSet<>();
     try {
@@ -208,10 +228,104 @@ public static Set readMarkersFromFile(Path markersFilePath, Serializable
       fsDataInputStream = fs.open(markersFilePath);
       markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+      String errorMessage = "Failed to read MARKERS file " + markersFilePath;
+      if (ignoreException) {
+        LOG.warn(errorMessage + ". Ignoring the exception and continue.", e);
+      } else {
+        throw new HoodieIOException(errorMessage, e);
+      }
     } finally {
       closeQuietly(fsDataInputStream);
     }
     return markers;
   }
+
+  /**
+   * Gets all marker directories.
+   *
+   * @param tempPath Temporary folder under .hoodie.
+   * @param fs File system to use.
+   * @return Paths of all entries listed directly under {@code tempPath}.
+   * @throws IOException upon error.
+   */
+  public static List<Path> getAllMarkerDir(Path tempPath, FileSystem fs) throws IOException {
+    return Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList());
+  }
+
+  /**
+   * Whether any given file ID was written by a commit that completed but is not in {@code completedCommitInstants} (the timeline is reloaded first).
+   *
+   * @param activeTimeline Active timeline.
+   * @param currentFileIDs Current set of file IDs; it is only read, never modified.
+   * @param completedCommitInstants Completed commits already known to the caller; they are excluded from the check.
+   * @return {@code true} if the conflict is detected; {@code false} otherwise.
+   */
+  public static boolean hasCommitConflict(HoodieActiveTimeline activeTimeline, Set<String> currentFileIDs, Set<HoodieInstant> completedCommitInstants) {
+    Set<HoodieInstant> currentInstants = new HashSet<>(
+        activeTimeline.reload().getCommitsTimeline().filterCompletedInstants().getInstants());
+
+    currentInstants.removeAll(completedCommitInstants);
+    Set<String> missingFileIDs = currentInstants.stream().flatMap(instant -> {
+      try {
+        return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class)
+            .getFileIdAndRelativePaths().keySet().stream();
+      } catch (Exception e) {
+        LOG.warn("Failed to read commit metadata of " + instant + "; skipping it in the commit conflict check", e);
+        return Stream.empty();
+      }
+    }).collect(Collectors.toSet());
+    // Test for overlap without retainAll(), which would shrink the caller's set as a side effect.
+    return currentFileIDs.stream().anyMatch(missingFileIDs::contains);
+  }
+
+  /**
+   * Gets the candidate marker directories to do conflict checking against:
+   * 1. Skips the current writer's instant and every instant that is not earlier than {@code currentInstantTime}.
+   * 2. Skips pending compaction instants (for now we don't do early conflict check with compact action,
+   *    because we don't want to let pending compaction block common writer) and pending replace instants.
+   * 3. Skips dead writers' instants, i.e. those whose heartbeat is expired or cannot be read.
+   *
+   * @param instants Marker directory paths; the last path component is used as the instant time.
+   * @return Marker directory paths, as strings, of the instants that passed all the filters above.
+   */
+  public static List<String> getCandidateInstants(HoodieActiveTimeline activeTimeline, List<Path> instants, String currentInstantTime,
+                                                  long maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) {
+
+    return instants.stream().map(Path::toString).filter(instantPath -> {
+      String instantTime = markerDirToInstantTime(instantPath);
+      return instantTime.compareToIgnoreCase(currentInstantTime) < 0
+          && !activeTimeline.filterPendingCompactionTimeline().containsInstant(instantTime)
+          && !activeTimeline.filterPendingReplaceTimeline().containsInstant(instantTime);
+    }).filter(instantPath -> {
+      try {
+        return !isHeartbeatExpired(markerDirToInstantTime(instantPath), maxAllowableHeartbeatIntervalInMs, fs, basePath);
+      } catch (IOException e) {
+        return false; // an unreadable heartbeat is treated like an expired one
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Get partition path and fileID from full marker path, for example:
+   * 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0_85-15-1390_20220620181735781.parquet.marker.MERGE
+   *    ==> get 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0
+   * @param marker Marker path relative to the marker directory, as in the example above.
+   * @return The part of {@code marker} before its first underscore.
+   */
+  public static String makerToPartitionAndFileID(String marker) {
+    String[] ele = marker.split("_");
+    return ele[0];
+  }
+
+  /**
+   * Get instantTime from full marker directory path, for example:
+   * /var/folders/t3/th1dw75d0yz2x2k2qt6ys9zh0000gp/T/junit6502909693741900820/dataset/.hoodie/.temp/003
+   *    ==> 003
+   * @param marker Marker directory path, using '/' as the separator.
+   * @return The last '/'-separated component of {@code marker}.
+   */
+  public static String markerDirToInstantTime(String marker) {
+    String[] ele = marker.split("/");
+    return ele[ele.length - 1];
+  }
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index f831af1820181..d936ee5746579 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -189,4
+189,16 @@ public static Object invokeStaticMethod(String clazz, String methodName, Object[
       throw new HoodieException(String.format("Unable to invoke the methond %s of the class %s ", methodName, clazz), e);
     }
   }
+
+  /**
+   * Checks if the class with the given name is the same as, or a subclass/implementation of, another class.
+   *
+   * @param aClazzName Class name.
+   * @param superClazz Super class to check.
+   * @return {@code true} if {@code superClazz} is assignable from the class named {@code aClazzName};
+   * {@code false} otherwise.
+   */
+  public static boolean isSubClass(String aClazzName, Class superClazz) {
+    return superClazz.isAssignableFrom(getClass(aClazzName));
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieEarlyConflictDetectionException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieEarlyConflictDetectionException.java
new file mode 100644
index 0000000000000..c88abcb4e0d57
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieEarlyConflictDetectionException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * <p>
+ * Exception thrown when early conflict detection finds a write conflict. A subclass of {@link HoodieException}.
+ * </p>
+ * <p>
+ * Hoodie Write clients will throw this exception if early conflict detected. This is a runtime (unchecked)
+ * exception.
+ * </p>
+ */
+public class HoodieEarlyConflictDetectionException extends HoodieException {
+
+  public HoodieEarlyConflictDetectionException(String msg) {
+    super(msg);
+  }
+
+  public HoodieEarlyConflictDetectionException(Throwable e) {
+    super(e);
+  }
+
+  public HoodieEarlyConflictDetectionException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestReflectionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestReflectionUtils.java
new file mode 100644
index 0000000000000..15decc3e5c57b
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestReflectionUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.conflict.detection.DirectMarkerBasedDetectionStrategy; +import org.apache.hudi.common.conflict.detection.EarlyConflictDetectionStrategy; +import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy; + +import org.junit.jupiter.api.Test; + +import static org.apache.hudi.common.util.ReflectionUtils.isSubClass; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests {@link ReflectionUtils} + */ +public class TestReflectionUtils { + @Test + public void testIsSubClass() { + String subClassName1 = DirectMarkerBasedDetectionStrategy.class.getName(); + String subClassName2 = TimelineServerBasedDetectionStrategy.class.getName(); + assertTrue(isSubClass(subClassName1, EarlyConflictDetectionStrategy.class)); + assertTrue(isSubClass(subClassName2, EarlyConflictDetectionStrategy.class)); + assertTrue(isSubClass(subClassName2, TimelineServerBasedDetectionStrategy.class)); + assertFalse(isSubClass(subClassName2, DirectMarkerBasedDetectionStrategy.class)); + } +} diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index a295036b6d030..4c999b9ad2209 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -459,7 +459,8 @@ private void registerMarkerAPI() { ctx.future(markerHandler.createMarker( ctx, ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, String.class).getOrDefault(""))); + ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, String.class).getOrDefault(""), + 
ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault(""))); }, false)); app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index a74cda7b3af21..2bf3f4dea5006 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -18,8 +18,6 @@ package org.apache.hudi.timeline.service; -import io.javalin.core.JavalinConfig; -import io.javalin.jetty.JettyServer; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; @@ -33,6 +31,8 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import io.javalin.Javalin; +import io.javalin.core.JavalinConfig; +import io.javalin.jetty.JettyServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; @@ -124,6 +124,41 @@ public static class Config implements Serializable { @Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files") public int markerParallelism = 100; + @Parameter(names = {"--early-conflict-detection-strategy"}, description = + "The class name of the early conflict detection strategy to use. 
" + + "This should be subclass of " + + "`org.apache.hudi.common.conflict.detection.EarlyConflictDetectionStrategy`") + public String earlyConflictDetectionStrategy = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy"; + + @Parameter(names = {"--early-conflict-detection-check-commit-conflict"}, description = + "Whether to enable commit conflict checking or not during early " + + "conflict detection.") + public Boolean checkCommitConflict = false; + + @Parameter(names = {"--early-conflict-detection-enable"}, description = + "Whether to enable early conflict detection based on markers. " + + "It eagerly detects writing conflict before create markers and fails fast if a " + + "conflict is detected, to release cluster compute resources as soon as possible.") + public Boolean earlyConflictDetectionEnable = false; + + @Parameter(names = {"--async-conflict-detector-batch-interval-ms"}, description = + "Used for timeline-server-based markers with " + + "`AsyncTimelineServerBasedDetectionStrategy`. " + + "The time in milliseconds to delay first async marker conflict detection.") + public Long asyncConflictDetectorBatchIntervalMs = 30000L; + + @Parameter(names = {"--async-conflict-detector-batch-period-ms"}, description = + "Used for timeline-server-based markers with " + + "`AsyncTimelineServerBasedDetectionStrategy`. " + + "The period in milliseconds between consecutive runs of async marker conflict detection.") + public Long asyncConflictDetectorBatchPeriodMs = 30000L; + + @Parameter(names = {"--early-conflict-detection-max-heartbeat-interval-ms"}, description = + "Used for timeline-server-based markers with " + + "`AsyncTimelineServerBasedDetectionStrategy`. 
" + + "Instants whose heartbeat is greater than the current value will not be used in early conflict detection.") + public Long maxAllowableHeartbeatIntervalInMs = 60000L; + @Parameter(names = {"--help", "-h"}) public Boolean help = false; @@ -148,6 +183,12 @@ public static class Builder { private int markerBatchNumThreads = 20; private long markerBatchIntervalMs = 50L; private int markerParallelism = 100; + private String earlyConflictDetectionStrategy = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy"; + private Boolean checkCommitConflict = false; + private Boolean earlyConflictDetectionEnable = false; + private Long asyncConflictDetectorBatchIntervalMs = 30000L; + private Long asyncConflictDetectorBatchPeriodMs = 30000L; + private Long maxAllowableHeartbeatIntervalInMs = 60000L; public Builder() { } @@ -217,6 +258,36 @@ public Builder markerParallelism(int markerParallelism) { return this; } + public Builder earlyConflictDetectionStrategy(String earlyConflictDetectionStrategy) { + this.earlyConflictDetectionStrategy = earlyConflictDetectionStrategy; + return this; + } + + public Builder earlyConflictDetectionCheckCommitConflict(Boolean checkCommitConflict) { + this.checkCommitConflict = checkCommitConflict; + return this; + } + + public Builder earlyConflictDetectionEnable(Boolean earlyConflictDetectionEnable) { + this.earlyConflictDetectionEnable = earlyConflictDetectionEnable; + return this; + } + + public Builder asyncConflictDetectorBatchIntervalMs(Long asyncConflictDetectorBatchIntervalMs) { + this.asyncConflictDetectorBatchIntervalMs = asyncConflictDetectorBatchIntervalMs; + return this; + } + + public Builder asyncConflictDetectorBatchPeriodMs(Long asyncConflictDetectorBatchPeriodMs) { + this.asyncConflictDetectorBatchPeriodMs = asyncConflictDetectorBatchPeriodMs; + return this; + } + + public Builder earlyConflictDetectionMaxAllowableHeartbeatIntervalInMs(Long maxAllowableHeartbeatIntervalInMs) { + 
this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs; + return this; + } + public Config build() { Config config = new Config(); config.serverPort = this.serverPort; @@ -232,6 +303,12 @@ public Config build() { config.markerBatchNumThreads = this.markerBatchNumThreads; config.markerBatchIntervalMs = this.markerBatchIntervalMs; config.markerParallelism = this.markerParallelism; + config.earlyConflictDetectionStrategy = this.earlyConflictDetectionStrategy; + config.checkCommitConflict = this.checkCommitConflict; + config.earlyConflictDetectionEnable = this.earlyConflictDetectionEnable; + config.asyncConflictDetectorBatchIntervalMs = this.asyncConflictDetectorBatchIntervalMs; + config.asyncConflictDetectorBatchPeriodMs = this.asyncConflictDetectorBatchPeriodMs; + config.maxAllowableHeartbeatIntervalInMs = this.maxAllowableHeartbeatIntervalInMs; return config; } } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index 43c7d93bd6e8b..20604e61c741b 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -18,15 +18,24 @@ package org.apache.hudi.timeline.service.handlers; +import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import 
org.apache.hudi.exception.HoodieEarlyConflictDetectionException; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.timeline.service.TimelineService; import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable; import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture; import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.javalin.http.Context; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -34,10 +43,12 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Collections; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -45,6 +56,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; +import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult; + /** * REST Handler servicing marker requests. 
* @@ -79,9 +95,12 @@ public class MarkerHandler extends Handler { // A thread to dispatch marker creation requests to batch processing threads private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable; private final Object firstCreationRequestSeenLock = new Object(); + private final Object earlyConflictDetectionLock = new Object(); private transient HoodieEngineContext hoodieEngineContext; private ScheduledFuture dispatchingThreadFuture; private boolean firstCreationRequestSeen; + private String currentMarkerDir = null; + private TimelineServerBasedDetectionStrategy earlyConflictDetectionStrategy; public MarkerHandler(Configuration conf, TimelineService.Config timelineServiceConfig, HoodieEngineContext hoodieEngineContext, FileSystem fileSystem, @@ -120,6 +139,19 @@ public Set getAllMarkers(String markerDir) { return markerDirState.getAllMarkers(); } + /** + * @param markerDir marker directory path. + * @return Pending markers from the requests to process; an empty set if no state is tracked for the marker directory. + */ + public Set getPendingMarkersToProcess(String markerDir) { + if (markerDirStateMap.containsKey(markerDir)) { + MarkerDirState markerDirState = getMarkerDirState(markerDir); + return markerDirState.getPendingMarkerCreationRequests(false).stream() + .map(MarkerCreationFuture::getMarkerName).collect(Collectors.toSet()); + } + return Collections.emptySet(); + } + /** * @param markerDir marker directory path * @return all marker paths of write IO type "CREATE" and "MERGE" @@ -150,7 +182,66 @@ public boolean doesMarkerDirExist(String markerDir) { * @param markerName marker name * @return the {@code CompletableFuture} instance for the request */ - public CompletableFuture createMarker(Context context, String markerDir, String markerName) { + public CompletableFuture createMarker(Context context, String markerDir, String markerName, String basePath) { + // Step 1: do early conflict detection if enabled + if (timelineServiceConfig.earlyConflictDetectionEnable) { + try { + synchronized 
(earlyConflictDetectionLock) { + if (earlyConflictDetectionStrategy == null) { + String strategyClassName = timelineServiceConfig.earlyConflictDetectionStrategy; + if (!ReflectionUtils.isSubClass(strategyClassName, TimelineServerBasedDetectionStrategy.class)) { + LOG.warn("Cannot use " + strategyClassName + " for timeline-server-based markers."); + strategyClassName = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy"; + LOG.warn("Falling back to " + strategyClassName); + } + + earlyConflictDetectionStrategy = + (TimelineServerBasedDetectionStrategy) ReflectionUtils.loadClass( + strategyClassName, basePath, markerDir, markerName, timelineServiceConfig.checkCommitConflict); + } + + // markerDir => $base_path/.hoodie/.temp/$instant_time + // If markerDir changes, e.g. on moving to the next instant action, we need to refresh this earlyConflictDetectionStrategy. + // For the marker creation requests of a given instant, we only run this check/refresh once + // instead of starting the conflict detector for every request. NOTE(review): the comparison below is case-insensitive; confirm that is intended for paths. + if (!markerDir.equalsIgnoreCase(currentMarkerDir)) { + this.currentMarkerDir = markerDir; + Set actions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION); + Set completedCommits = new HashSet<>( + viewManager.getFileSystemView(basePath) + .getTimeline() + .filterCompletedInstants() + .filter(instant -> actions.contains(instant.getAction())) + .getInstants()); + + earlyConflictDetectionStrategy.startAsyncDetection( + timelineServiceConfig.asyncConflictDetectorBatchIntervalMs, + timelineServiceConfig.asyncConflictDetectorBatchPeriodMs, + markerDir, basePath, timelineServiceConfig.maxAllowableHeartbeatIntervalInMs, + fileSystem, this, completedCommits); + } + } + + earlyConflictDetectionStrategy.detectAndResolveConflictIfNecessary(); + + } catch (HoodieEarlyConflictDetectionException he) { + LOG.warn("Detected the write conflict due to a concurrent writer, " + + "failing the marker 
creation as the early conflict detection is enabled", he); + return finishCreateMarkerFuture(context, markerDir, markerName); + } catch (Exception e) { + LOG.warn("Failed to execute early conflict detection." + e.getMessage()); + // When early conflict detection fails to execute, we still allow the marker creation + // to continue + return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName); + } + } + + // Step 2 create marker + return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName); + } + + private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing( + Context context, String markerDir, String markerName) { LOG.info("Request: create marker " + markerDir + " " + markerName); MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName); // Add the future to the list @@ -169,6 +260,17 @@ public CompletableFuture createMarker(Context context, String markerDir, return future; } + private CompletableFuture finishCreateMarkerFuture(Context context, String markerDir, String markerName) { + MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName); + try { + future.complete(jsonifyResult( + future.getContext(), future.isSuccessful(), metricsRegistry, new ObjectMapper(), LOG)); + } catch (JsonProcessingException e) { + throw new HoodieException("Failed to JSON encode the value", e); + } + return future; + } + /** * Deletes markers in the directory. * @@ -186,8 +288,13 @@ private MarkerDirState getMarkerDirState(String markerDir) { if (markerDirState == null) { synchronized (markerDirStateMap) { if (markerDirStateMap.get(markerDir) == null) { - markerDirState = new MarkerDirState(markerDir, timelineServiceConfig.markerBatchNumThreads, - fileSystem, metricsRegistry, hoodieEngineContext, parallelism); + Option strategy = + timelineServiceConfig.earlyConflictDetectionEnable + && earlyConflictDetectionStrategy != null + ? 
Option.of(earlyConflictDetectionStrategy) : Option.empty(); + markerDirState = new MarkerDirState( + markerDir, timelineServiceConfig.markerBatchNumThreads, + strategy, fileSystem, metricsRegistry, hoodieEngineContext, parallelism); markerDirStateMap.put(markerDir, markerDirState); } else { markerDirState = markerDirStateMap.get(markerDir); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/AsyncTimelineServerBasedDetectionStrategy.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/AsyncTimelineServerBasedDetectionStrategy.java new file mode 100644 index 0000000000000..1356db0d96eff --- /dev/null +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/AsyncTimelineServerBasedDetectionStrategy.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.timeline.service.handlers.marker; + +import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.exception.HoodieEarlyConflictDetectionException; +import org.apache.hudi.timeline.service.handlers.MarkerHandler; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ConcurrentModificationException; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This strategy is used for writers using timeline-server-based markers, + * trying to do early conflict detection by asynchronously and periodically checking + * write conflict among multiple writers based on the timeline-server-based markers. Detected conflicts are never resolved here; {@code resolveMarkerConflict} always throws. + */ +public class AsyncTimelineServerBasedDetectionStrategy extends TimelineServerBasedDetectionStrategy { + + private static final Logger LOG = LogManager.getLogger(AsyncTimelineServerBasedDetectionStrategy.class); + + private AtomicBoolean hasConflict = new AtomicBoolean(false); + private ScheduledExecutorService asyncDetectorExecutor; + + public AsyncTimelineServerBasedDetectionStrategy(String basePath, String markerDir, String markerName, Boolean checkCommitConflict) { + super(basePath, markerDir, markerName, checkCommitConflict); + } + + @Override + public boolean hasMarkerConflict() { + return hasConflict.get(); + } + + @Override + public void resolveMarkerConflict(String basePath, String markerDir, String markerName) { + throw new HoodieEarlyConflictDetectionException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes")); + } + + @Override + public void startAsyncDetection(Long batchIntervalMs, Long periodMs, String 
markerDir, + String basePath, Long maxAllowableHeartbeatIntervalInMs, + FileSystem fileSystem, Object markerHandler, + Set completedCommits) { + if (asyncDetectorExecutor != null) { + asyncDetectorExecutor.shutdown(); + } + hasConflict.set(false); + asyncDetectorExecutor = Executors.newSingleThreadScheduledExecutor(); + asyncDetectorExecutor.scheduleAtFixedRate( + new MarkerBasedEarlyConflictDetectionRunnable( + hasConflict, (MarkerHandler) markerHandler, markerDir, basePath, + fileSystem, maxAllowableHeartbeatIntervalInMs, completedCommits, checkCommitConflict), + batchIntervalMs, periodMs, TimeUnit.MILLISECONDS); + } + + @Override + public void detectAndResolveConflictIfNecessary() throws HoodieEarlyConflictDetectionException { + if (hasMarkerConflict()) { + resolveMarkerConflict(basePath, markerDir, markerName); + } + } +} diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java new file mode 100644 index 0000000000000..58fb14bdfc0cd --- /dev/null +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.timeline.service.handlers.marker; + +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.MarkerUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.timeline.service.handlers.MarkerHandler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable { + private static final Logger LOG = LogManager.getLogger(MarkerBasedEarlyConflictDetectionRunnable.class); + + private MarkerHandler markerHandler; + private String markerDir; + private String basePath; + private FileSystem fs; + private AtomicBoolean hasConflict; + private long maxAllowableHeartbeatIntervalInMs; + private Set completedCommits; + private final boolean checkCommitConflict; + + public MarkerBasedEarlyConflictDetectionRunnable(AtomicBoolean hasConflict, MarkerHandler markerHandler, String markerDir, + String basePath, 
FileSystem fileSystem, long maxAllowableHeartbeatIntervalInMs, + Set completedCommits, boolean checkCommitConflict) { + this.markerHandler = markerHandler; + this.markerDir = markerDir; + this.basePath = basePath; + this.fs = fileSystem; + this.hasConflict = hasConflict; + this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs; + this.completedCommits = completedCommits; + this.checkCommitConflict = checkCommitConflict; + } + + @Override + public void run() { + // If a conflict among multiple writers is already detected, + // there is no need to run the detection again. + if (hasConflict.get()) { + return; + } + + try { + Set pendingMarkers = markerHandler.getPendingMarkersToProcess(markerDir); + + if (!fs.exists(new Path(markerDir)) && pendingMarkers.isEmpty()) { + return; + } + + HoodieTimer timer = HoodieTimer.start(); + Set currentInstantAllMarkers = new HashSet<>(); + // We need to check both the markers already written to the storage + // and the markers from the requests pending processing. 
+ currentInstantAllMarkers.addAll(markerHandler.getAllMarkers(markerDir)); + currentInstantAllMarkers.addAll(pendingMarkers); + Path tempPath = new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.TEMPFOLDER_NAME); + + List instants = MarkerUtils.getAllMarkerDir(tempPath, fs); + + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true).build(); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + List candidate = MarkerUtils.getCandidateInstants(activeTimeline, instants, + MarkerUtils.markerDirToInstantTime(markerDir), maxAllowableHeartbeatIntervalInMs, fs, basePath); + Set tableMarkers = candidate.stream().flatMap(instant -> { + return MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(instant, fs, new HoodieLocalEngineContext(new Configuration()), 100) + .values().stream().flatMap(Collection::stream); + }).collect(Collectors.toSet()); + + Set currentFileIDs = currentInstantAllMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet()); + Set tableFilesIDs = tableMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet()); + + currentFileIDs.retainAll(tableFilesIDs); + if (!currentFileIDs.isEmpty() + || (checkCommitConflict && MarkerUtils.hasCommitConflict(activeTimeline, + currentInstantAllMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet()), completedCommits))) { + LOG.warn("Conflict writing detected based on markers!\n" + + "Conflict markers: " + currentInstantAllMarkers + "\n" + + "Table markers: " + tableMarkers); + hasConflict.compareAndSet(false, true); + } + LOG.info("Finish batching marker-based conflict detection in " + timer.endTimer() + " ms"); + + } catch (IOException e) { + throw new HoodieIOException("IOException occurs during checking marker conflict"); + } + } +} diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index f367ec870eb1c..cbffea3db9038 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -18,6 +18,7 @@ package org.apache.hudi.timeline.service.handlers.marker; +import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; @@ -25,6 +26,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieEarlyConflictDetectionException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -81,14 +83,18 @@ public class MarkerDirState implements Serializable { private final List markerCreationFutures = new ArrayList<>(); private final int parallelism; private final Object markerCreationProcessingLock = new Object(); + // Early conflict detection strategy if enabled + private final Option conflictDetectionStrategy; private transient HoodieEngineContext hoodieEngineContext; // Last underlying file index used, for finding the next file index // in a round-robin fashion private int lastFileIndexUsed = -1; private boolean isMarkerTypeWritten = false; - public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSystem fileSystem, - Registry metricsRegistry, HoodieEngineContext hoodieEngineContext, int parallelism) { + public MarkerDirState(String markerDirPath, int markerBatchNumThreads, + Option conflictDetectionStrategy, + FileSystem 
fileSystem, Registry metricsRegistry, + HoodieEngineContext hoodieEngineContext, int parallelism) { this.markerDirPath = markerDirPath; this.fileSystem = fileSystem; this.metricsRegistry = metricsRegistry; @@ -96,6 +102,7 @@ public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSyste this.parallelism = parallelism; this.threadUseStatus = Stream.generate(() -> false).limit(markerBatchNumThreads).collect(Collectors.toList()); + this.conflictDetectionStrategy = conflictDetectionStrategy; // Lazy initialization of markers by reading MARKERS* files on the file system syncMarkersFromFileSystem(); } @@ -167,16 +174,26 @@ public void markFileAsAvailable(int fileIndex) { } /** - * @return futures of pending marker creation requests and removes them from the list. + * @return futures of pending marker creation requests and removes them from the list. */ public List fetchPendingMarkerCreationRequests() { + return getPendingMarkerCreationRequests(true); + } + + /** + * @param shouldClear Should clear the internal request list or not. + * @return futures of pending marker creation requests. 
+ */ + public List getPendingMarkerCreationRequests(boolean shouldClear) { List pendingFutures; synchronized (markerCreationFutures) { if (markerCreationFutures.isEmpty()) { return new ArrayList<>(); } pendingFutures = new ArrayList<>(markerCreationFutures); - markerCreationFutures.clear(); + if (shouldClear) { + markerCreationFutures.clear(); + } } return pendingFutures; } @@ -196,16 +213,33 @@ public void processMarkerCreationRequests( LOG.debug("timeMs=" + System.currentTimeMillis() + " markerDirPath=" + markerDirPath + " numRequests=" + pendingMarkerCreationFutures.size() + " fileIndex=" + fileIndex); - + boolean shouldFlushMarkers = false; + synchronized (markerCreationProcessingLock) { for (MarkerCreationFuture future : pendingMarkerCreationFutures) { String markerName = future.getMarkerName(); boolean exists = allMarkers.contains(markerName); if (!exists) { - allMarkers.add(markerName); - StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(fileIndex, k -> new StringBuilder(16384)); - stringBuilder.append(markerName); - stringBuilder.append('\n'); + if (conflictDetectionStrategy.isPresent()) { + try { + conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary(); + } catch (HoodieEarlyConflictDetectionException he) { + LOG.warn("Detected the write conflict due to a concurrent writer, " + + "failing the marker creation as the early conflict detection is enabled", he); + future.setResult(false); + continue; + } catch (Exception e) { + LOG.warn("Failed to execute early conflict detection." 
+ e.getMessage()); + // When early conflict detection fails to execute, we still allow the marker creation + // to continue + addMarkerToMap(fileIndex, markerName); + future.setResult(true); + shouldFlushMarkers = true; + continue; + } + } + addMarkerToMap(fileIndex, markerName); + shouldFlushMarkers = true; } future.setResult(!exists); } @@ -216,7 +250,9 @@ public void processMarkerCreationRequests( isMarkerTypeWritten = true; } } - flushMarkersToFile(fileIndex); + if (shouldFlushMarkers) { + flushMarkersToFile(fileIndex); + } markFileAsAvailable(fileIndex); for (MarkerCreationFuture future : pendingMarkerCreationFutures) { @@ -267,6 +303,19 @@ private void syncMarkersFromFileSystem() { } } + /** + * Adds a new marker to the in-memory map. + * + * @param fileIndex Marker file index number. + * @param markerName Marker name. + */ + private void addMarkerToMap(int fileIndex, String markerName) { + allMarkers.add(markerName); + StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(fileIndex, k -> new StringBuilder(16384)); + stringBuilder.append(markerName); + stringBuilder.append('\n'); + } + /** * Writes marker type, "TIMELINE_SERVER_BASED", to file. */ diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/handlers/marker/TestMarkerBasedEarlyConflictDetectionRunnable.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/handlers/marker/TestMarkerBasedEarlyConflictDetectionRunnable.java new file mode 100644 index 0000000000000..f5cba27deba4b --- /dev/null +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/handlers/marker/TestMarkerBasedEarlyConflictDetectionRunnable.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.timeline.service.handlers.marker; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.timeline.service.handlers.MarkerHandler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests {@link MarkerBasedEarlyConflictDetectionRunnable}. 
+ */ +public class TestMarkerBasedEarlyConflictDetectionRunnable extends HoodieCommonTestHarness { + + private static final Logger LOG = LogManager.getLogger(TestMarkerBasedEarlyConflictDetectionRunnable.class); + + @BeforeEach + public void setUp() throws Exception { + initPath(); + } + + @AfterEach + public void tearDown() throws Exception { + Path path = new Path(basePath); + FileSystem fs = path.getFileSystem(new Configuration()); + fs.delete(path, true); + } + + /** + * Prepare dataset : + * $base_path/.hoodie/.temp/2016/001/MARKERS0 => 2016/b21adfa2-7013-4452-a565-4cc39fea5b73-0_4-17-21_001.parquet.marker.CREATE (same) + * 2016/4a266542-c7d5-426f-8fb8-fb85a2e88448-0_3-17-20_001.parquet.marker.CREATE + * /002/MARKERS0 => 2016/b21adfa2-7013-4452-a565-4cc39fea5b73-0_40-170-210_002.parquet.marker.MERGE (same) + * => 2016/1228caeb-4188-4e19-a18d-848e6f9b0448-0_55-55-425_002.parquet.marker.MERGE + *

+ *

+ * Run MarkerBasedEarlyConflictDetectionRunnable and find there is a conflict 2016/b21adfa2-7013-4452-a565-4cc39fea5b73-0 + */ + @Test + public void testMarkerConflictDetectionRunnable() throws IOException, InterruptedException { + + AtomicBoolean hasConflict = new AtomicBoolean(false); + FileSystem fs = new Path(basePath).getFileSystem(new Configuration()); + MarkerHandler markerHandler = mock(MarkerHandler.class); + String rootBaseMarkerDir = basePath + "/.hoodie/.temp"; + String partition = "2016"; + metaClient = HoodieTestUtils.init(new Configuration(), basePath, HoodieTableType.COPY_ON_WRITE); + + String oldInstant = "001"; + Set oldMarkers = Stream.of(partition + "/b21adfa2-7013-4452-a565-4cc39fea5b73-0_4-17-21_001.parquet.marker.CREATE", + partition + "/4a266542-c7d5-426f-8fb8-fb85a2e88448-0_3-17-20_001.parquet.marker.CREATE").collect(Collectors.toSet()); + prepareFiles(rootBaseMarkerDir, oldInstant, oldMarkers, fs); + + // here current markers and old markers have a common fileID b21adfa2-7013-4452-a565-4cc39fea5b73-0 + String currentInstantTime = "002"; + String currentMarkerDir = rootBaseMarkerDir + "/" + currentInstantTime; + Set currentMarkers = Stream.of(partition + "/b21adfa2-7013-4452-a565-4cc39fea5b73-0_40-170-210_002.parquet.marker.MERGE", + partition + "/1228caeb-4188-4e19-a18d-848e6f9b0448-0_55-55-425_002.parquet.marker.MERGE").collect(Collectors.toSet()); + prepareFiles(rootBaseMarkerDir, currentInstantTime, currentMarkers, fs); + + HashSet oldInstants = new HashSet<>(); + oldInstants.add(new HoodieInstant(false, "commit", oldInstant)); + when(markerHandler.getAllMarkers(currentMarkerDir)).thenReturn(currentMarkers); + + ScheduledExecutorService detectorExecutor = Executors.newSingleThreadScheduledExecutor(); + detectorExecutor.submit(new MarkerBasedEarlyConflictDetectionRunnable(hasConflict, markerHandler, currentMarkerDir, + basePath, fs, Long.MAX_VALUE, oldInstants, true)); + + detectorExecutor.shutdown(); + 
detectorExecutor.awaitTermination(60, TimeUnit.SECONDS); + + assertTrue(hasConflict.get()); + } + + private void prepareFiles(String baseMarkerDir, String instant, Set markers, FileSystem fs) throws IOException { + fs.create(new Path(basePath + "/.hoodie/" + instant + ".commit"), true); + String markerDir = baseMarkerDir + "/" + instant; + fs.mkdirs(new Path(markerDir)); + BufferedWriter out = new BufferedWriter(new FileWriter(markerDir + "/MARKERS0")); + markers.forEach(ele -> { + try { + out.write(ele); + out.write("\n"); + } catch (IOException e) { + // ignore here. + } + }); + + out.close(); + } +}