Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
79feeb3
need more test
Jul 18, 2022
6331439
tested
Jul 18, 2022
de69c0e
tested
Jul 18, 2022
fcaaf9d
tested
Jul 18, 2022
553fb00
fix liences
Jul 18, 2022
dbe3db8
fix config
Jul 18, 2022
66b7d1b
add uts
Jul 18, 2022
64819e4
resolve conflict
Jul 22, 2022
678cce4
merge from master && resolve conflicts
Oct 19, 2022
5842dcf
merge from master && resolve conflicts
Oct 19, 2022
645766d
fix checkstyle
Oct 19, 2022
e23ab61
merge from master
Oct 25, 2022
5d0d05f
Resolve conflict with master
yihua Nov 11, 2022
465536f
Merge branch 'master' into early-conflict-detection-based-on-occ-simp…
yihua Nov 11, 2022
c6bc22d
refact abstraction
Nov 18, 2022
7d8f3bc
refact abstraction
Nov 21, 2022
fc5927a
address comments
Nov 21, 2022
3bde14b
address comments
Nov 21, 2022
ea2719e
address comments
Nov 21, 2022
844b10a
address comments
Nov 21, 2022
71e0d1e
address comments
Nov 21, 2022
374212b
neeed to fix reflection issue
Nov 21, 2022
8dfdb4a
address comments
Nov 21, 2022
6fc5bf1
address comments
Nov 21, 2022
316e5ae
address comments
Nov 21, 2022
0b74647
address comments
Nov 21, 2022
c3403d7
address comments
Nov 21, 2022
345a9df
address comments
Nov 21, 2022
ffd8315
address comments
Nov 21, 2022
6ec57fe
address comments
Nov 21, 2022
1455ab1
address comments
Nov 22, 2022
e13ebb9
address comments
Nov 22, 2022
8a402c4
address comments
Nov 22, 2022
a3d0a47
address comments
Nov 22, 2022
3369e5e
address comments
Nov 22, 2022
b97bb16
address comments
Nov 22, 2022
1ccecb4
address comments
Nov 22, 2022
869baf7
address comments
Nov 22, 2022
0447a71
address comments
Nov 23, 2022
6fdf901
Merge branch 'master' into early-conflict-detection-based-on-occ-simp…
yihua Jan 6, 2023
c973c81
Fix errors after rebase
yihua Jan 6, 2023
c412635
Improve abstraction for lock and transaction manager, rename configs,…
yihua Jan 9, 2023
6bb1974
Replace checker naming
yihua Jan 9, 2023
6d19d03
address comments
Jan 9, 2023
b90ea04
merge from master and resolve conflict
Jan 9, 2023
3f2118a
address comments
Jan 9, 2023
be0d5b4
address comments
Jan 9, 2023
aad218a
address comments
Jan 10, 2023
1b837ec
address comments
Jan 10, 2023
c34fb52
Address review comments
yihua Jan 19, 2023
67b3892
Fix build
yihua Jan 19, 2023
a2980b7
Fix diverging changes from master and nits
yihua Jan 20, 2023
0579f9b
Fix config inference
yihua Jan 20, 2023
2976167
Add and revise javadocs, make strategy class names shorter
yihua Jan 20, 2023
7344fab
Revise other names
yihua Jan 20, 2023
501e47f
Improve async timeline-server-based conflict detection
yihua Jan 20, 2023
46e80ae
Add validation of conflict detection strategy to be compatible with t…
yihua Jan 23, 2023
0a77616
Merge branch 'master' into early-conflict-detection-based-on-occ-simp…
yihua Jan 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ public void startServer() throws IOException {
.markerParallelism(writeConfig.getMarkersDeleteParallelism());
}

if (writeConfig.isEarlyConflictDetectionEnable()) {
timelineServiceConfBuilder.earlyConflictDetectionEnable(true)
.earlyConflictDetectionStrategy(writeConfig.getEarlyConflictDetectionStrategyClassName())
.earlyConflictDetectionCheckCommitConflict(writeConfig.earlyConflictDetectionCheckCommitConflict())
.asyncConflictDetectorBatchIntervalMs(writeConfig.getAsyncConflictDetectorBatchIntervalMs())
.asyncConflictDetectorBatchPeriodMs(writeConfig.getAsyncConflictDetectorPeriodMs())
.earlyConflictDetectionMaxAllowableHeartbeatIntervalInMs(writeConfig.getHoodieClientHeartbeatIntervalInMs());
}

