[HUDI-1575] Early Conflict Detection For Multi-writer #6133
Changes from 14 commits
`LockManager`: add a constructor that locks at the partitionPath + fileId level, so each write handler can take a lock scoped to a single file group.

```diff
@@ -21,6 +21,7 @@
 import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieLockConfig;
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
     metrics = new HoodieLockMetrics(writeConfig);
   }
 
+  /**
+   * Tries to take a lock at the partitionPath + fileId level for a single write handler.
+   *
+   * @param writeConfig   write config
+   * @param fs            file system
+   * @param partitionPath partition path of the file group to lock
+   * @param fileId        file group id to lock
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId);
+    this.lockConfiguration = new LockConfiguration(props);
+    maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+        Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+    maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+        Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+  }
+
+  /**
+   * Rebuilds the lock-related configs; only ZooKeeper-based locks are supported for now.
+   */
+  private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {
+    TypedProperties props = new TypedProperties(writeConfig.getProps());
+    props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);
+    return props;
+  }
+
   public void lock() {
     if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
       LockProvider lockProvider = getLockProvider();
```
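For context, a rough usage sketch of the new fileId-scoped constructor (not taken from the PR): the `writeConfig` and `fs` objects are assumed to exist, the partition and file-group strings are placeholders, and `unlock()`/`close()` are the existing `LockManager` methods. The point is that each write handler locks only the file group it is writing, instead of serializing all writers on one table-level lock.

```java
// Hedged sketch, not part of this PR: take a fileId-level lock around a single
// file group write. "writeConfig" and "fs" are assumed to be created elsewhere;
// the partition path and file id below are placeholders.
LockManager lockManager = new LockManager(writeConfig, fs, "2022/07/20", "file-group-0001");
try {
  // Retries and wait time follow the LOCK_ACQUIRE_CLIENT_* settings read in the constructor.
  lockManager.lock();
  // ... write the file slice / create the marker for this file group ...
} finally {
  lockManager.unlock();
  lockManager.close();
}
```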
Write handle (`createMarkerFile`): route marker creation through early conflict detection when optimistic concurrency control and the feature flag are enabled; otherwise create the marker directly as before.

```diff
@@ -20,6 +20,9 @@
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.conflict.detection.HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -28,6 +31,7 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -37,6 +41,7 @@
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
 import org.apache.avro.Schema;
@@ -51,6 +56,8 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
@@ -187,8 +194,54 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
    * @param partitionPath Partition path
    */
   protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType());
+    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    // Do early conflict detection before creating the marker.
+    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+        && config.isEarlyConflictDetectionEnable()) {
+      HoodieEarlyConflictDetectionStrategy earlyConflictDetectionStrategy = config.getEarlyConflictDetectionStrategy();
+      if (earlyConflictDetectionStrategy instanceof HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy) {
+        createMarkerWithTransaction(earlyConflictDetectionStrategy, writeMarkers, partitionPath, dataFileName);
+      } else {
+        createMarkerWithEarlyConflictDetection(earlyConflictDetectionStrategy, writeMarkers, partitionPath, dataFileName);
+      }
+    } else {
+      // Create the marker directly.
+      writeMarkers.create(partitionPath, dataFileName, getIOType());
+    }
   }
 
+  private Option<Path> createMarkerWithEarlyConflictDetection(HoodieEarlyConflictDetectionStrategy resolutionStrategy,
+                                                              WriteMarkers writeMarkers,
+                                                              String partitionPath,
+                                                              String dataFileName) {
+    Set<HoodieInstant> completedCommitInstants = hoodieTable.getMetaClient().getActiveTimeline()
+        .getCommitsTimeline()
+        .filterCompletedInstants()
+        .getInstants()
+        .collect(Collectors.toSet());
+    return writeMarkers.createWithEarlyConflictDetection(partitionPath, dataFileName, getIOType(), false,
+        resolutionStrategy, completedCommitInstants, config, fileId);
+  }
+
+  private Option<Path> createMarkerWithTransaction(HoodieEarlyConflictDetectionStrategy resolutionStrategy,
+                                                   WriteMarkers writeMarkers,
+                                                   String partitionPath,
+                                                   String dataFileName) {
+    TransactionManager txnManager = new TransactionManager(config, fs, partitionPath, fileId);
+    try {
+      // A transaction is needed before creating the marker file when early conflict detection is on.
+      txnManager.beginTransaction(partitionPath, fileId);
+      return createMarkerWithEarlyConflictDetection(resolutionStrategy, writeMarkers, partitionPath, dataFileName);
+    } catch (Exception e) {
+      LOG.warn("Exception occurred while creating the marker file in early conflict detection mode.");
+      throw e;
+    } finally {
+      // End the transaction after the marker file is created.
+      txnManager.endTransaction(partitionPath + "/" + fileId);
+      txnManager.close();
+    }
+  }
 
   public Schema getWriterSchemaWithMetaFields() {
```
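As a rough illustration of when the new `createMarkerFile` branch is taken: the table must be written with optimistic concurrency control and the early-conflict-detection flag enabled. The builder calls `withPath`, `withProperties`, and `withWriteConcurrencyMode` are existing `HoodieWriteConfig` APIs; the property key string below is an assumption, not confirmed by this diff (only `config.isEarlyConflictDetectionEnable()` is).

```java
import java.util.Properties;

import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.config.HoodieWriteConfig;

// Hedged sketch: a write config that would route createMarkerFile() through
// early conflict detection. The property key below is hypothetical; check the
// config option added by this PR for the authoritative name.
Properties props = new Properties();
props.setProperty("hoodie.write.concurrency.early.conflict.detection.enable", "true"); // hypothetical key
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table") // placeholder base path
    .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
    .withProperties(props)
    .build();
```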
New file: `SimpleDirectMarkerBasedEarlyConflictDetectionStrategy` (56 lines added).

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.marker;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.Set;

/**
 * This strategy is used by direct marker writers to do early conflict detection.
 * It uses file system APIs such as list and exists to check directly whether any
 * conflicting marker file is present.
 */
public class SimpleDirectMarkerBasedEarlyConflictDetectionStrategy extends HoodieDirectMarkerBasedEarlyConflictDetectionStrategy {

  private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class);

  @Override
  public boolean hasMarkerConflict(String basePath, FileSystem fs, String partitionPath, String fileId, String instantTime,
                                   Set<HoodieInstant> completedCommitInstants, HoodieTableMetaClient metaClient) {
    try {
      return checkMarkerConflict(basePath, partitionPath, fileId, fs, instantTime)
          || checkCommitConflict(metaClient, completedCommitInstants, fileId);
    } catch (IOException e) {
      LOG.warn("Exception occurred while creating the marker file in early conflict detection mode.");
      throw new HoodieIOException("Exception occurred while creating the marker file in early conflict detection mode.", e);
    }
  }

  @Override
  public void resolveMarkerConflict(String basePath, String partitionPath, String dataFileName) {
    throw new HoodieEarlyConflictDetectionException(
        new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes"));
  }
}
```
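To make the check-then-create flow concrete, a simplified sketch of how a direct marker writer might consult this strategy before creating a marker. Only `hasMarkerConflict`/`resolveMarkerConflict` come from the class above; the `strategy` variable and the surrounding inputs stand in for the marker-writer code that calls it.

```java
// Hedged sketch, not part of this PR: consult the strategy before writing the marker.
// "strategy" is a SimpleDirectMarkerBasedEarlyConflictDetectionStrategy instance
// (in practice resolved via config.getEarlyConflictDetectionStrategy()); the other
// variables are supplied by the marker writer.
if (strategy.hasMarkerConflict(basePath, fs, partitionPath, fileId, instantTime,
    completedCommitInstants, metaClient)) {
  // A marker from another active writer, or a completed commit touching the same
  // file group, was found: fail this write now rather than at commit time.
  strategy.resolveMarkerConflict(basePath, partitionPath, dataFileName);
}
// No conflict detected: proceed with the normal marker creation, e.g.
// writeMarkers.create(partitionPath, dataFileName, ioType) as in the handle change above.
```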