Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
Expand All @@ -65,7 +66,7 @@ public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, fin
this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
.retryPolicy(new BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
5000, lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP), lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP, DEFAULT_ZK_SESSION_TIMEOUT_MS))
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_MAX_RETRY_WAIT_TIME_IN_MILLIS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
Expand Down Expand Up @@ -104,6 +107,11 @@ public HoodieLockConfig.Builder withHiveTableName(String tableName) {
return this;
}

public HoodieLockConfig.Builder withHiveMetastoreURIs(String hiveMetastoreURIs) {
props.setProperty(HIVE_METASTORE_URI_PROP, hiveMetastoreURIs);
return this;
}

public HoodieLockConfig.Builder withZkQuorum(String zkQuorum) {
props.setProperty(ZK_CONNECT_URL_PROP, zkQuorum);
return this;
Expand Down Expand Up @@ -144,6 +152,11 @@ public HoodieLockConfig.Builder withRetryWaitTimeInMillis(Long retryWaitTimeInMi
return this;
}

public HoodieLockConfig.Builder withRetryMaxWaitTimeInMillis(Long retryMaxWaitTimeInMillis) {
props.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryMaxWaitTimeInMillis));
return this;
}

public HoodieLockConfig.Builder withClientNumRetries(int clientNumRetries) {
props.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, String.valueOf(clientNumRetries));
return this;
Expand Down Expand Up @@ -174,6 +187,8 @@ public HoodieLockConfig build() {
LOCK_ACQUIRE_NUM_RETRIES_PROP, DEFAULT_LOCK_ACQUIRE_NUM_RETRIES);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP),
LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP, DEFAULT_LOCK_ACQUIRE_MAX_RETRY_WAIT_TIME_IN_MILLIS);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP),
LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class LockConfiguration implements Serializable {
public static final String LOCK_PREFIX = "hoodie.writer.lock.";
public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "wait_time_ms_between_retry";
public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "max_wait_time_ms_between_retry";
public static final String DEFAULT_LOCK_ACQUIRE_MAX_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "client.wait_time_ms_between_retry";
public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(10000L);
public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP = LOCK_PREFIX + "num_retries";
Expand All @@ -45,16 +47,17 @@ public class LockConfiguration implements Serializable {
public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";
public static final String HIVE_DATABASE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "database";
public static final String HIVE_TABLE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "table";
public static final String HIVE_METASTORE_URI_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "uris";
// Zookeeper configs for zk based locks
public static final String ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "zookeeper.";
public static final String ZK_BASE_PATH_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_base_path";
public static final String ZK_SESSION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_session_timeout_ms";
public static final String ZK_BASE_PATH_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "base_path";
public static final String ZK_SESSION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "session_timeout_ms";
public static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_connection_timeout_ms";
public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "connection_timeout_ms";
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
public static final String ZK_CONNECT_URL_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "url";
public static final String ZK_PORT_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "port";
public static final String ZK_LOCK_KEY_PROP = LOCK_PREFIX + "lock_key";
public static final String ZK_LOCK_KEY_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "lock_key";

private final TypedProperties props;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private static TypedProperties getProperties() {
props.setProperty("hoodie.writer.lock.zookeeper.port", "2828");
props.setProperty("hoodie.writer.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.writer.lock.num_retries", "10");
props.setProperty("hoodie.writer.lock.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/test");
return props;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeoutException;

import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
Expand All @@ -68,18 +69,19 @@
* using hive metastore APIs. Users need to have a HiveMetastore & Zookeeper cluster deployed to be able to use this lock.
*
*/
public class HiveMetastoreLockProvider implements LockProvider<LockResponse> {
public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse> {

private static final Logger LOG = LogManager.getLogger(HiveMetastoreLockProvider.class);
private static final Logger LOG = LogManager.getLogger(HiveMetastoreBasedLockProvider.class);

private final String databaseName;
private final String tableName;
private final String hiveMetastoreUris;
private IMetaStoreClient hiveClient;
private volatile LockResponse lock = null;
protected LockConfiguration lockConfiguration;
ExecutorService executor = Executors.newSingleThreadExecutor();

public HiveMetastoreLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
this(lockConfiguration);
try {
HiveConf hiveConf = new HiveConf();
Expand All @@ -91,16 +93,17 @@ public HiveMetastoreLockProvider(final LockConfiguration lockConfiguration, fina
}
}

public HiveMetastoreLockProvider(final LockConfiguration lockConfiguration, final IMetaStoreClient metaStoreClient) {
public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final IMetaStoreClient metaStoreClient) {
this(lockConfiguration);
this.hiveClient = metaStoreClient;
}

HiveMetastoreLockProvider(final LockConfiguration lockConfiguration) {
HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration) {
checkRequiredProps(lockConfiguration);
this.lockConfiguration = lockConfiguration;
this.databaseName = this.lockConfiguration.getConfig().getString(HIVE_DATABASE_NAME_PROP);
this.tableName = this.lockConfiguration.getConfig().getString(HIVE_TABLE_NAME_PROP);
this.hiveMetastoreUris = this.lockConfiguration.getConfig().getOrDefault(HIVE_METASTORE_URI_PROP, "").toString();
}

@Override
Expand Down Expand Up @@ -206,6 +209,9 @@ private void checkRequiredProps(final LockConfiguration lockConfiguration) {
}

private void setHiveLockConfs(HiveConf hiveConf) {
if (!StringUtils.isNullOrEmpty(this.hiveMetastoreUris)) {
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, this.hiveMetastoreUris);
}
hiveConf.set("hive.support.concurrency", "true");
hiveConf.set("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager");
hiveConf.set("hive.lock.numretries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_NUM_RETRIES_PROP));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* /metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java#L2892
* Unless this is set, we cannot use HiveMetastore server in tests for locking use-cases.
*/
public class TestHiveMetastoreLockProvider {
public class TestHiveMetastoreBasedLockProvider {

private static Connection connection;
private static LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, "testdb");
Expand Down Expand Up @@ -86,7 +86,7 @@ public static void cleanUpClass() {

@Test
public void testAcquireLock() throws Exception {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
Expand All @@ -106,7 +106,7 @@ public void testAcquireLock() throws Exception {

@Test
public void testUnlock() throws Exception {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
Expand All @@ -119,7 +119,7 @@ public void testUnlock() throws Exception {

@Test
public void testReentrantLock() throws Exception {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
Expand All @@ -135,7 +135,7 @@ public void testReentrantLock() throws Exception {

@Test
public void testUnlockWithoutLock() {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
lockProvider.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ protected static TypedProperties prepareMultiWriterProps(String propsFileName) t
props.setProperty("hoodie.writer.lock.zookeeper.port", "2828");
props.setProperty("hoodie.writer.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.writer.lock.num_retries", "10");
props.setProperty("hoodie.writer.lock.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/test");

UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
Expand Down