server = new TimelineService(context, hadoopConf.newCopy(), timelineServiceConfBuilder.build(),
FSUtils.getFs(basePath, hadoopConf.newCopy()), viewManager);
serverPort = server.startService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@

package org.apache.hudi.client.heartbeat;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;

import static org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime;

/**
* Helper class to delete heartbeat for completed or failed instants with expired heartbeats.
*/
Expand Down Expand Up @@ -89,7 +92,7 @@ public static void abortIfHeartbeatExpired(String instantTime, HoodieTable table
try {
if (config.getFailedWritesCleanPolicy().isLazy() && heartbeatClient.isHeartbeatExpired(instantTime)) {
throw new HoodieException("Heartbeat for instant " + instantTime + " has expired, last heartbeat "
+ HoodieHeartbeatClient.getLastHeartbeatTime(table.getMetaClient().getFs(), config.getBasePath(), instantTime));
+ getLastHeartbeatTime(table.getMetaClient().getFs(), config.getBasePath(), instantTime));
}
} catch (IOException io) {
throw new HoodieException("Unable to read heartbeat", io);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@

package org.apache.hudi.client.heartbeat;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieHeartbeatException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
Expand All @@ -37,9 +39,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Collectors;

import static org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime;

/**
* This class creates heartbeat for hudi client. This heartbeat is used to ascertain whether the running job is alive or not.
Expand Down Expand Up @@ -205,16 +209,6 @@ public void stop() throws HoodieException {
instantToHeartbeatMap.values().forEach(heartbeat -> stop(heartbeat.getInstantTime()));
}

/**
 * Returns the modification time of the heartbeat file belonging to the given instant.
 *
 * @param fs          File system used to probe and stat the heartbeat file.
 * @param basePath    Base path handed to {@code HoodieTableMetaClient.getHeartbeatFolderPath}.
 * @param instantTime Instant time, used as the heartbeat file name.
 * @return The heartbeat file's modification time, or {@code 0L} when the file does not exist.
 * @throws IOException If the file system calls fail.
 */
public static Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException {
  final String heartbeatFolder = HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
  final Path instantHeartbeatPath = new Path(heartbeatFolder + Path.SEPARATOR + instantTime);
  if (!fs.exists(instantHeartbeatPath)) {
    // NOTE : This can happen when a writer is upgraded to use lazy cleaning and the last write had failed
    return 0L;
  }
  return fs.getFileStatus(instantHeartbeatPath).getModificationTime();
}

public static Boolean heartbeatExists(FileSystem fs, String basePath, String instantTime) throws IOException {
Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime);
if (fs.exists(heartbeatFilePath)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.client.transaction;

import org.apache.hudi.client.transaction.lock.LockManager;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;

import org.apache.hadoop.fs.FileSystem;

import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;

/**
 * This class allows clients to start and end transactions for creating direct marker, used by
 * `SimpleTransactionDirectMarkerBasedDetectionStrategy`, when early conflict
 * detection is enabled. Anything done between a start and end transaction is guaranteed to be
 * atomic.
 *
 * <p>Each instance is bound to one (partition path, file ID) pair: the lock key handed to the
 * {@link LockManager} is derived from that pair, so the lock is scoped to a single file.
 * Only the ZK-based lock provider is accepted for now.
 */
public class DirectMarkerTransactionManager extends TransactionManager {
  // Human-readable identifier of the guarded file; also used as the lock key.
  private final String filePath;

  /**
   * Creates a transaction manager whose lock is keyed by the given partition path and file ID.
   *
   * @param config        Hudi write configs.
   * @param fs            File system passed through to the {@link LockManager}.
   * @param partitionPath Relative partition path; may be empty or {@code null}, in which case
   *                      the file ID alone identifies the lock.
   * @param fileId        File ID.
   * @throws HoodieNotSupportedException if the configured lock provider is not
   *                                     {@link ZookeeperBasedLockProvider}.
   */
  public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
    super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)),
        config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
    this.filePath = buildLockKey(partitionPath, fileId);
  }

  /**
   * Acquires the lock and records the given instant as the transaction owner.
   * Does nothing when optimistic concurrency control is not enabled.
   *
   * @param newTxnOwnerInstantTime Instant time of the writer starting the transaction.
   */
  public void beginTransaction(String newTxnOwnerInstantTime) {
    if (isOptimisticConcurrencyControlEnabled) {
      LOG.info("Transaction starting for " + newTxnOwnerInstantTime + " and " + filePath);
      lockManager.lock();

      // Pass the current owner as the caller so the owner is replaced by the new instant.
      reset(currentTxnOwnerInstant, Option.of(getInstant(newTxnOwnerInstantTime)), Option.empty());
      LOG.info("Transaction started for " + newTxnOwnerInstantTime + " and " + filePath);
    }
  }

  /**
   * Clears the transaction owner and releases the lock. The lock is released only when
   * {@code reset} accepts the given instant as the caller. Does nothing when optimistic
   * concurrency control is not enabled.
   *
   * @param currentTxnOwnerInstantTime Instant time of the writer ending the transaction.
   */
  public void endTransaction(String currentTxnOwnerInstantTime) {
    if (isOptimisticConcurrencyControlEnabled) {
      LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstantTime
          + " for " + filePath);
      if (reset(Option.of(getInstant(currentTxnOwnerInstantTime)), Option.empty(), Option.empty())) {
        lockManager.unlock();
        LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstantTime
            + " for " + filePath);
      }
    }
  }

  /**
   * Rebuilds lock related configs. Only support ZK related lock for now.
   *
   * @param writeConfig   Hudi write configs.
   * @param partitionPath Relative partition path.
   * @param fileId        File ID.
   * @return Updated lock related configs.
   * @throws HoodieNotSupportedException if the configured lock provider is not ZK-based.
   */
  private static TypedProperties createUpdatedLockProps(
      HoodieWriteConfig writeConfig, String partitionPath, String fileId) {
    if (!ZookeeperBasedLockProvider.class.getName().equals(writeConfig.getLockProviderClass())) {
      throw new HoodieNotSupportedException("Only Support ZK-based lock for DirectMarkerTransactionManager now.");
    }
    // Copy the props so the per-file lock key does not leak into the shared write config.
    TypedProperties props = new TypedProperties(writeConfig.getProps());
    props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, buildLockKey(partitionPath, fileId));
    return props;
  }

  /**
   * Builds the per-file lock key. When the partition path is empty or {@code null} the file ID
   * is used on its own, so the key never starts with a "/" separator or a literal "null".
   * NOTE(review): a leading "/" presumably yields an empty path segment once the lock provider
   * appends the key to its ZK base path - confirm against the provider.
   */
  private static String buildLockKey(String partitionPath, String fileId) {
    if (partitionPath == null || partitionPath.isEmpty()) {
      return fileId;
    }
    return partitionPath + "/" + fileId;
  }

  // Wraps an instant time into an INFLIGHT instant with an empty action, as expected by reset().
  private HoodieInstant getInstant(String instantTime) {
    return new HoodieInstant(HoodieInstant.State.INFLIGHT, EMPTY_STRING, instantTime);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

package org.apache.hudi.client.transaction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.transaction.lock.LockManager;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand All @@ -34,15 +35,20 @@
*/
public class TransactionManager implements Serializable {

private static final Logger LOG = LogManager.getLogger(TransactionManager.class);
private final LockManager lockManager;
private final boolean isOptimisticConcurrencyControlEnabled;
private Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
protected static final Logger LOG = LogManager.getLogger(TransactionManager.class);
protected final LockManager lockManager;
protected final boolean isOptimisticConcurrencyControlEnabled;
protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();

public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
this.lockManager = new LockManager(config, fs);
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
this(new LockManager(config, fs),
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl());
}

protected TransactionManager(LockManager lockManager, boolean isOptimisticConcurrencyControlEnabled) {
this.lockManager = lockManager;
this.isOptimisticConcurrencyControlEnabled = isOptimisticConcurrencyControlEnabled;
}

public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Expand All @@ -67,9 +73,9 @@ public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
}
}

private synchronized boolean reset(Option<HoodieInstant> callerInstant,
Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
protected synchronized boolean reset(Option<HoodieInstant> callerInstant,
Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
this.currentTxnOwnerInstant = newTxnOwnerInstant;
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieLockConfig;
Expand Down Expand Up @@ -52,9 +53,13 @@ public class LockManager implements Serializable, AutoCloseable {
private volatile LockProvider lockProvider;

public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
this(writeConfig, fs, writeConfig.getProps());
}

public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, TypedProperties lockProps) {
this.writeConfig = writeConfig;
this.hadoopConf = new SerializableConfiguration(fs.getConf());
this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
this.lockConfiguration = new LockConfiguration(lockProps);
maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
Expand Down
Loading