diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java index f2c01abefdd80..f13a8ac076c2b 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java @@ -42,7 +42,7 @@ import org.apache.hudi.common.lock.LockState; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.config.AWSLockConfiguration; +import org.apache.hudi.config.DynamoDbBasedLockConfig; import org.apache.hudi.exception.HoodieLockException; import org.apache.log4j.LogManager; @@ -80,8 +80,8 @@ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, fina public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; - this.tableName = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key()); - this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key()); + this.tableName = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key()); + this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key()); long leaseDuration = Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY)); if (dynamoDB == null) { dynamoDB = getDynamoDBClient(); @@ -155,7 +155,7 @@ public LockItem getLock() { } private AmazonDynamoDB getDynamoDBClient() { - String region = this.lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key()); + String region = this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key()); String endpointURL = RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX); AwsClientBuilder.EndpointConfiguration dynamodbEndpoint = new AwsClientBuilder.EndpointConfiguration(endpointURL, region); @@ -166,7 +166,7 @@ private AmazonDynamoDB getDynamoDBClient() { } private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) { - String billingMode = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key()); + String billingMode = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key()); KeySchemaElement partitionKeyElement = new KeySchemaElement(); partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME); partitionKeyElement.setKeyType(KeyType.HASH); @@ -182,14 +182,14 @@ private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName createTableRequest.setBillingMode(billingMode); if (billingMode.equals(BillingMode.PROVISIONED.name())) { createTableRequest.setProvisionedThroughput(new ProvisionedThroughput() - .withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key()))) - .withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key())))); + .withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key()))) + .withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key())))); } dynamoDB.createTable(createTableRequest); LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active"); try { - TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000); + TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000); } catch (TableUtils.TableNeverTransitionedToStateException e) { throw new HoodieLockException("Created dynamoDB table never transits to active", e); } catch (InterruptedException e) { @@ -199,14 +199,14 @@ private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName } private void checkRequiredProps(final LockConfiguration config) { - ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key()) != null); - ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key()) != null); - ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key()) != null); - ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key()) != null); - config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name()); - config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "20"); - config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10"); - config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000"); + ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key()) != null); + ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key()) != null); + ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key()) != null); + ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key()) != null); + config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name()); + config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "20"); + config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10"); + config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000"); } private String generateLogSuffixString() { diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java similarity index 87% rename from hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java rename to hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java index b6ea8232c1f80..373153330f265 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java +++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java @@ -18,7 +18,10 @@ package org.apache.hudi.config; +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; @@ -27,7 +30,15 @@ import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX; -public class AWSLockConfiguration { +/** + * Hoodie Configs for Locks. + */ +@ConfigClassProperty(name = "DynamoDB based Locks Configurations", + groupName = ConfigGroups.Names.WRITE_CLIENT, + description = "Configs that control DynamoDB based locking mechanisms required for concurrency control " + + " between writers to a Hudi table. Concurrency between Hudi's own table services " + + " are auto managed internally.") +public class DynamoDbBasedLockConfig extends HoodieConfig { // configs for DynamoDb based locks public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb."; diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java index 405caafa1a90c..623704232e419 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java +++ b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java @@ -29,12 +29,12 @@ import java.util.Properties; import javax.annotation.concurrent.Immutable; -import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE; -import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY; -import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY; -import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_REGION; -import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME; -import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY; +import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE; +import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY; +import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY; +import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION; +import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME; +import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY; /** * Configurations used by the AWS credentials and AWS DynamoDB based lock. diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java index 8dc28328274fb..d2ab0375e050c 100644 --- a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java @@ -27,7 +27,7 @@ import com.amazonaws.services.dynamodbv2.model.BillingMode; import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider; import org.apache.hudi.common.config.LockConfiguration; -import org.apache.hudi.config.AWSLockConfiguration; +import org.apache.hudi.config.DynamoDbBasedLockConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -54,20 +54,20 @@ public class ITTestDynamoDBBasedLockProvider { @BeforeAll public static void setup() throws InterruptedException { Properties properties = new Properties(); - properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name()); + properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name()); // properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX); - properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey"); - properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key(), REGION); + properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey"); + properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), REGION); properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); - properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "0"); - properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0"); + properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "0"); + properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0"); lockConfiguration = new LockConfiguration(properties); dynamoDb = getDynamoClientWithLocalEndpoint(); } @Test public void testAcquireLock() { - lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); @@ -76,7 +76,7 @@ public void testAcquireLock() { @Test public void testUnlock() { - lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); @@ -87,7 +87,7 @@ public void testUnlock() { @Test public void testReentrantLock() { - lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); @@ -98,7 +98,7 @@ public void testReentrantLock() { @Test public void testUnlockWithoutLock() { - lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); dynamoDbBasedLockProvider.unlock(); }