diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml new file mode 100644 index 000000000000..19ceed7fce7b --- /dev/null +++ b/hudi-aws/pom.xml @@ -0,0 +1,194 @@ + + + + + hudi + org.apache.hudi + 0.10.0-SNAPSHOT + + 4.0.0 + + hudi-aws + 0.10.0-SNAPSHOT + + hudi-aws + jar + + + 1.15.0 + + + + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.hudi + hudi-client-common + ${project.version} + + + + + log4j + log4j + + + + + org.apache.hadoop + hadoop-common + tests + test + + + org.mortbay.jetty + * + + + javax.servlet.jsp + * + + + javax.servlet + * + + + + + + com.amazonaws + dynamodb-lock-client + ${dynamodb.lockclient.version} + + + com.amazonaws + aws-java-sdk-dynamodb + ${aws.sdk.version} + + + io.netty + * + + + + + com.amazonaws + aws-java-sdk-core + ${aws.sdk.version} + + + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + + org.jacoco + jacoco-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + io.fabric8 + docker-maven-plugin + + + prepare-it-database + pre-integration-test + + start + + + + + amazon/dynamodb-local:${dynamodb-local.version} + it-database + + + ${dynamodb-local.port}:${dynamodb-local.port} + + + + ${dynamodb-local.endpoint}/shell/ + + + + + + + + + + remove-it-database + post-integration-test + + stop + + + + + + org.apache.rat + apache-rat-plugin + + + + + + src/main/resources + + + src/test/resources + + + + diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java new file mode 100644 index 000000000000..631b0fa8d534 --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws.credentials; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Factory class for Hoodie AWSCredentialsProvider. 
+ */ +public class HoodieAWSCredentialsProviderFactory { + public static AWSCredentialsProvider getAwsCredentialsProvider(Properties props) { + return getAwsCredentialsProviderChain(props); + } + + private static AWSCredentialsProvider getAwsCredentialsProviderChain(Properties props) { + List providers = new ArrayList<>(); + providers.add(new HoodieConfigAWSCredentialsProvider(props)); + providers.add(new DefaultAWSCredentialsProviderChain()); + AWSCredentialsProviderChain providerChain = new AWSCredentialsProviderChain(providers); + providerChain.setReuseLastProvider(true); + return providerChain; + } +} diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java new file mode 100644 index 000000000000..4e9cf383906a --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.aws.credentials; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.BasicSessionCredentials; +import org.apache.hudi.config.HoodieAWSConfig; +import org.apache.hudi.common.util.StringUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Properties; +
+/**
+ * Credentials provider which fetches AWS access key from Hoodie config.
+ */
+public class HoodieConfigAWSCredentialsProvider implements AWSCredentialsProvider {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieConfigAWSCredentialsProvider.class);
+
+  // Stays null when the properties carry no usable access key / secret key pair.
+  private AWSCredentials awsCredentials;
+
+  /**
+   * Reads the access key, secret key and optional session token from the given properties.
+   * Credentials are only created when both the access key and the secret key are non-empty.
+   *
+   * @param props properties looked up with the {@link HoodieAWSConfig} credential keys
+   */
+  public HoodieConfigAWSCredentialsProvider(Properties props) {
+    String accessKey = props.getProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key());
+    String secretKey = props.getProperty(HoodieAWSConfig.AWS_SECRET_KEY.key());
+    String sessionToken = props.getProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key());
+
+    if (StringUtils.isNullOrEmpty(accessKey) || StringUtils.isNullOrEmpty(secretKey)) {
+      LOG.debug("AWS access key or secret key not found in the Hudi configuration. "
+          + "Use default AWS credentials");
+    } else {
+      this.awsCredentials = createCredentials(accessKey, secretKey, sessionToken);
+    }
+  }
+
+  /**
+   * Creates session credentials when a session token is supplied, basic credentials otherwise.
+   */
+  private static AWSCredentials createCredentials(String accessKey, String secretKey,
+                                                  String sessionToken) {
+    // An empty token is treated like a missing one, matching how empty access/secret keys are
+    // handled in the constructor; previously "" produced session credentials with a blank token.
+    if (StringUtils.isNullOrEmpty(sessionToken)) {
+      return new BasicAWSCredentials(accessKey, secretKey);
+    }
+    return new BasicSessionCredentials(accessKey, secretKey, sessionToken);
+  }
+
+  /**
+   * @return the credentials built from the Hudi configuration, or {@code null} when the
+   *         configuration did not contain both an access key and a secret key
+   */
+  @Override
+  public AWSCredentials getCredentials() {
+    return this.awsCredentials;
+  }
+
+  /**
+   * No-op: the credentials are read once in the constructor and never reloaded.
+   */
+  @Override
+  public void refresh() {
+
+  }
+}
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java new file mode 100644 index 000000000000..f2c01abefdd8 --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.aws.transaction.lock; + +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.services.dynamodbv2.AcquireLockOptions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions; +import com.amazonaws.services.dynamodbv2.LockItem; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import com.amazonaws.services.dynamodbv2.util.TableUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.lock.LockProvider; +import org.apache.hudi.common.lock.LockState; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.AWSLockConfiguration; +import org.apache.hudi.exception.HoodieLockException; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.NotThreadSafe; + +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; + +/** + * A DynamoDB based 
lock. This {@link LockProvider} implementation allows to lock table operations + * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use this lock. + */ +@NotThreadSafe +public class DynamoDBBasedLockProvider implements LockProvider { + + private static final Logger LOG = LogManager.getLogger(DynamoDBBasedLockProvider.class); + + private static final String DYNAMODB_ATTRIBUTE_NAME = "key"; + + private final AmazonDynamoDBLockClient client; + private final String tableName; + private final String dynamoDBPartitionKey; + protected LockConfiguration lockConfiguration; + private volatile LockItem lock; + + public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { + this(lockConfiguration, conf, null); + } + + public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) { + checkRequiredProps(lockConfiguration); + this.lockConfiguration = lockConfiguration; + this.tableName = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key()); + this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key()); + long leaseDuration = Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY)); + if (dynamoDB == null) { + dynamoDB = getDynamoDBClient(); + } + // build the dynamoDb lock client + this.client = new AmazonDynamoDBLockClient( + AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName) + .withTimeUnit(TimeUnit.MILLISECONDS) + .withLeaseDuration(leaseDuration) + .withHeartbeatPeriod(leaseDuration / 3) + .withCreateHeartbeatBackgroundThread(true) + .build()); + + if (!this.client.lockTableExists()) { + createLockTableInDynamoDB(dynamoDB, tableName); + } + } + + @Override + public boolean tryLock(long time, TimeUnit unit) { + LOG.info(generateLogStatement(LockState.ACQUIRING, generateLogSuffixString())); + try { 
+ lock = client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey) + .withAdditionalTimeToWaitForLock(time) + .withTimeUnit(TimeUnit.MILLISECONDS) + .build()); + LOG.info(generateLogStatement(LockState.ACQUIRED, generateLogSuffixString())); + } catch (InterruptedException e) { + throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()), e); + } catch (LockNotGrantedException e) { + return false; + } + return lock != null && !lock.isExpired(); + } + + @Override + public void unlock() { + try { + LOG.info(generateLogStatement(LockState.RELEASING, generateLogSuffixString())); + if (lock == null) { + return; + } + if (!client.releaseLock(lock)) { + LOG.warn("The lock has already been stolen"); + } + lock = null; + LOG.info(generateLogStatement(LockState.RELEASED, generateLogSuffixString())); + } catch (Exception e) { + throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE, generateLogSuffixString()), e); + } + } + + @Override + public void close() { + try { + if (lock != null) { + if (!client.releaseLock(lock)) { + LOG.warn("The lock has already been stolen"); + } + lock = null; + } + this.client.close(); + } catch (Exception e) { + LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE, generateLogSuffixString())); + } + } + + @Override + public LockItem getLock() { + return lock; + } + + private AmazonDynamoDB getDynamoDBClient() { + String region = this.lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key()); + String endpointURL = RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX); + AwsClientBuilder.EndpointConfiguration dynamodbEndpoint = + new AwsClientBuilder.EndpointConfiguration(endpointURL, region); + return AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(dynamodbEndpoint) + .withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig())) + 
.build(); + } + + private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) { + String billingMode = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key()); + KeySchemaElement partitionKeyElement = new KeySchemaElement(); + partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME); + partitionKeyElement.setKeyType(KeyType.HASH); + + List keySchema = new ArrayList<>(); + keySchema.add(partitionKeyElement); + + Collection attributeDefinitions = new ArrayList<>(); + attributeDefinitions.add(new AttributeDefinition().withAttributeName(DYNAMODB_ATTRIBUTE_NAME).withAttributeType(ScalarAttributeType.S)); + + CreateTableRequest createTableRequest = new CreateTableRequest(tableName, keySchema); + createTableRequest.setAttributeDefinitions(attributeDefinitions); + createTableRequest.setBillingMode(billingMode); + if (billingMode.equals(BillingMode.PROVISIONED.name())) { + createTableRequest.setProvisionedThroughput(new ProvisionedThroughput() + .withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key()))) + .withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key())))); + } + dynamoDB.createTable(createTableRequest); + + LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active"); + try { + TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000); + } catch (TableUtils.TableNeverTransitionedToStateException e) { + throw new HoodieLockException("Created dynamoDB table never transits to active", e); + } catch (InterruptedException e) { + throw new HoodieLockException("Thread interrupted while waiting for dynamoDB table to turn active", e); + } + LOG.info("Created dynamoDB table " + tableName); + } + + private void 
checkRequiredProps(final LockConfiguration config) { + ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key()) != null); + ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key()) != null); + ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key()) != null); + ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key()) != null); + config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name()); + config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "20"); + config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10"); + config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000"); + } + + private String generateLogSuffixString() { + return StringUtils.join("DynamoDb table = ", tableName, ", partition key = ", dynamoDBPartitionKey); + } + + protected String generateLogStatement(LockState state, String suffix) { + return StringUtils.join(state.name(), " lock at ", suffix); + } +} diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java b/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java new file mode 100644 index 000000000000..cb52f66a8427 --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.util.Option; + +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.services.dynamodbv2.model.BillingMode; + +import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX; + +public class AWSLockConfiguration { + + // configs for DynamoDb based locks + public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb."; + + public static final ConfigProperty DYNAMODB_LOCK_TABLE_NAME = ConfigProperty + .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table") + .noDefaultValue() + .withDocumentation("For DynamoDB based lock provider, the name of the DynamoDB table acting as lock table"); + + public static final ConfigProperty DYNAMODB_LOCK_PARTITION_KEY = ConfigProperty + .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key") + .noDefaultValue() + .withInferFunction(cfg -> { + if (cfg.contains(HoodieWriteConfig.TBL_NAME)) { + return Option.of(cfg.getString(HoodieWriteConfig.TBL_NAME)); + } + return Option.empty(); + }) + .withDocumentation("For DynamoDB based lock provider, the partition key for the DynamoDB lock table. " + + "Each Hudi dataset should has it's unique key so concurrent writers could refer to the same partition key." 
+ + " By default we use the Hudi table name specified to be the partition key"); + + public static final ConfigProperty DYNAMODB_LOCK_REGION = ConfigProperty + .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "region") + .defaultValue("us-east-1") + .withInferFunction(cfg -> { + String regionFromEnv = System.getenv("AWS_REGION"); + if (regionFromEnv != null) { + return Option.of(RegionUtils.getRegion(regionFromEnv).getName()); + } + return Option.empty(); + }) + .withDocumentation("For DynamoDB based lock provider, the region used in endpoint for Amazon DynamoDB service." + + " Would try to first get it from AWS_REGION environment variable. If not find, by default use us-east-1"); + + public static final ConfigProperty DYNAMODB_LOCK_BILLING_MODE = ConfigProperty + .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "billing_mode") + .defaultValue(BillingMode.PAY_PER_REQUEST.name()) + .withDocumentation("For DynamoDB based lock provider, by default it is PAY_PER_REQUEST mode"); + + public static final ConfigProperty DYNAMODB_LOCK_READ_CAPACITY = ConfigProperty + .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "read_capacity") + .defaultValue("20") + .withDocumentation("For DynamoDB based lock provider, read capacity units when using PROVISIONED billing mode"); + + public static final ConfigProperty DYNAMODB_LOCK_WRITE_CAPACITY = ConfigProperty + .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "write_capacity") + .defaultValue("10") + .withDocumentation("For DynamoDB based lock provider, write capacity units when using PROVISIONED billing mode"); + + public static final ConfigProperty DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT = ConfigProperty + .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table_creation_timeout") + .defaultValue(String.valueOf(10 * 60 * 1000)) + .withDocumentation("For DynamoDB based lock provider, the maximum number of milliseconds to wait for creating DynamoDB table"); +} diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java 
b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java new file mode 100644 index 000000000000..6c62b61528ad --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; +import javax.annotation.concurrent.Immutable; + +import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE; +import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY; +import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY; +import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_REGION; +import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME; +import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY; + +/** + * Configurations used by the AWS credentials and AWS DynamoDB based lock. 
+ */ +@Immutable +@ConfigClassProperty(name = "AWS credential Configs", + groupName = ConfigGroups.Names.AWS_DYNAMO_DB, + description = "Configurations used for AWS credentials to get AWS resources.") +public class HoodieAWSConfig extends HoodieConfig { + public static final ConfigProperty AWS_ACCESS_KEY = ConfigProperty + .key("hoodie.aws.access.key") + .noDefaultValue() + .withDocumentation("AWS access key id"); + + public static final ConfigProperty AWS_SECRET_KEY = ConfigProperty + .key("hoodie.aws.secret.key") + .noDefaultValue() + .withDocumentation("AWS secret key"); + + public static final ConfigProperty AWS_SESSION_TOKEN = ConfigProperty + .key("hoodie.aws.session.token") + .noDefaultValue() + .withDocumentation("AWS session token"); + + private HoodieAWSConfig() { + super(); + } + + public static HoodieAWSConfig.Builder newBuilder() { + return new HoodieAWSConfig.Builder(); + } + + public String getAWSAccessKey() { + return getString(AWS_ACCESS_KEY); + } + + public String getAWSSecretKey() { + return getString(AWS_SECRET_KEY); + } + + public String getAWSSessionToken() { + return getString(AWS_SESSION_TOKEN); + } + + public static class Builder { + + private final HoodieAWSConfig awsConfig = new HoodieAWSConfig(); + + public HoodieAWSConfig.Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.awsConfig.getProps().load(reader); + return this; + } + } + + public HoodieAWSConfig.Builder fromProperties(Properties props) { + this.awsConfig.getProps().putAll(props); + return this; + } + + public HoodieAWSConfig.Builder withAccessKey(String accessKey) { + awsConfig.setValue(AWS_ACCESS_KEY, accessKey); + return this; + } + + public HoodieAWSConfig.Builder withSecretKey(String secretKey) { + awsConfig.setValue(AWS_SECRET_KEY, secretKey); + return this; + } + + public HoodieAWSConfig.Builder withSessionToken(String sessionToken) { + awsConfig.setValue(AWS_SESSION_TOKEN, sessionToken); + 
return this; + } + + public Builder withDynamoDBTable(String dynamoDbTableName) { + awsConfig.setValue(DYNAMODB_LOCK_TABLE_NAME, dynamoDbTableName); + return this; + } + + public Builder withDynamoDBPartitionKey(String partitionKey) { + awsConfig.setValue(DYNAMODB_LOCK_PARTITION_KEY, partitionKey); + return this; + } + + public Builder withDynamoDBRegion(String region) { + awsConfig.setValue(DYNAMODB_LOCK_REGION, region); + return this; + } + + public Builder withDynamoDBBillingMode(String mode) { + awsConfig.setValue(DYNAMODB_LOCK_BILLING_MODE, mode); + return this; + } + + public Builder withDynamoDBReadCapacity(String capacity) { + awsConfig.setValue(DYNAMODB_LOCK_READ_CAPACITY, capacity); + return this; + } + + public Builder withDynamoDBWriteCapacity(String capacity) { + awsConfig.setValue(DYNAMODB_LOCK_WRITE_CAPACITY, capacity); + return this; + } + + public HoodieAWSConfig build() { + awsConfig.setDefaults(HoodieAWSConfig.class.getName()); + return awsConfig; + } + } +} diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java b/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java new file mode 100644 index 000000000000..051fe81e8b0f --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws; + +import com.amazonaws.auth.BasicSessionCredentials; +import org.apache.hudi.config.HoodieAWSConfig; +import org.apache.hudi.common.config.HoodieConfig; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieAWSCredentialsProviderFactory { + + @Test + public void testGetAWSCredentials() { + HoodieConfig cfg = new HoodieConfig(); + cfg.setValue(HoodieAWSConfig.AWS_ACCESS_KEY, "random-access-key"); + cfg.setValue(HoodieAWSConfig.AWS_SECRET_KEY, "random-secret-key"); + cfg.setValue(HoodieAWSConfig.AWS_SESSION_TOKEN, "random-session-token"); + BasicSessionCredentials credentials = (BasicSessionCredentials) org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(cfg.getProps()).getCredentials(); + assertEquals("random-access-key", credentials.getAWSAccessKeyId()); + assertEquals("random-secret-key", credentials.getAWSSecretKey()); + assertEquals("random-session-token", credentials.getSessionToken()); + } +} diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java new file mode 100644 index 000000000000..8dc28328274f --- /dev/null +++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws.transaction.integ; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.model.BillingMode; +import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.config.AWSLockConfiguration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.UUID; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; + +/** + * Test for {@link DynamoDBBasedLockProvider}. + * Set it as integration test because it requires setting up docker environment. 
+ */ +public class ITTestDynamoDBBasedLockProvider { + + private static LockConfiguration lockConfiguration; + private static AmazonDynamoDB dynamoDb; + + private static final String TABLE_NAME_PREFIX = "testDDBTable-"; + private static final String REGION = "us-east-2"; + + @BeforeAll + public static void setup() throws InterruptedException { + Properties properties = new Properties(); + properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name()); + // properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX); + properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey"); + properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key(), REGION); + properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); + properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "0"); + properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0"); + lockConfiguration = new LockConfiguration(properties); + dynamoDb = getDynamoClientWithLocalEndpoint(); + } + + @Test + public void testAcquireLock() { + lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); + Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + dynamoDbBasedLockProvider.unlock(); + } + + @Test + public void testUnlock() { + lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); + 
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + dynamoDbBasedLockProvider.unlock(); + Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + } + + @Test + public void testReentrantLock() { + lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); + Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + dynamoDbBasedLockProvider.unlock(); + } + + @Test + public void testUnlockWithoutLock() { + lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID()); + DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb); + dynamoDbBasedLockProvider.unlock(); + } + + private static AmazonDynamoDB getDynamoClientWithLocalEndpoint() { + String endpoint = System.getProperty("dynamodb-local.endpoint"); + if (endpoint == null || endpoint.isEmpty()) { + throw new IllegalStateException("dynamodb-local.endpoint system property not set"); + } + return AmazonDynamoDBClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, REGION)) + .withCredentials(getCredentials()) + .build(); + } + + private static AWSCredentialsProvider getCredentials() { + return new AWSStaticCredentialsProvider(new 
BasicAWSCredentials("random-access-key", "random-secret-key")); + } +} diff --git a/hudi-aws/src/test/resources/log4j-surefire.properties b/hudi-aws/src/test/resources/log4j-surefire.properties new file mode 100644 index 000000000000..a59d4ebe2b19 --- /dev/null +++ b/hudi-aws/src/test/resources/log4j-surefire.properties @@ -0,0 +1,25 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, A1 +log4j.category.org.apache=INFO +log4j.category.org.apache.parquet.hadoop=WARN +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java index 33519b9adc0d..d5e87ec20239 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java @@ -72,7 +72,7 @@ public void lock() { LOG.info("Retrying to acquire lock..."); Thread.sleep(waitTimeInMs); retryCount++; - } catch (InterruptedException e) { + } catch (HoodieLockException | InterruptedException e) { if (retryCount >= retries) { throw new HoodieLockException("Unable to acquire lock, lock object ", e); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index fd7b310a57d6..bf560810ae99 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -53,7 +53,6 @@ import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY; - /** * Hoodie Configs for Locks. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java index 08e1bb4a87a5..18cd8042763b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java @@ -31,7 +31,8 @@ public enum Names { WRITE_CLIENT("Write Client Configs"), METRICS("Metrics Configs"), RECORD_PAYLOAD("Record Payload Config"), - KAFKA_CONNECT("Kafka Connect Configs"); + KAFKA_CONNECT("Kafka Connect Configs"), + AWS_DYNAMO_DB("aws-dynamo-db"); public final String name; diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 8b7b2176a74b..a6b9490a40ec 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -81,6 +81,7 @@ org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service + org.apache.hudi:hudi-aws com.yammer.metrics:metrics-core com.beust:jcommander @@ -149,6 +150,10 @@ org.apache.hbase:hbase-protocol org.apache.htrace:htrace-core commons-codec:commons-codec + + com.amazonaws:dynamodb-lock-client + com.amazonaws:aws-java-sdk-dynamodb + com.amazonaws:aws-java-sdk-core diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index d55b39493dca..32a9abf8f7c9 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -73,6 +73,7 @@ org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service + org.apache.hudi:hudi-aws com.beust:jcommander io.javalin:javalin @@ -100,6 +101,10 @@ com.yammer.metrics:metrics-core com.google.guava:guava + com.amazonaws:dynamodb-lock-client + com.amazonaws:aws-java-sdk-dynamodb + com.amazonaws:aws-java-sdk-core + org.apache.spark:spark-avro_${scala.binary.version} org.apache.hive:hive-common org.apache.hive:hive-service diff --git 
a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index be65ead9b96d..0fb1d417ebc6 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -75,6 +75,7 @@ org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service + org.apache.hudi:hudi-aws com.yammer.metrics:metrics-core com.beust:jcommander @@ -90,6 +91,10 @@ org.antlr:stringtemplate org.apache.parquet:parquet-avro + com.amazonaws:dynamodb-lock-client + com.amazonaws:aws-java-sdk-dynamodb + com.amazonaws:aws-java-sdk-core + com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} io.confluent:kafka-avro-serializer diff --git a/pom.xml b/pom.xml index e403973b417c..360addeab1cd 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,7 @@ hudi-common hudi-cli hudi-client + hudi-aws hudi-hadoop-mr hudi-spark-datasource hudi-timeline-service @@ -83,6 +84,7 @@ 0.15 1.7 3.0.0-M1 + 0.37.0 1.8 2.6.7 @@ -140,6 +142,7 @@ ${skipTests} ${skipTests} ${skipTests} + ${skipTests} UTF-8 ${project.basedir} provided @@ -158,6 +161,9 @@ 1.12.22 3.17.3 3.1.0 + 1.1.0 + 8000 + http://localhost:${dynamodb-local.port} @@ -331,6 +337,14 @@ jacoco-maven-plugin ${jacoco.version} + + io.fabric8 + docker-maven-plugin + ${maven-docker-plugin.version} + + ${skipDocker} + + @@ -1269,6 +1283,9 @@ **/IT*.java + + ${dynamodb-local.endpoint} +