diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index ad871b635d65c..c6c54e465260b 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -77,9 +77,8 @@ com.amazonaws dynamodb-lock-client - ${dynamodb.lockclient.version} + 1.2.0 - ${hive.groupid} @@ -104,14 +103,14 @@ - com.amazonaws - aws-java-sdk-cloudwatch + software.amazon.awssdk + cloudwatch ${aws.sdk.version} - com.amazonaws - aws-java-sdk-dynamodb + software.amazon.awssdk + dynamodb ${aws.sdk.version} @@ -121,8 +120,8 @@ - com.amazonaws - aws-java-sdk-core + software.amazon.awssdk + sdk-core ${aws.sdk.version} @@ -133,18 +132,29 @@ - com.amazonaws - aws-java-sdk-glue + software.amazon.awssdk + glue ${aws.sdk.version} - com.amazonaws - aws-java-sdk-sqs + software.amazon.awssdk + sqs ${aws.sdk.version} + + org.apache.httpcomponents + httpclient + ${aws.sdk.httpclient.version} + + + org.apache.httpcomponents + httpcore + ${aws.sdk.httpcore.version} + + org.apache.hudi diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java index e086862f2cf62..f8fefbc69348b 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java @@ -21,13 +21,12 @@ import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory; import org.apache.hudi.common.util.Option; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync; -import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder; -import com.amazonaws.services.cloudwatch.model.Dimension; -import com.amazonaws.services.cloudwatch.model.MetricDatum; -import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; -import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; -import com.amazonaws.services.cloudwatch.model.StandardUnit; +import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; +import software.amazon.awssdk.services.cloudwatch.model.Dimension; +import software.amazon.awssdk.services.cloudwatch.model.MetricDatum; +import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest; +import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse; +import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import com.codahale.metrics.Clock; import com.codahale.metrics.Counter; import com.codahale.metrics.Counting; @@ -41,8 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Properties; @@ -63,7 +62,7 @@ public class CloudWatchReporter extends ScheduledReporter { private static final Logger LOG = LoggerFactory.getLogger(CloudWatchReporter.class); - private final AmazonCloudWatchAsync cloudWatchClientAsync; + private final CloudWatchAsyncClient cloudWatchClientAsync; private final Clock clock; private final String prefix; private final String namespace; @@ -139,7 +138,7 @@ public CloudWatchReporter build(Properties props) { durationUnit); } - CloudWatchReporter build(AmazonCloudWatchAsync amazonCloudWatchAsync) { + CloudWatchReporter build(CloudWatchAsyncClient amazonCloudWatchAsync) { return new CloudWatchReporter(registry, amazonCloudWatchAsync, clock, @@ -153,7 +152,7 @@ CloudWatchReporter build(AmazonCloudWatchAsync amazonCloudWatchAsync) { } protected CloudWatchReporter(MetricRegistry registry, - AmazonCloudWatchAsync 
cloudWatchClientAsync, + CloudWatchAsyncClient cloudWatchClientAsync, Clock clock, String prefix, String namespace, @@ -169,9 +168,9 @@ protected CloudWatchReporter(MetricRegistry registry, this.maxDatumsPerRequest = maxDatumsPerRequest; } - private static AmazonCloudWatchAsync getAmazonCloudWatchClient(Properties props) { - return AmazonCloudWatchAsyncClientBuilder.standard() - .withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props)) + private static CloudWatchAsyncClient getAmazonCloudWatchClient(Properties props) { + return CloudWatchAsyncClient.builder() + .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props)) .build(); } @@ -213,7 +212,7 @@ public void report(SortedMap gauges, } private void report(List metricsData) { - List> cloudWatchFutures = new ArrayList<>(metricsData.size()); + List> cloudWatchFutures = new ArrayList<>(metricsData.size()); List> partitions = new ArrayList<>(); for (int i = 0; i < metricsData.size(); i += maxDatumsPerRequest) { @@ -222,14 +221,15 @@ private void report(List metricsData) { } for (List partition : partitions) { - PutMetricDataRequest request = new PutMetricDataRequest() - .withNamespace(namespace) - .withMetricData(partition); + PutMetricDataRequest request = PutMetricDataRequest.builder() + .namespace(namespace) + .metricData(partition) + .build(); - cloudWatchFutures.add(cloudWatchClientAsync.putMetricDataAsync(request)); + cloudWatchFutures.add(cloudWatchClientAsync.putMetricData(request)); } - for (final Future cloudWatchFuture : cloudWatchFutures) { + for (final Future cloudWatchFuture : cloudWatchFutures) { try { cloudWatchFuture.get(30, TimeUnit.SECONDS); } catch (final Exception ex) { @@ -250,7 +250,7 @@ private void processGauge(final String metricName, .ifPresent(value -> stageMetricDatum(metricName, value.doubleValue(), DIMENSION_GAUGE_TYPE_VALUE, - StandardUnit.None, + StandardUnit.NONE, timestampMilliSec, metricData)); } @@ -262,7 +262,7 @@ private void processCounter(final String metricName, stageMetricDatum(metricName, counter.getCount(), DIMENSION_COUNT_TYPE_VALUE, - StandardUnit.Count, + StandardUnit.COUNT, timestampMilliSec, metricData); } @@ -277,22 +277,25 @@ private void stageMetricDatum(String metricName, String tableName = metricNameParts[0]; - metricData.add(new MetricDatum() - .withTimestamp(new Date(timestampMilliSec)) - .withMetricName(prefix(metricNameParts[1])) - .withValue(metricValue) - .withDimensions(getDimensions(tableName, metricType)) - .withUnit(standardUnit)); + metricData.add(MetricDatum.builder() + .timestamp(Instant.ofEpochMilli(timestampMilliSec)) + .metricName(prefix(metricNameParts[1])) + .value(metricValue) + .dimensions(getDimensions(tableName, metricType)) + .unit(standardUnit) + .build()); } private List getDimensions(String tableName, String metricType) { List dimensions = new ArrayList<>(); - dimensions.add(new Dimension() - .withName(DIMENSION_TABLE_NAME_KEY) - .withValue(tableName)); - dimensions.add(new Dimension() - .withName(DIMENSION_METRIC_TYPE_KEY) - .withValue(metricType)); + dimensions.add(Dimension.builder() + .name(DIMENSION_TABLE_NAME_KEY) + .value(tableName) + .build()); + dimensions.add(Dimension.builder() + .name(DIMENSION_METRIC_TYPE_KEY) + .value(metricType) + .build()); return dimensions; } @@ -306,7 +309,7 @@ public void stop() { super.stop(); } finally { try { - cloudWatchClientAsync.shutdown(); + cloudWatchClientAsync.close(); } catch (Exception ex) { LOG.warn("Exception while shutting down CloudWatch client.", 
ex);
       }
     }
   }
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java
index 631b0fa8d5349..4342a529d29a0 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieAWSCredentialsProviderFactory.java
@@ -18,9 +18,9 @@
 package org.apache.hudi.aws.credentials;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSCredentialsProviderChain;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -30,16 +30,23 @@
  * Factory class for Hoodie AWSCredentialsProvider.
  */
 public class HoodieAWSCredentialsProviderFactory {
-  public static AWSCredentialsProvider getAwsCredentialsProvider(Properties props) {
+  public static AwsCredentialsProvider getAwsCredentialsProvider(Properties props) {
     return getAwsCredentialsProviderChain(props);
   }
 
-  private static AWSCredentialsProvider getAwsCredentialsProviderChain(Properties props) {
-    List<AWSCredentialsProvider> providers = new ArrayList<>();
-    providers.add(new HoodieConfigAWSCredentialsProvider(props));
-    providers.add(new DefaultAWSCredentialsProviderChain());
-    AWSCredentialsProviderChain providerChain = new AWSCredentialsProviderChain(providers);
-    providerChain.setReuseLastProvider(true);
+  private static AwsCredentialsProvider getAwsCredentialsProviderChain(Properties props) {
+    List<AwsCredentialsProvider> providers = new ArrayList<>();
+    HoodieConfigAWSCredentialsProvider hoodieConfigAWSCredentialsProvider = new HoodieConfigAWSCredentialsProvider(props);
+    if (hoodieConfigAWSCredentialsProvider.resolveCredentials() != null) {
+      providers.add(hoodieConfigAWSCredentialsProvider);
+    }
+    providers.add(DefaultCredentialsProvider.builder()
+        .reuseLastProviderEnabled(true)
+        .build());
+    AwsCredentialsProviderChain providerChain = AwsCredentialsProviderChain.builder()
+        .credentialsProviders(providers)
+        .reuseLastProviderEnabled(true)
+        .build();
     return providerChain;
   }
 }
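A quick usage sketch of the reworked chain above (the `main` scaffold and the printed line are illustrative; the class, config, and SDK names are the ones appearing in this patch). Note the new guard: the Hoodie-config provider is only added to the chain when it can actually resolve credentials, so an empty Hudi config defers straight to `DefaultCredentialsProvider`.

```java
import java.util.Properties;

import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
import org.apache.hudi.config.HoodieAWSConfig;

import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

public class CredentialsChainSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Keys come from HoodieAWSConfig; with both set, HoodieConfigAWSCredentialsProvider
    // resolves first and the chain never falls through to DefaultCredentialsProvider.
    props.setProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(), "my-access-key");
    props.setProperty(HoodieAWSConfig.AWS_SECRET_KEY.key(), "my-secret-key");

    AwsCredentialsProvider provider =
        HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props);
    AwsCredentials credentials = provider.resolveCredentials();
    System.out.println("Resolved access key id: " + credentials.accessKeyId());
  }
}
```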
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
index 3f618891d781c..271149f71b9f8 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/credentials/HoodieConfigAWSCredentialsProvider.java
@@ -18,13 +18,13 @@
 package org.apache.hudi.aws.credentials;
 
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieAWSConfig;
+import org.apache.hudi.common.util.StringUtils;
 
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.BasicSessionCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,11 +33,11 @@
 /**
  * Credentials provider which fetches AWS access key from Hoodie config.
  */
-public class HoodieConfigAWSCredentialsProvider implements AWSCredentialsProvider {
+public class HoodieConfigAWSCredentialsProvider implements AwsCredentialsProvider {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieConfigAWSCredentialsProvider.class);
 
-  private AWSCredentials awsCredentials;
+  private AwsCredentials awsCredentials;
 
   public HoodieConfigAWSCredentialsProvider(Properties props) {
     String accessKey = props.getProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key());
@@ -52,20 +52,15 @@ public HoodieConfigAWSCredentialsProvider(Properties props) {
     }
   }
 
-  private static AWSCredentials createCredentials(String accessKey, String secretKey,
+  private static AwsCredentials createCredentials(String accessKey, String secretKey,
                                                   String sessionToken) {
     return (sessionToken == null)
-        ? new BasicAWSCredentials(accessKey, secretKey)
-        : new BasicSessionCredentials(accessKey, secretKey, sessionToken);
+        ? AwsBasicCredentials.create(accessKey, secretKey)
+        : AwsSessionCredentials.create(accessKey, secretKey, sessionToken);
   }
 
   @Override
-  public AWSCredentials getCredentials() {
+  public AwsCredentials resolveCredentials() {
     return this.awsCredentials;
   }
-
-  @Override
-  public void refresh() {
-
-  }
 }
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index a2270b56c76ea..e0825b7148e72 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -25,41 +25,40 @@
 import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.model.Partition;
 
-import com.amazonaws.services.glue.AWSGlue;
-import com.amazonaws.services.glue.AWSGlueClientBuilder;
-import com.amazonaws.services.glue.model.AlreadyExistsException;
-import com.amazonaws.services.glue.model.BatchCreatePartitionRequest;
-import com.amazonaws.services.glue.model.BatchCreatePartitionResult;
-import com.amazonaws.services.glue.model.BatchDeletePartitionRequest;
-import com.amazonaws.services.glue.model.BatchDeletePartitionResult;
-import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest;
-import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry;
-import com.amazonaws.services.glue.model.BatchUpdatePartitionResult;
-import com.amazonaws.services.glue.model.Column;
-import com.amazonaws.services.glue.model.CreateDatabaseRequest;
-import com.amazonaws.services.glue.model.CreateDatabaseResult;
-import com.amazonaws.services.glue.model.CreateTableRequest;
-import com.amazonaws.services.glue.model.CreateTableResult;
-import com.amazonaws.services.glue.model.DatabaseInput;
-import com.amazonaws.services.glue.model.EntityNotFoundException;
-import com.amazonaws.services.glue.model.GetDatabaseRequest;
-import com.amazonaws.services.glue.model.GetPartitionsRequest;
-import com.amazonaws.services.glue.model.GetPartitionsResult;
-import com.amazonaws.services.glue.model.GetTableRequest;
-import com.amazonaws.services.glue.model.PartitionInput;
-import com.amazonaws.services.glue.model.PartitionValueList;
-import com.amazonaws.services.glue.model.SerDeInfo;
-import com.amazonaws.services.glue.model.StorageDescriptor;
-import com.amazonaws.services.glue.model.Table;
-import com.amazonaws.services.glue.model.TableInput;
-import
com.amazonaws.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse; +import software.amazon.awssdk.services.glue.model.BatchDeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchDeletePartitionResponse; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequestEntry; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.CreateTableResponse; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.PartitionValueList; +import software.amazon.awssdk.services.glue.model.SerDeInfo; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; import org.apache.parquet.schema.MessageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -87,12 +86,12 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { private static final Logger LOG = LoggerFactory.getLogger(AWSGlueCatalogSyncClient.class); private static final int MAX_PARTITIONS_PER_REQUEST = 100; private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L; - private final AWSGlue awsGlue; + private final GlueClient awsGlue; private final String databaseName; public AWSGlueCatalogSyncClient(HiveSyncConfig config) { super(config); - this.awsGlue = AWSGlueClientBuilder.standard().build(); + this.awsGlue = GlueClient.builder().build(); this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME); } @@ -102,14 +101,15 @@ public List getAllPartitions(String tableName) { List partitions = new ArrayList<>(); String nextToken = null; do { - GetPartitionsResult result = awsGlue.getPartitions(new GetPartitionsRequest() - .withDatabaseName(databaseName) - .withTableName(tableName) - .withNextToken(nextToken)); - partitions.addAll(result.getPartitions().stream() - .map(p -> new Partition(p.getValues(), p.getStorageDescriptor().getLocation())) + GetPartitionsResponse result = awsGlue.getPartitions(GetPartitionsRequest.builder() + .databaseName(databaseName) + .tableName(tableName) + .nextToken(nextToken) + .build()); + 
partitions.addAll(result.partitions().stream() + .map(p -> new Partition(p.values(), p.storageDescriptor().location())) .collect(Collectors.toList())); - nextToken = result.getNextToken(); + nextToken = result.nextToken(); } while (nextToken != null); return partitions; } catch (Exception e) { @@ -126,26 +126,27 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + tableId(databaseName, tableName)); try { Table table = getTable(awsGlue, databaseName, tableName); - StorageDescriptor sd = table.getStorageDescriptor(); + StorageDescriptor sd = table.storageDescriptor(); List partitionInputs = partitionsToAdd.stream().map(partition -> { - StorageDescriptor partitionSd = sd.clone(); String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString(); List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - partitionSd.setLocation(fullPartitionPath); - return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd); + StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath)); + return PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build(); }).collect(Collectors.toList()); for (List batch : CollectionUtils.batches(partitionInputs, MAX_PARTITIONS_PER_REQUEST)) { - BatchCreatePartitionRequest request = new BatchCreatePartitionRequest(); - request.withDatabaseName(databaseName).withTableName(tableName).withPartitionInputList(batch); - - BatchCreatePartitionResult result = awsGlue.batchCreatePartition(request); - if (CollectionUtils.nonEmpty(result.getErrors())) { - if (result.getErrors().stream().allMatch((error) -> "AlreadyExistsException".equals(error.getErrorDetail().getErrorCode()))) { - LOG.warn("Partitions already exist in glue: " + result.getErrors()); + BatchCreatePartitionRequest request = BatchCreatePartitionRequest.builder() + .databaseName(databaseName).tableName(tableName).partitionInputList(batch).build(); + + BatchCreatePartitionResponse response = awsGlue.batchCreatePartition(request); + if (CollectionUtils.nonEmpty(response.errors())) { + if (response.errors().stream() + .allMatch( + (error) -> "AlreadyExistsException".equals(error.errorDetail().errorCode()))) { + LOG.warn("Partitions already exist in glue: " + response.errors()); } else { throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, tableName) - + " with error(s): " + result.getErrors()); + + " with error(s): " + response.errors()); } } Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS); @@ -164,24 +165,23 @@ public void updatePartitionsToTable(String tableName, List changedPartit LOG.info("Updating " + changedPartitions.size() + "partition(s) in table " + tableId(databaseName, tableName)); try { Table table = getTable(awsGlue, databaseName, tableName); - StorageDescriptor sd = table.getStorageDescriptor(); + StorageDescriptor sd = table.storageDescriptor(); List updatePartitionEntries = changedPartitions.stream().map(partition -> { - StorageDescriptor partitionSd = sd.clone(); String fullPartitionPath = FSUtils.getPartitionPath(getBasePath(), partition).toString(); List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); - partitionSd.setLocation(fullPartitionPath); - PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd); - return new 
BatchUpdatePartitionRequestEntry().withPartitionInput(partitionInput).withPartitionValueList(partitionValues); + StorageDescriptor partitionSD = sd.copy(copySd -> copySd.location(fullPartitionPath)); + PartitionInput partitionInput = PartitionInput.builder().values(partitionValues).storageDescriptor(partitionSD).build(); + return BatchUpdatePartitionRequestEntry.builder().partitionInput(partitionInput).partitionValueList(partitionValues).build(); }).collect(Collectors.toList()); for (List batch : CollectionUtils.batches(updatePartitionEntries, MAX_PARTITIONS_PER_REQUEST)) { - BatchUpdatePartitionRequest request = new BatchUpdatePartitionRequest(); - request.withDatabaseName(databaseName).withTableName(tableName).withEntries(batch); + BatchUpdatePartitionRequest request = BatchUpdatePartitionRequest.builder() + .databaseName(databaseName).tableName(tableName).entries(batch).build(); - BatchUpdatePartitionResult result = awsGlue.batchUpdatePartition(request); - if (CollectionUtils.nonEmpty(result.getErrors())) { + BatchUpdatePartitionResponse response = awsGlue.batchUpdatePartition(request); + if (CollectionUtils.nonEmpty(response.errors())) { throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName) - + " with error(s): " + result.getErrors()); + + " with error(s): " + response.errors()); } Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS); } @@ -201,20 +201,22 @@ public void dropPartitions(String tableName, List partitionsToDrop) { for (List batch : CollectionUtils.batches(partitionsToDrop, MAX_PARTITIONS_PER_REQUEST)) { List partitionValueLists = batch.stream().map(partition -> { - PartitionValueList partitionValueList = new PartitionValueList(); - partitionValueList.setValues(partitionValueExtractor.extractPartitionValuesInPath(partition)); + PartitionValueList partitionValueList = PartitionValueList.builder() + .values(partitionValueExtractor.extractPartitionValuesInPath(partition)) + .build(); return partitionValueList; }).collect(Collectors.toList()); - BatchDeletePartitionRequest batchDeletePartitionRequest = new BatchDeletePartitionRequest() - .withDatabaseName(databaseName) - .withTableName(tableName) - .withPartitionsToDelete(partitionValueLists); + BatchDeletePartitionRequest batchDeletePartitionRequest = BatchDeletePartitionRequest.builder() + .databaseName(databaseName) + .tableName(tableName) + .partitionsToDelete(partitionValueLists) + .build(); - BatchDeletePartitionResult result = awsGlue.batchDeletePartition(batchDeletePartitionRequest); - if (CollectionUtils.nonEmpty(result.getErrors())) { + BatchDeletePartitionResponse response = awsGlue.batchDeletePartition(batchDeletePartitionRequest); + if (CollectionUtils.nonEmpty(response.errors())) { throw new HoodieGlueSyncException("Fail to drop partitions to " + tableId(databaseName, tableName) - + " with error(s): " + result.getErrors()); + + " with error(s): " + response.errors()); } Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS); } @@ -246,22 +248,22 @@ public void updateTableSchema(String tableName, MessageType newSchema) { Table table = getTable(awsGlue, databaseName, tableName); Map newSchemaMap = parquetSchemaToMapSchema(newSchema, config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE), false); List newColumns = getColumnsFromSchema(newSchemaMap); - StorageDescriptor sd = table.getStorageDescriptor(); - sd.setColumns(newColumns); - - final Date now = new Date(); - TableInput updatedTableInput = new TableInput() - .withName(tableName) - .withTableType(table.getTableType()) - 
.withParameters(table.getParameters()) - .withPartitionKeys(table.getPartitionKeys()) - .withStorageDescriptor(sd) - .withLastAccessTime(now) - .withLastAnalyzedTime(now); - - UpdateTableRequest request = new UpdateTableRequest() - .withDatabaseName(databaseName) - .withTableInput(updatedTableInput); + StorageDescriptor sd = table.storageDescriptor(); + StorageDescriptor partitionSD = sd.copy(copySd -> copySd.columns(newColumns)); + final Instant now = Instant.now(); + TableInput updatedTableInput = TableInput.builder() + .tableType(table.tableType()) + .parameters(table.parameters()) + .partitionKeys(table.partitionKeys()) + .storageDescriptor(partitionSD) + .lastAccessTime(now) + .lastAnalyzedTime(now) + .build(); + + UpdateTableRequest request = UpdateTableRequest.builder() + .databaseName(databaseName) + .tableInput(updatedTableInput) + .build(); awsGlue.updateTable(request); } catch (Exception e) { @@ -280,7 +282,6 @@ public void createTable(String tableName, if (tableExists(tableName)) { return; } - CreateTableRequest request = new CreateTableRequest(); Map params = new HashMap<>(); if (!config.getBoolean(HIVE_CREATE_MANAGED_TABLE)) { params.put("EXTERNAL", "TRUE"); @@ -295,32 +296,36 @@ public void createTable(String tableName, // now create the schema partition List schemaPartitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS).stream().map(partitionKey -> { String keyType = getPartitionKeyType(mapSchema, partitionKey); - return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment(""); + return Column.builder().name(partitionKey).type(keyType.toLowerCase()).comment("").build(); }).collect(Collectors.toList()); - StorageDescriptor storageDescriptor = new StorageDescriptor(); serdeProperties.put("serialization.format", "1"); - storageDescriptor - .withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties)) - .withLocation(s3aToS3(getBasePath())) - .withInputFormat(inputFormatClass) - .withOutputFormat(outputFormatClass) - .withColumns(schemaWithoutPartitionKeys); - - final Date now = new Date(); - TableInput tableInput = new TableInput() - .withName(tableName) - .withTableType(TableType.EXTERNAL_TABLE.toString()) - .withParameters(params) - .withPartitionKeys(schemaPartitionKeys) - .withStorageDescriptor(storageDescriptor) - .withLastAccessTime(now) - .withLastAnalyzedTime(now); - request.withDatabaseName(databaseName) - .withTableInput(tableInput); - - CreateTableResult result = awsGlue.createTable(request); - LOG.info("Created table " + tableId(databaseName, tableName) + " : " + result); + StorageDescriptor storageDescriptor = StorageDescriptor.builder() + .serdeInfo(SerDeInfo.builder().serializationLibrary(serdeClass).parameters(serdeProperties).build()) + .location(s3aToS3(getBasePath())) + .inputFormat(inputFormatClass) + .outputFormat(outputFormatClass) + .columns(schemaWithoutPartitionKeys) + .build(); + + final Instant now = Instant.now(); + TableInput tableInput = TableInput.builder() + .name(tableName) + .tableType(TableType.EXTERNAL_TABLE.toString()) + .parameters(params) + .partitionKeys(schemaPartitionKeys) + .storageDescriptor(storageDescriptor) + .lastAccessTime(now) + .lastAnalyzedTime(now) + .build(); + + CreateTableRequest request = CreateTableRequest.builder() + .databaseName(databaseName) + .tableInput(tableInput) + .build(); + + CreateTableResponse response = awsGlue.createTable(request); + LOG.info("Created table " + tableId(databaseName, tableName) + " : " + response); } catch 
(AlreadyExistsException e) { LOG.warn("Table " + tableId(databaseName, tableName) + " already exists.", e); } catch (Exception e) { @@ -335,10 +340,10 @@ public Map getMetastoreSchema(String tableName) { // get the Schema of the table. Table table = getTable(awsGlue, databaseName, tableName); Map partitionKeysMap = - table.getPartitionKeys().stream().collect(Collectors.toMap(Column::getName, f -> f.getType().toUpperCase())); + table.partitionKeys().stream().collect(Collectors.toMap(Column::name, f -> f.type().toUpperCase())); Map columnsMap = - table.getStorageDescriptor().getColumns().stream().collect(Collectors.toMap(Column::getName, f -> f.getType().toUpperCase())); + table.storageDescriptor().columns().stream().collect(Collectors.toMap(Column::name, f -> f.type().toUpperCase())); Map schema = new HashMap<>(); schema.putAll(columnsMap); @@ -351,11 +356,12 @@ public Map getMetastoreSchema(String tableName) { @Override public boolean tableExists(String tableName) { - GetTableRequest request = new GetTableRequest() - .withDatabaseName(databaseName) - .withName(tableName); + GetTableRequest request = GetTableRequest.builder() + .databaseName(databaseName) + .name(tableName) + .build(); try { - return Objects.nonNull(awsGlue.getTable(request).getTable()); + return Objects.nonNull(awsGlue.getTable(request).table()); } catch (EntityNotFoundException e) { LOG.info("Table not found: " + tableId(databaseName, tableName), e); return false; @@ -366,10 +372,9 @@ public boolean tableExists(String tableName) { @Override public boolean databaseExists(String databaseName) { - GetDatabaseRequest request = new GetDatabaseRequest(); - request.setName(databaseName); + GetDatabaseRequest request = GetDatabaseRequest.builder().name(databaseName).build(); try { - return Objects.nonNull(awsGlue.getDatabase(request).getDatabase()); + return Objects.nonNull(awsGlue.getDatabase(request).database()); } catch (EntityNotFoundException e) { LOG.info("Database not found: " + databaseName, e); return false; @@ -383,14 +388,16 @@ public void createDatabase(String databaseName) { if (databaseExists(databaseName)) { return; } - CreateDatabaseRequest request = new CreateDatabaseRequest(); - request.setDatabaseInput(new DatabaseInput() - .withName(databaseName) - .withDescription("Automatically created by " + this.getClass().getName()) - .withParameters(null) - .withLocationUri(null)); + CreateDatabaseRequest request = CreateDatabaseRequest.builder() + .databaseInput(DatabaseInput.builder() + .name(databaseName) + .description("Automatically created by " + this.getClass().getName()) + .parameters(null) + .locationUri(null) + .build() + ).build(); try { - CreateDatabaseResult result = awsGlue.createDatabase(request); + CreateDatabaseResponse result = awsGlue.createDatabase(request); LOG.info("Successfully created database in AWS Glue: " + result.toString()); } catch (AlreadyExistsException e) { LOG.warn("AWS Glue Database " + databaseName + " already exists", e); @@ -403,7 +410,7 @@ public void createDatabase(String databaseName) { public Option getLastCommitTimeSynced(String tableName) { try { Table table = getTable(awsGlue, databaseName, tableName); - return Option.ofNullable(table.getParameters().get(HOODIE_LAST_COMMIT_TIME_SYNC)); + return Option.ofNullable(table.parameters().get(HOODIE_LAST_COMMIT_TIME_SYNC)); } catch (Exception e) { throw new HoodieGlueSyncException("Fail to get last sync commit time for " + tableId(databaseName, tableName), e); } @@ -411,7 +418,7 @@ public Option getLastCommitTimeSynced(String 
tableName) { @Override public void close() { - awsGlue.shutdown(); + awsGlue.close(); } @Override @@ -449,7 +456,7 @@ private List getColumnsFromSchema(Map mapSchema) { // In Glue, the full schema should exclude the partition keys if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).contains(key)) { String keyType = getPartitionKeyType(mapSchema, key); - Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment(""); + Column column = Column.builder().name(key).type(keyType.toLowerCase()).comment("").build(); cols.add(column); } } @@ -464,12 +471,13 @@ private enum TableType { MATERIALIZED_VIEW } - private static Table getTable(AWSGlue awsGlue, String databaseName, String tableName) throws HoodieGlueSyncException { - GetTableRequest request = new GetTableRequest() - .withDatabaseName(databaseName) - .withName(tableName); + private static Table getTable(GlueClient awsGlue, String databaseName, String tableName) throws HoodieGlueSyncException { + GetTableRequest request = GetTableRequest.builder() + .databaseName(databaseName) + .name(tableName) + .build(); try { - return awsGlue.getTable(request).getTable(); + return awsGlue.getTable(request).table(); } catch (EntityNotFoundException e) { throw new HoodieGlueSyncException("Table not found: " + tableId(databaseName, tableName), e); } catch (Exception e) { @@ -477,28 +485,29 @@ private static Table getTable(AWSGlue awsGlue, String databaseName, String table } } - private static void updateTableParameters(AWSGlue awsGlue, String databaseName, String tableName, Map updatingParams, boolean shouldReplace) { + private static void updateTableParameters(GlueClient awsGlue, String databaseName, String tableName, Map updatingParams, boolean shouldReplace) { final Map newParams = new HashMap<>(); try { Table table = getTable(awsGlue, databaseName, tableName); if (!shouldReplace) { - newParams.putAll(table.getParameters()); + newParams.putAll(table.parameters()); } newParams.putAll(updatingParams); - final Date now = new Date(); - TableInput updatedTableInput = new TableInput() - .withName(tableName) - .withTableType(table.getTableType()) - .withParameters(newParams) - .withPartitionKeys(table.getPartitionKeys()) - .withStorageDescriptor(table.getStorageDescriptor()) - .withLastAccessTime(now) - .withLastAnalyzedTime(now); - - UpdateTableRequest request = new UpdateTableRequest(); - request.withDatabaseName(databaseName) - .withTableInput(updatedTableInput); + final Instant now = Instant.now(); + TableInput updatedTableInput = TableInput.builder() + .name(tableName) + .tableType(table.tableType()) + .parameters(newParams) + .partitionKeys(table.partitionKeys()) + .storageDescriptor(table.storageDescriptor()) + .lastAccessTime(now) + .lastAnalyzedTime(now) + .build(); + + UpdateTableRequest request = UpdateTableRequest.builder().databaseName(databaseName) + .tableInput(updatedTableInput) + .build(); awsGlue.updateTable(request); } catch (Exception e) { throw new HoodieGlueSyncException("Fail to update params for table " + tableId(databaseName, tableName) + ": " + newParams, e); diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java index 111d4142da075..b48c83f8f5b26 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java @@ -27,29 +27,29 @@ 
import org.apache.hudi.config.DynamoDbBasedLockConfig; import org.apache.hudi.exception.HoodieLockException; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.dynamodbv2.AcquireLockOptions; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions; import com.amazonaws.services.dynamodbv2.LockItem; -import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; -import com.amazonaws.services.dynamodbv2.model.BillingMode; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; -import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; -import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; -import com.amazonaws.services.dynamodbv2.util.TableUtils; + +import org.apache.hudi.aws.utils.DynamoTableUtils; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.NotThreadSafe; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -78,7 +78,7 @@ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, fina this(lockConfiguration, conf, null); } - public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) { + public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, DynamoDbClient dynamoDB) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; this.tableName = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key()); @@ -155,45 +155,48 @@ public LockItem getLock() { return lock; } - private AmazonDynamoDB getDynamoDBClient() { + private DynamoDbClient getDynamoDBClient() { String region = this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key()); String endpointURL = this.lockConfiguration.getConfig().containsKey(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key()) ? 
this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key()) - : RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX); - AwsClientBuilder.EndpointConfiguration dynamodbEndpoint = - new AwsClientBuilder.EndpointConfiguration(endpointURL, region); - return AmazonDynamoDBClientBuilder.standard() - .withEndpointConfiguration(dynamodbEndpoint) - .withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig())) + : DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString(); + return DynamoDbClient.builder() + .endpointOverride(URI.create(endpointURL)) + .credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig())) .build(); } - private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) { + private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String tableName) { String billingMode = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key()); - KeySchemaElement partitionKeyElement = new KeySchemaElement(); - partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME); - partitionKeyElement.setKeyType(KeyType.HASH); + KeySchemaElement partitionKeyElement = KeySchemaElement + .builder() + .attributeName(DYNAMODB_ATTRIBUTE_NAME) + .keyType(KeyType.HASH) + .build(); List keySchema = new ArrayList<>(); keySchema.add(partitionKeyElement); Collection attributeDefinitions = new ArrayList<>(); - attributeDefinitions.add(new AttributeDefinition().withAttributeName(DYNAMODB_ATTRIBUTE_NAME).withAttributeType(ScalarAttributeType.S)); - - CreateTableRequest createTableRequest = new CreateTableRequest(tableName, keySchema); - createTableRequest.setAttributeDefinitions(attributeDefinitions); - createTableRequest.setBillingMode(billingMode); + attributeDefinitions.add(AttributeDefinition.builder().attributeName(DYNAMODB_ATTRIBUTE_NAME).attributeType(ScalarAttributeType.S).build()); + CreateTableRequest.Builder createTableRequestBuilder = CreateTableRequest.builder(); if (billingMode.equals(BillingMode.PROVISIONED.name())) { - createTableRequest.setProvisionedThroughput(new ProvisionedThroughput() - .withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key()))) - .withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key())))); + createTableRequestBuilder.provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key()))) + .writeCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key()))) + .build()); } - dynamoDB.createTable(createTableRequest); + createTableRequestBuilder.tableName(tableName) + .keySchema(keySchema) + .attributeDefinitions(attributeDefinitions) + .billingMode(billingMode); + dynamoDB.createTable(createTableRequestBuilder.build()); LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active"); try { - TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000); - } catch (TableUtils.TableNeverTransitionedToStateException e) { + + 
DynamoTableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000); + } catch (DynamoTableUtils.TableNeverTransitionedToStateException e) { throw new HoodieLockException("Created dynamoDB table never transits to active", e); } catch (InterruptedException e) { throw new HoodieLockException("Thread interrupted while waiting for dynamoDB table to turn active", e); diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java new file mode 100644 index 0000000000000..5db6ab55ac157 --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/DynamoTableUtils.java @@ -0,0 +1,265 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.hudi.aws.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.TableDescription; +import software.amazon.awssdk.services.dynamodb.model.TableStatus; + +/** + * Reused code from https://github.com/aws/aws-sdk-java-v2/blob/master/services/dynamodb/src/test/java/utils/test/util/TableUtils.java + * + * Utility methods for working with DynamoDB tables. + * + *
+ * // ... create DynamoDB table ...
+ * try {
+ *     waitUntilActive(dynamoDB, myTableName());
+ * } catch (SdkClientException e) {
+ *     // table didn't become active
+ * }
+ * // ... start making calls to table ...
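+ * // Illustrative addition (not part of the upstream TableUtils example): the
+ * // helper defined later in this class can pre-create the table, swallowing
+ * // "already exists" errors; createTableRequest is assumed built beforehand:
+ * // createTableIfNotExists(dynamoDB, createTableRequest);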
+ * 
+ */ +public class DynamoTableUtils { + + private static final int DEFAULT_WAIT_TIMEOUT = 20 * 60 * 1000; + private static final int DEFAULT_WAIT_INTERVAL = 10 * 1000; + /** + * The logging utility. + */ + private static final Logger LOGGER = LoggerFactory.getLogger(DynamoTableUtils.class); + + /** + * Waits up to 10 minutes for a specified DynamoDB table to resolve, + * indicating that it exists. If the table doesn't return a result after + * this time, a SdkClientException is thrown. + * + * @param dynamo + * The DynamoDB client to use to make requests. + * @param tableName + * The name of the table being resolved. + * + * @throws SdkClientException + * If the specified table does not resolve before this method + * times out and stops polling. + * @throws InterruptedException + * If the thread is interrupted while waiting for the table to + * resolve. + */ + public static void waitUntilExists(final DynamoDbClient dynamo, final String tableName) + throws InterruptedException { + waitUntilExists(dynamo, tableName, DEFAULT_WAIT_TIMEOUT, DEFAULT_WAIT_INTERVAL); + } + + /** + * Waits up to a specified amount of time for a specified DynamoDB table to + * resolve, indicating that it exists. If the table doesn't return a result + * after this time, a SdkClientException is thrown. + * + * @param dynamo + * The DynamoDB client to use to make requests. + * @param tableName + * The name of the table being resolved. + * @param timeout + * The maximum number of milliseconds to wait. + * @param interval + * The poll interval in milliseconds. + * + * @throws SdkClientException + * If the specified table does not resolve before this method + * times out and stops polling. + * @throws InterruptedException + * If the thread is interrupted while waiting for the table to + * resolve. + */ + public static void waitUntilExists(final DynamoDbClient dynamo, final String tableName, final int timeout, + final int interval) throws InterruptedException { + TableDescription table = waitForTableDescription(dynamo, tableName, null, timeout, interval); + + if (table == null) { + throw SdkClientException.builder().message("Table " + tableName + " never returned a result").build(); + } + } + + /** + * Waits up to 10 minutes for a specified DynamoDB table to move into the + * ACTIVE state. If the table does not exist or does not + * transition to the ACTIVE state after this time, then + * SdkClientException is thrown. + * + * @param dynamo + * The DynamoDB client to use to make requests. + * @param tableName + * The name of the table whose status is being checked. + * + * @throws TableNeverTransitionedToStateException + * If the specified table does not exist or does not transition + * into the ACTIVE state before this method times + * out and stops polling. + * @throws InterruptedException + * If the thread is interrupted while waiting for the table to + * transition into the ACTIVE state. + */ + public static void waitUntilActive(final DynamoDbClient dynamo, final String tableName) + throws InterruptedException, TableNeverTransitionedToStateException { + waitUntilActive(dynamo, tableName, DEFAULT_WAIT_TIMEOUT, DEFAULT_WAIT_INTERVAL); + } + + /** + * Waits up to a specified amount of time for a specified DynamoDB table to + * move into the ACTIVE state. If the table does not exist or + * does not transition to the ACTIVE state after this time, + * then a SdkClientException is thrown. + * + * @param dynamo + * The DynamoDB client to use to make requests. 
+ * @param tableName + * The name of the table whose status is being checked. + * @param timeout + * The maximum number of milliseconds to wait. + * @param interval + * The poll interval in milliseconds. + * + * @throws TableNeverTransitionedToStateException + * If the specified table does not exist or does not transition + * into the ACTIVE state before this method times + * out and stops polling. + * @throws InterruptedException + * If the thread is interrupted while waiting for the table to + * transition into the ACTIVE state. + */ + public static void waitUntilActive(final DynamoDbClient dynamo, final String tableName, final int timeout, + final int interval) throws InterruptedException, TableNeverTransitionedToStateException { + TableDescription table = waitForTableDescription(dynamo, tableName, TableStatus.ACTIVE, timeout, interval); + + if (table == null || !table.tableStatus().equals(TableStatus.ACTIVE)) { + throw new TableNeverTransitionedToStateException(tableName, TableStatus.ACTIVE); + } + } + + /** + * Wait for the table to reach the desired status and returns the table + * description + * + * @param dynamo + * Dynamo client to use + * @param tableName + * Table name to poll status of + * @param desiredStatus + * Desired {@link TableStatus} to wait for. If null this method + * simply waits until DescribeTable returns something non-null + * (i.e. any status) + * @param timeout + * Timeout in milliseconds to continue to poll for desired status + * @param interval + * Time to wait in milliseconds between poll attempts + * @return Null if DescribeTables never returns a result, otherwise the + * result of the last poll attempt (which may or may not have the + * desired state) + * @throws {@link + * IllegalArgumentException} If timeout or interval is invalid + */ + private static TableDescription waitForTableDescription(final DynamoDbClient dynamo, final String tableName, + TableStatus desiredStatus, final int timeout, final int interval) + throws InterruptedException, IllegalArgumentException { + if (timeout < 0) { + throw new IllegalArgumentException("Timeout must be >= 0"); + } + if (interval <= 0 || interval >= timeout) { + throw new IllegalArgumentException("Interval must be > 0 and < timeout"); + } + long startTime = System.currentTimeMillis(); + long endTime = startTime + timeout; + + TableDescription table = null; + while (System.currentTimeMillis() < endTime) { + try { + table = dynamo.describeTable(DescribeTableRequest.builder().tableName(tableName).build()).table(); + if (desiredStatus == null || table.tableStatus().equals(desiredStatus)) { + return table; + + } + } catch (ResourceNotFoundException rnfe) { + // ResourceNotFound means the table doesn't exist yet, + // so ignore this error and just keep polling. + } + + Thread.sleep(interval); + } + return table; + } + + /** + * Creates the table and ignores any errors if it already exists. + * @param dynamo The Dynamo client to use. + * @param createTableRequest The create table request. + * @return True if created, false otherwise. + */ + public static boolean createTableIfNotExists(final DynamoDbClient dynamo, final CreateTableRequest createTableRequest) { + try { + dynamo.createTable(createTableRequest); + return true; + } catch (final ResourceInUseException e) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Table " + createTableRequest.tableName() + " already exists", e); + } + } + return false; + } + + /** + * Deletes the table and ignores any errors if it doesn't exist. + * @param dynamo The Dynamo client to use. 
+ * @param deleteTableRequest The delete table request. + * @return True if deleted, false otherwise. + */ + public static boolean deleteTableIfExists(final DynamoDbClient dynamo, final DeleteTableRequest deleteTableRequest) { + try { + dynamo.deleteTable(deleteTableRequest); + return true; + } catch (final ResourceNotFoundException e) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Table " + deleteTableRequest.tableName() + " does not exist", e); + } + } + return false; + } + + /** + * Thrown by {@link DynamoTableUtils} when a table never reaches a desired state + */ + public static class TableNeverTransitionedToStateException extends SdkClientException { + + private static final long serialVersionUID = 8920567021104846647L; + + public TableNeverTransitionedToStateException(String tableName, TableStatus desiredStatus) { + super(SdkClientException.builder() + .message("Table " + tableName + " never transitioned to desired state of " + + desiredStatus.toString())); + } + + } + +} diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java index 4530340537be4..d0da020f31550 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java +++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java @@ -25,8 +25,9 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; -import com.amazonaws.regions.RegionUtils; -import com.amazonaws.services.dynamodbv2.model.BillingMode; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.regions.RegionMetadata; +import software.amazon.awssdk.services.dynamodb.model.BillingMode; import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX; @@ -64,19 +65,21 @@ public class DynamoDbBasedLockConfig extends HoodieConfig { + "Each Hudi dataset should has it's unique key so concurrent writers could refer to the same partition key." + " By default we use the Hudi table name specified to be the partition key"); - public static final ConfigProperty DYNAMODB_LOCK_REGION = ConfigProperty - .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "region") - .defaultValue("us-east-1") - .sinceVersion("0.10.0") - .withInferFunction(cfg -> { - String regionFromEnv = System.getenv("AWS_REGION"); - if (regionFromEnv != null) { - return Option.of(RegionUtils.getRegion(regionFromEnv).getName()); - } - return Option.empty(); - }) - .withDocumentation("For DynamoDB based lock provider, the region used in endpoint for Amazon DynamoDB service." - + " Would try to first get it from AWS_REGION environment variable. If not find, by default use us-east-1"); + public static final ConfigProperty DYNAMODB_LOCK_REGION = + ConfigProperty.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "region") + .defaultValue("us-east-1") + .sinceVersion("0.10.0") + .withInferFunction( + cfg -> { + String regionFromEnv = System.getenv("AWS_REGION"); + if (regionFromEnv != null) { + return Option.of(RegionMetadata.of(Region.of(regionFromEnv)).id()); + } + return Option.empty(); + }) + .withDocumentation( + "For DynamoDB based lock provider, the region used in endpoint for Amazon DynamoDB service." + + " Would try to first get it from AWS_REGION environment variable. 
If not found, use us-east-1 by default.");
 
   public static final ConfigProperty<String> DYNAMODB_LOCK_BILLING_MODE = ConfigProperty
       .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "billing_mode")
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java b/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java
index 051fe81e8b0ff..7a5e776db8d53 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/TestHoodieAWSCredentialsProviderFactory.java
@@ -18,11 +18,11 @@
 package org.apache.hudi.aws;
 
-import com.amazonaws.auth.BasicSessionCredentials;
 import org.apache.hudi.config.HoodieAWSConfig;
 import org.apache.hudi.common.config.HoodieConfig;
 
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -34,9 +34,9 @@ public void testGetAWSCredentials() {
     cfg.setValue(HoodieAWSConfig.AWS_ACCESS_KEY, "random-access-key");
     cfg.setValue(HoodieAWSConfig.AWS_SECRET_KEY, "random-secret-key");
     cfg.setValue(HoodieAWSConfig.AWS_SESSION_TOKEN, "random-session-token");
-    BasicSessionCredentials credentials = (BasicSessionCredentials) org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(cfg.getProps()).getCredentials();
-    assertEquals("random-access-key", credentials.getAWSAccessKeyId());
-    assertEquals("random-secret-key", credentials.getAWSSecretKey());
-    assertEquals("random-session-token", credentials.getSessionToken());
+    AwsSessionCredentials credentials = (AwsSessionCredentials) org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(cfg.getProps()).resolveCredentials();
+    assertEquals("random-access-key", credentials.accessKeyId());
+    assertEquals("random-secret-key", credentials.secretAccessKey());
+    assertEquals("random-session-token", credentials.sessionToken());
   }
 }
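For quick reference, the accessor rename this test exercises, as a standalone sketch (the `main` scaffold is illustrative; `AwsSessionCredentials.create` is the v2 factory method used in `HoodieConfigAWSCredentialsProvider` above):

```java
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;

public class SessionCredentialsSketch {
  public static void main(String[] args) {
    // v1: new BasicSessionCredentials(...) read via getAWSAccessKeyId()/getAWSSecretKey()/getSessionToken()
    // v2: AwsSessionCredentials.create(...) read via accessKeyId()/secretAccessKey()/sessionToken()
    AwsSessionCredentials credentials =
        AwsSessionCredentials.create("random-access-key", "random-secret-key", "random-session-token");
    System.out.println(credentials.accessKeyId());
    System.out.println(credentials.secretAccessKey());
    System.out.println(credentials.sessionToken());
  }
}
```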
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java
index 85f551e6fda82..600dd57869599 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java
@@ -18,11 +18,11 @@
 package org.apache.hudi.aws.cloudwatch;
 
-import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
-import com.amazonaws.services.cloudwatch.model.Dimension;
-import com.amazonaws.services.cloudwatch.model.MetricDatum;
-import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
-import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.cloudwatch.model.Dimension;
+import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
+import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
 import com.codahale.metrics.Clock;
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
@@ -67,10 +67,10 @@ public class TestCloudWatchReporter {
   MetricRegistry metricRegistry;
 
   @Mock
-  AmazonCloudWatchAsync cloudWatchAsync;
+  CloudWatchAsyncClient cloudWatchAsync;
 
   @Mock
-  CompletableFuture<PutMetricDataResult> cloudWatchFuture;
+  CompletableFuture<PutMetricDataResponse> cloudWatchFuture;
 
   @Captor
   ArgumentCaptor<PutMetricDataRequest> putMetricDataRequestCaptor;
@@ -89,7 +89,7 @@ public void setup() {
         .convertDurationsTo(TimeUnit.MILLISECONDS)
         .build(cloudWatchAsync);
 
-    Mockito.when(cloudWatchAsync.putMetricDataAsync(ArgumentMatchers.any())).thenReturn(cloudWatchFuture);
+    Mockito.when(cloudWatchAsync.putMetricData((PutMetricDataRequest) ArgumentMatchers.any())).thenReturn(cloudWatchFuture);
   }
 
   @Test
@@ -130,52 +130,54 @@ public void testReporter() {
 
     // Since there are 6 metrics in total, and max datums per request is 2 we would expect 3 calls to CloudWatch
     // with 2 datums in each
-    Mockito.verify(cloudWatchAsync, Mockito.times(3)).putMetricDataAsync(putMetricDataRequestCaptor.capture());
-    Assertions.assertEquals(NAMESPACE, putMetricDataRequestCaptor.getValue().getNamespace());
+    Mockito.verify(cloudWatchAsync, Mockito.times(3)).putMetricData(putMetricDataRequestCaptor.capture());
+    Assertions.assertEquals(NAMESPACE, putMetricDataRequestCaptor.getValue().namespace());
 
     List<PutMetricDataRequest> putMetricDataRequests = putMetricDataRequestCaptor.getAllValues();
-    putMetricDataRequests.forEach(request -> assertEquals(2, request.getMetricData().size()));
+    putMetricDataRequests.forEach(request -> assertEquals(2, request.metricData().size()));
 
-    List<MetricDatum> metricDataBatch1 = putMetricDataRequests.get(0).getMetricData();
-    assertEquals(PREFIX + ".gauge1", metricDataBatch1.get(0).getMetricName());
-    assertEquals(Double.valueOf(gauge1.getValue()), metricDataBatch1.get(0).getValue());
-    assertDimensions(metricDataBatch1.get(0).getDimensions(), DIMENSION_GAUGE_TYPE_VALUE);
+    List<MetricDatum> metricDataBatch1 = putMetricDataRequests.get(0).metricData();
+    assertEquals(PREFIX + ".gauge1", metricDataBatch1.get(0).metricName());
+    assertEquals(Double.valueOf(gauge1.getValue()), metricDataBatch1.get(0).value());
+    assertDimensions(metricDataBatch1.get(0).dimensions(), DIMENSION_GAUGE_TYPE_VALUE);
 
-    assertEquals(PREFIX + ".gauge2", metricDataBatch1.get(1).getMetricName());
-    assertEquals(gauge2.getValue(), metricDataBatch1.get(1).getValue());
-    assertDimensions(metricDataBatch1.get(1).getDimensions(), DIMENSION_GAUGE_TYPE_VALUE);
+    assertEquals(PREFIX + ".gauge2", metricDataBatch1.get(1).metricName());
+    assertEquals(gauge2.getValue(), metricDataBatch1.get(1).value());
+    assertDimensions(metricDataBatch1.get(1).dimensions(), DIMENSION_GAUGE_TYPE_VALUE);
 
-    List<MetricDatum> metricDataBatch2 = putMetricDataRequests.get(1).getMetricData();
-    assertEquals(PREFIX + ".counter1", metricDataBatch2.get(0).getMetricName());
-    assertEquals(counter1.getCount(), metricDataBatch2.get(0).getValue().longValue());
-    assertDimensions(metricDataBatch2.get(0).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+    List<MetricDatum> metricDataBatch2 = putMetricDataRequests.get(1).metricData();
+    assertEquals(PREFIX + ".counter1", metricDataBatch2.get(0).metricName());
+    assertEquals(counter1.getCount(), metricDataBatch2.get(0).value().longValue());
+    assertDimensions(metricDataBatch2.get(0).dimensions(), DIMENSION_COUNT_TYPE_VALUE);
 
-    assertEquals(PREFIX + ".histogram1", metricDataBatch2.get(1).getMetricName());
-    assertEquals(histogram1.getCount(), metricDataBatch2.get(1).getValue().longValue());
-    assertDimensions(metricDataBatch2.get(1).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+    assertEquals(PREFIX + ".histogram1", metricDataBatch2.get(1).metricName());
+    assertEquals(histogram1.getCount(), metricDataBatch2.get(1).value().longValue());
+    assertDimensions(metricDataBatch2.get(1).dimensions(), DIMENSION_COUNT_TYPE_VALUE);
 
-    List<MetricDatum> metricDataBatch3 = putMetricDataRequests.get(2).getMetricData();
-    assertEquals(PREFIX + ".meter1", metricDataBatch3.get(0).getMetricName());
-    assertEquals(meter1.getCount(), metricDataBatch3.get(0).getValue().longValue());
-    assertDimensions(metricDataBatch3.get(0).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+    List<MetricDatum> metricDataBatch3 = putMetricDataRequests.get(2).metricData();
+    assertEquals(PREFIX + ".meter1", metricDataBatch3.get(0).metricName());
+    assertEquals(meter1.getCount(), metricDataBatch3.get(0).value().longValue());
+    assertDimensions(metricDataBatch3.get(0).dimensions(), DIMENSION_COUNT_TYPE_VALUE);
 
-    assertEquals(PREFIX + ".timer1", metricDataBatch3.get(1).getMetricName());
-    assertEquals(timer1.getCount(), metricDataBatch3.get(1).getValue().longValue());
-    assertDimensions(metricDataBatch3.get(1).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+    assertEquals(PREFIX + ".timer1", metricDataBatch3.get(1).metricName());
+    assertEquals(timer1.getCount(), metricDataBatch3.get(1).value().longValue());
+    assertDimensions(metricDataBatch3.get(1).dimensions(), DIMENSION_COUNT_TYPE_VALUE);
 
     reporter.stop();
 
-    Mockito.verify(cloudWatchAsync).shutdown();
+    Mockito.verify(cloudWatchAsync).close();
   }
 
   private void assertDimensions(List<Dimension> actualDimensions, String metricTypeDimensionVal) {
     assertEquals(2, actualDimensions.size());
-    Dimension expectedTableNameDimension = new Dimension()
-        .withName(DIMENSION_TABLE_NAME_KEY)
-        .withValue(TABLE_NAME);
-    Dimension expectedMetricTypeDimension = new Dimension()
-        .withName(DIMENSION_METRIC_TYPE_KEY)
-        .withValue(metricTypeDimensionVal);
+    Dimension expectedTableNameDimension = Dimension.builder()
+        .name(DIMENSION_TABLE_NAME_KEY)
+        .value(TABLE_NAME)
+        .build();
+    Dimension expectedMetricTypeDimension = Dimension.builder()
+        .name(DIMENSION_METRIC_TYPE_KEY)
+        .value(metricTypeDimensionVal)
+        .build();
 
     assertEquals(expectedTableNameDimension, actualDimensions.get(0));
     assertEquals(expectedMetricTypeDimension, actualDimensions.get(1));
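The reporter changes above follow the same v2 pattern throughout: a single putMetricData(...) returning a CompletableFuture replaces v1's putMetricDataAsync(...), and close() replaces shutdown(). A compact, hypothetical sketch of publishing one datum (the namespace and metric name are illustrative values, not Hudi's):

import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;

public class CloudWatchPublishSketch {
  public static void main(String[] args) {
    // v2 async clients are AutoCloseable, so close() replaces v1's shutdown().
    try (CloudWatchAsyncClient client = CloudWatchAsyncClient.create()) {
      MetricDatum datum = MetricDatum.builder()
          .metricName("commitDuration")
          .value(42.0)
          .unit(StandardUnit.COUNT)
          .timestamp(Instant.now())   // v2 uses java.time.Instant, not java.util.Date
          .build();
      CompletableFuture<PutMetricDataResponse> future = client.putMetricData(
          PutMetricDataRequest.builder().namespace("hudi-metrics").metricData(datum).build());
      future.join(); // block only for the demo; the reporter leaves the future running
    }
  }
}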
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
index d2ab0375e050c..150468012d19d 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
@@ -18,13 +18,10 @@
 package org.apache.hudi.aws.transaction.integ;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
 import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.config.DynamoDbBasedLockConfig;
@@ -32,7 +29,10 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
 
+import java.net.URI;
 import java.util.UUID;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -46,7 +46,7 @@ public class ITTestDynamoDBBasedLockProvider {
 
   private static LockConfiguration lockConfiguration;
-  private static AmazonDynamoDB dynamoDb;
+  private static DynamoDbClient dynamoDb;
 
   private static final String TABLE_NAME_PREFIX = "testDDBTable-";
   private static final String REGION = "us-east-2";
@@ -103,18 +103,19 @@ public void testUnlockWithoutLock() {
     dynamoDbBasedLockProvider.unlock();
   }
 
-  private static AmazonDynamoDB getDynamoClientWithLocalEndpoint() {
+  private static DynamoDbClient getDynamoClientWithLocalEndpoint() {
     String endpoint = System.getProperty("dynamodb-local.endpoint");
     if (endpoint == null || endpoint.isEmpty()) {
       throw new IllegalStateException("dynamodb-local.endpoint system property not set");
     }
-    return AmazonDynamoDBClientBuilder.standard()
-        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, REGION))
-        .withCredentials(getCredentials())
+    return DynamoDbClient.builder()
+        .region(Region.of(REGION))
+        .endpointOverride(URI.create(endpoint))
+        .credentialsProvider(getCredentials())
         .build();
   }
 
-  private static AWSCredentialsProvider getCredentials() {
-    return new AWSStaticCredentialsProvider(new BasicAWSCredentials("random-access-key", "random-secret-key"));
+  private static AwsCredentialsProvider getCredentials() {
+    return StaticCredentialsProvider.create(AwsBasicCredentials.create("random-access-key", "random-secret-key"));
  }
 }
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 2a77116893ffc..3ebde8b3c6b72 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -136,6 +136,18 @@
       <artifactId>hudi-aws</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${aws.sdk.httpclient.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>${aws.sdk.httpcore.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-gcp</artifactId>
@@ -456,6 +468,14 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- AWS SDK -->
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sqs</artifactId>
+      <version>${aws.sdk.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${hive.groupid}</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
index df84217381b66..a742b4bfd869b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
@@ -25,8 +25,8 @@
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
 
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -48,7 +48,7 @@ public class S3EventsSource extends RowSource implements Closeable {
 
   private final S3EventsMetaSelector pathSelector;
   private final List<Message> processedMessages = new ArrayList<>();
-  AmazonSQS sqs;
+  SqsClient sqs;
 
   public S3EventsSource(
       TypedProperties props,
@@ -85,7 +85,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
   @Override
   public void close() throws IOException {
     // close resource
-    this.sqs.shutdown();
+    this.sqs.close();
   }
 
   @Override
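S3EventsSource.close() now delegates to the client's close(). A small sketch of the v2 client lifecycle, assuming default credentials and an explicit region:

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;

public class SqsLifecycleSketch {
  public static void main(String[] args) {
    // v2 clients implement SdkAutoCloseable, so close() replaces v1's shutdown()
    // and try-with-resources manages the underlying HTTP connection pool.
    try (SqsClient sqs = SqsClient.builder().region(Region.US_EAST_1).build()) {
      sqs.listQueues().queueUrls().forEach(System.out::println);
    } // close() releases pooled connections held by the client
  }
}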
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
index 6112dfc709360..b6f1afbdc7723 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
@@ -22,17 +22,17 @@
 import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
 import org.apache.hudi.utilities.config.S3SourceConfig;
 
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
-import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,11 +97,14 @@ public CloudObjectsSelector(TypedProperties props) {
    * @param queueUrl queue full url
    * @return map of attributes needed
    */
-  protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) {
-    GetQueueAttributesResult queueAttributesResult = sqsClient.getQueueAttributes(
-        new GetQueueAttributesRequest(queueUrl).withAttributeNames(SQS_ATTR_APPROX_MESSAGES)
+  protected Map<String, String> getSqsQueueAttributes(SqsClient sqsClient, String queueUrl) {
+    GetQueueAttributesResponse queueAttributesResult = sqsClient.getQueueAttributes(
+        GetQueueAttributesRequest.builder()
+            .queueUrl(queueUrl)
+            .attributeNames(QueueAttributeName.fromValue(SQS_ATTR_APPROX_MESSAGES))
+            .build()
     );
-    return queueAttributesResult.getAttributes();
+    return queueAttributesResult.attributesAsStrings();
   }
 
   /**
@@ -128,27 +131,27 @@ protected Map<String, Object> getFileAttributesFromRecord(JSONObject record) thr
   /**
    * Amazon SQS Client Builder.
    */
-  public AmazonSQS createAmazonSqsClient() {
-    return AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
+  public SqsClient createAmazonSqsClient() {
+    return SqsClient.builder().region(Region.of(regionName)).build();
   }
 
   /**
    * List messages from queue.
    */
   protected List<Message> getMessagesToProcess(
-      AmazonSQS sqsClient,
+      SqsClient sqsClient,
       String queueUrl,
       int longPollWait,
       int visibilityTimeout,
       int maxMessagePerBatch,
       int maxMessagesPerRequest) {
     List<Message> messagesToProcess = new ArrayList<>();
-    ReceiveMessageRequest receiveMessageRequest =
-        new ReceiveMessageRequest()
-            .withQueueUrl(queueUrl)
-            .withWaitTimeSeconds(longPollWait)
-            .withVisibilityTimeout(visibilityTimeout);
-    receiveMessageRequest.setMaxNumberOfMessages(maxMessagesPerRequest);
+    ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
+        .queueUrl(queueUrl)
+        .waitTimeSeconds(longPollWait)
+        .visibilityTimeout(visibilityTimeout)
+        .maxNumberOfMessages(maxMessagesPerRequest)
+        .build();
     // Get count for available messages
     Map<String, String> queueAttributesResult = getSqsQueueAttributes(sqsClient, queueUrl);
     long approxMessagesAvailable = Long.parseLong(queueAttributesResult.get(SQS_ATTR_APPROX_MESSAGES));
@@ -157,7 +160,7 @@ protected List<Message> getMessagesToProcess(
 
     for (int i = 0; i < (int) Math.ceil((double) numMessagesToProcess / maxMessagesPerRequest); ++i) {
-      List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
+      List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
       log.debug("Number of messages: " + messages.size());
       messagesToProcess.addAll(messages);
       if (messages.isEmpty()) {
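A condensed, standalone version of the long-poll receive performed by getMessagesToProcess; the queue URL and tuning values are hypothetical:

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

import java.util.List;

public class SqsReceiveSketch {
  public static void main(String[] args) {
    String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"; // placeholder
    try (SqsClient sqs = SqsClient.builder().region(Region.US_EAST_1).build()) {
      // An immutable v2 request replaces v1's mutable withX()/setX() object;
      // the same request instance can safely be reused across polling iterations.
      ReceiveMessageRequest request = ReceiveMessageRequest.builder()
          .queueUrl(queueUrl)
          .waitTimeSeconds(20)       // long poll for up to 20s
          .visibilityTimeout(30)     // hide received messages for 30s
          .maxNumberOfMessages(10)   // SQS caps a single receive at 10
          .build();
      List<Message> messages = sqs.receiveMessage(request).messages();
      messages.forEach(m -> System.out.println(m.messageId() + ": " + m.body()));
    }
  }
}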
@@ -192,20 +195,21 @@ protected List<List<Message>> createListPartitions(List<Message> singleList, int
   /**
    * Delete batch of messages from queue.
    */
-  protected void deleteBatchOfMessages(AmazonSQS sqs, String queueUrl, List<Message> messagesToBeDeleted) {
-    DeleteMessageBatchRequest deleteBatchReq =
-        new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
-    List<DeleteMessageBatchRequestEntry> deleteEntries = deleteBatchReq.getEntries();
+  protected void deleteBatchOfMessages(SqsClient sqs, String queueUrl, List<Message> messagesToBeDeleted) {
+    List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
     for (Message message : messagesToBeDeleted) {
       deleteEntries.add(
-          new DeleteMessageBatchRequestEntry()
-              .withId(message.getMessageId())
-              .withReceiptHandle(message.getReceiptHandle()));
+          DeleteMessageBatchRequestEntry.builder()
+              .id(message.messageId())
+              .receiptHandle(message.receiptHandle())
+              .build());
     }
-    DeleteMessageBatchResult deleteResult = sqs.deleteMessageBatch(deleteBatchReq);
+    // SDK v2 requests are immutable once built, so the entries must be attached
+    // before build(); mutating a copy of deleteBatchReq.entries() never reaches the request.
+    DeleteMessageBatchRequest deleteBatchReq =
+        DeleteMessageBatchRequest.builder().queueUrl(queueUrl).entries(deleteEntries).build();
+    DeleteMessageBatchResponse deleteResponse = sqs.deleteMessageBatch(deleteBatchReq);
     List<String> deleteFailures =
-        deleteResult.getFailed().stream()
-            .map(BatchResultErrorEntry::getId)
+        deleteResponse.failed().stream()
+            .map(BatchResultErrorEntry::id)
             .collect(Collectors.toList());
     if (!deleteFailures.isEmpty()) {
       log.warn(
@@ -222,7 +226,7 @@ protected void deleteBatchOfMessages(AmazonSQS sqs, String queueUrl, List<Messa
-  public void deleteProcessedMessages(AmazonSQS sqs, String queueUrl, List<Message> processedMessages) {
+  public void deleteProcessedMessages(SqsClient sqs, String queueUrl, List<Message> processedMessages) {
     if (!processedMessages.isEmpty()) {
       // create batch for deletion, SQS DeleteMessageBatchRequest only accepts max 10 entries
       List<List<Message>> deleteBatches = createListPartitions(processedMessages, 10);
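The rewrite of deleteBatchOfMessages above also corrects a migration pitfall worth calling out: because v2 requests are immutable, entries collected after build() are silently dropped. A sketch demonstrating the failure mode and the fix (queue URL is a placeholder):

import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;

import java.util.ArrayList;
import java.util.List;

public class ImmutableRequestSketch {
  public static void main(String[] args) {
    // Anti-pattern: copying entries out of a built request and mutating the copy.
    DeleteMessageBatchRequest built = DeleteMessageBatchRequest.builder()
        .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/example-queue")
        .build();
    List<DeleteMessageBatchRequestEntry> copy = new ArrayList<>(built.entries());
    copy.add(DeleteMessageBatchRequestEntry.builder().id("1").receiptHandle("rh-1").build());
    System.out.println(built.entries().size()); // still 0 - the request never sees the copy

    // Correct: collect entries first, then attach them before build()
    DeleteMessageBatchRequest correct = built.toBuilder().entries(copy).build();
    System.out.println(correct.entries().size()); // 1
  }
}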
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
index 4c190d638c635..763351eecc9a2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
@@ -26,8 +26,8 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
 
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -84,7 +84,7 @@ public static S3EventsMetaSelector createSourceSelector(TypedProperties props) {
    * @param processedMessages array of processed messages to add more messages
    * @return the filtered list of valid S3 events in SQS.
    */
-  protected List<Map<String, Object>> getValidEvents(AmazonSQS sqs, List<Message> processedMessages) throws IOException {
+  protected List<Map<String, Object>> getValidEvents(SqsClient sqs, List<Message> processedMessages) throws IOException {
     List<Message> messages =
         getMessagesToProcess(
             sqs,
@@ -100,7 +100,7 @@ private List<Map<String, Object>> processAndDeleteInvalidMessages(List<Messa
       List<Message> messages) throws IOException {
     List<Map<String, Object>> validEvents = new ArrayList<>();
     for (Message message : messages) {
-      JSONObject messageBody = new JSONObject(message.getBody());
+      JSONObject messageBody = new JSONObject(message.body());
       Map<String, Object> messageMap;
       ObjectMapper mapper = new ObjectMapper();
       if (messageBody.has(SQS_MODEL_MESSAGE)) {
@@ -136,7 +136,7 @@ private List<Map<String, Object>> processAndDeleteInvalidMessages(List<Message>
    * @param lastCheckpointStr The last checkpoint instant string, empty if first run.
    * @return A pair of dataset of event records and the next checkpoint instant string.
    */
-  public Pair<List<String>, String> getNextEventsFromQueue(AmazonSQS sqs,
+  public Pair<List<String>, String> getNextEventsFromQueue(SqsClient sqs,
                                                            Option<String> lastCheckpointStr,
                                                            List<Message> processedMessages) {
     processedMessages.clear();
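For context on the SQS_MODEL_MESSAGE branch in processAndDeleteInvalidMessages: when SNS fans out to SQS, the S3 event arrives wrapped in a "Message" field as an escaped JSON string. A sketch of the unwrap, using a fabricated payload for illustration:

import org.json.JSONObject;

public class S3EventBodySketch {
  public static void main(String[] args) {
    // Simplified shape of an SNS-wrapped S3 event notification; the payload is fabricated.
    String body = "{\"Message\": \"{\\\"Records\\\":[{\\\"eventName\\\":\\\"ObjectCreated:Put\\\"}]}\"}";
    JSONObject messageBody = new JSONObject(body);
    // If the "Message" field is present, the S3 event is the escaped string inside it;
    // otherwise the body itself is the raw S3 event notification.
    JSONObject s3Event = messageBody.has("Message")
        ? new JSONObject(messageBody.getString("Message"))
        : messageBody;
    System.out.println(s3Event.getJSONArray("Records").getJSONObject(0).getString("eventName"));
  }
}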
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
index 9ae47958f57bd..c34155ff7d844 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelector.java
@@ -24,8 +24,6 @@
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
 
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.model.Message;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
@@ -35,6 +33,8 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
@@ -62,7 +62,7 @@ public class TestCloudObjectsSelector extends HoodieClientTestHarness {
   String sqsUrl;
 
   @Mock
-  AmazonSQS sqs;
+  SqsClient sqs;
 
   @Mock
   private CloudObjectsSelector cloudObjectsSelector;
@@ -156,22 +156,22 @@ public void testCreateListPartitionsReturnsExpectedSetOfBatch(Class<?> clazz) {
 
     // setup lists
     List<Message> testSingleList = new ArrayList<>();
-    testSingleList.add(new Message().addAttributesEntry("id", "1"));
-    testSingleList.add(new Message().addAttributesEntry("id", "2"));
-    testSingleList.add(new Message().addAttributesEntry("id", "3"));
-    testSingleList.add(new Message().addAttributesEntry("id", "4"));
-    testSingleList.add(new Message().addAttributesEntry("id", "5"));
+    testSingleList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "1")).build());
+    testSingleList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "2")).build());
+    testSingleList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "3")).build());
+    testSingleList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "4")).build());
+    testSingleList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "5")).build());
 
     List<Message> expectedFirstList = new ArrayList<>();
-    expectedFirstList.add(new Message().addAttributesEntry("id", "1"));
-    expectedFirstList.add(new Message().addAttributesEntry("id", "2"));
+    expectedFirstList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "1")).build());
+    expectedFirstList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "2")).build());
     List<Message> expectedSecondList = new ArrayList<>();
-    expectedSecondList.add(new Message().addAttributesEntry("id", "3"));
-    expectedSecondList.add(new Message().addAttributesEntry("id", "4"));
+    expectedSecondList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "3")).build());
+    expectedSecondList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "4")).build());
     List<Message> expectedFinalList = new ArrayList<>();
-    expectedFinalList.add(new Message().addAttributesEntry("id", "5"));
+    expectedFinalList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "5")).build());
 
     // test the return values
     List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 2);
@@ -191,8 +191,8 @@ public void testCreateListPartitionsReturnsEmptyIfBatchSizeIsZero(Class<?> clazz) {
 
     // setup lists
     List<Message> testSingleList = new ArrayList<>();
-    testSingleList.add(new Message().addAttributesEntry("id", "1"));
-    testSingleList.add(new Message().addAttributesEntry("id", "2"));
+    testSingleList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "1")).build());
+    testSingleList.add(Message.builder().attributesWithStrings(createAttributeMap("id", "2")).build());
 
     // test the return values
     List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 0);
@@ -210,17 +210,26 @@ public void testOnCommitDeleteProcessedMessages(Class<?> clazz) {
 
     // setup lists
     List<Message> testSingleList = new ArrayList<>();
     testSingleList.add(
-        new Message()
-            .addAttributesEntry("MessageId", "1")
-            .addAttributesEntry("ReceiptHandle", "1"));
+        Message.builder()
+            .attributesWithStrings(createAttributeMap("MessageId", "1", "ReceiptHandle", "1"))
+            .build());
     testSingleList.add(
-        new Message()
-            .addAttributesEntry("MessageId", "2")
-            .addAttributesEntry("ReceiptHandle", "1"));
+        Message.builder()
+            .attributesWithStrings(createAttributeMap("MessageId", "2", "ReceiptHandle", "1"))
+            .build());
 
     deleteMessagesInQueue(sqs);
 
     // test the return values
     selector.deleteProcessedMessages(sqs, sqsUrl, testSingleList);
   }
+
+  // Builds the attribute map in one shot; chaining attributesWithStrings() twice on a
+  // v2 builder would replace the first map rather than merge into it.
+  public Map<String, String> createAttributeMap(String... keyValues) {
+    Map<String, String> attributes = new HashMap<>();
+    for (int i = 0; i < keyValues.length; i += 2) {
+      attributes.put(keyValues[i], keyValues[i + 1]);
+    }
+    return attributes;
+  }
 }
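The partition tests above compare freshly built expected Messages against the selector's output, which works because v2 model objects use value-based equality. A minimal demonstration:

import software.amazon.awssdk.services.sqs.model.Message;

public class ModelEqualitySketch {
  public static void main(String[] args) {
    // Two independently built v2 model instances with the same fields compare equal,
    // so tests can assert directly on builder-constructed expected values.
    Message a = Message.builder().messageId("1").receiptHandle("rh").build();
    Message b = Message.builder().messageId("1").receiptHandle("rh").build();
    System.out.println(a.equals(b)); // true
  }
}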
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
index 7bae65e73301d..534103ba5fb1d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
@@ -26,10 +26,6 @@
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
 
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.Message;
 import org.apache.hadoop.fs.Path;
 import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
@@ -40,9 +36,15 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
+import software.amazon.awssdk.services.sqs.model.Message;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hudi.utilities.config.S3SourceConfig.S3_SOURCE_QUEUE_REGION;
 import static org.apache.hudi.utilities.config.S3SourceConfig.S3_SOURCE_QUEUE_URL;
@@ -59,7 +61,7 @@ public class TestS3EventsMetaSelector extends HoodieClientTestHarness {
   String sqsUrl;
 
   @Mock
-  AmazonSQS sqs;
+  SqsClient sqs;
 
   @Mock
   private S3EventsMetaSelector s3EventsMetaSelector;
@@ -114,10 +116,13 @@ public void testNextEventsFromQueueShouldReturnsEventsFromQueue(Class<?> clazz)
   @Test
   public void testEventsFromQueueNoMessages() {
     S3EventsMetaSelector selector = new S3EventsMetaSelector(props);
+    Map<String, String> attribute = new HashMap<>();
+    attribute.put(SQS_ATTR_APPROX_MESSAGES, "0");
     when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
         .thenReturn(
-            new GetQueueAttributesResult()
-                .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, "0"));
+            GetQueueAttributesResponse.builder()
+                .attributesWithStrings(attribute)
+                .build());
 
     List<Message> processed = new ArrayList<>();
     Pair<List<String>, String> eventFromQueue =
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
index 49ea2de0480e0..9a848463df6d9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/CloudObjectTestUtils.java
@@ -19,18 +19,20 @@
 package org.apache.hudi.utilities.testutils;
 
-import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
-import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 import org.apache.hadoop.fs.Path;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
 import static org.mockito.ArgumentMatchers.any;
@@ -48,9 +50,9 @@ public class CloudObjectTestUtils {
    * @param sqs  Mocked instance of AmazonSQS
    * @param path Absolute Path of file in FileSystem
    */
-  public static void setMessagesInQueue(AmazonSQS sqs, Path path) {
+  public static void setMessagesInQueue(SqsClient sqs, Path path) {
 
-    ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
+    ReceiveMessageResponse receiveMessageResult = ReceiveMessageResponse.builder().build();
     String approximateNumberOfMessages = "0";
 
     if (path != null) {
@@ -69,21 +71,25 @@ public static void setMessagesInQueue(AmazonSQS sqs, Path path) {
               + path.getName()
               + "\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}";
 
-      Message message = new Message();
-      message.setReceiptHandle("1");
-      message.setMessageId("1");
-      message.setBody(body);
+      Message message = Message.builder()
+          .receiptHandle("1")
+          .messageId("1")
+          .body(body)
+          .build();
 
       List<Message> messages = new ArrayList<>();
       messages.add(message);
-      receiveMessageResult.setMessages(messages);
+      receiveMessageResult = ReceiveMessageResponse.builder().messages(messages).build();
       approximateNumberOfMessages = "1";
     }
 
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put(SQS_ATTR_APPROX_MESSAGES, approximateNumberOfMessages);
     when(sqs.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
     when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
        .thenReturn(
-            new GetQueueAttributesResult()
-                .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, approximateNumberOfMessages));
+            GetQueueAttributesResponse.builder()
+                .attributesWithStrings(attributes)
+                .build());
   }
 
   /**
@@ -91,8 +97,8 @@ public static void setMessagesInQueue(AmazonSQS sqs, Path path) {
    *
    * @param sqs Mocked instance of AmazonSQS
    */
-  public static void deleteMessagesInQueue(AmazonSQS sqs) {
+  public static void deleteMessagesInQueue(SqsClient sqs) {
     when(sqs.deleteMessageBatch(any(DeleteMessageBatchRequest.class)))
-        .thenReturn(new DeleteMessageBatchResult());
+        .thenReturn(DeleteMessageBatchResponse.builder().build());
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
index 3fced8d031cf2..bdb6c85ce72b5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
-import com.amazonaws.services.sqs.AmazonSQS;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -33,6 +32,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import software.amazon.awssdk.services.sqs.SqsClient;
 
 import java.io.IOException;
 import java.util.List;
@@ -50,7 +50,7 @@ public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBa
   protected String sqsUrl = "test-queue";
   protected String regionName = "us-east-1";
 
   @Mock
-  protected AmazonSQS sqs;
+  protected SqsClient sqs;
 
   @BeforeAll
   public static void initClass() throws Exception {
diff --git a/packaging/hudi-aws-bundle/pom.xml b/packaging/hudi-aws-bundle/pom.xml
index b464293e1f4ae..19af282281cc0 100644
--- a/packaging/hudi-aws-bundle/pom.xml
+++ b/packaging/hudi-aws-bundle/pom.xml
@@ -78,11 +78,12 @@
                   <include>org.apache.hudi:hudi-hive-sync</include>
                   <include>org.apache.hudi:hudi-aws</include>
                   <include>org.apache.parquet:parquet-avro</include>
+                  <include>org.reactivestreams:reactive-streams</include>
                   <include>com.amazonaws:dynamodb-lock-client</include>
-                  <include>com.amazonaws:aws-java-sdk-cloudwatch</include>
-                  <include>com.amazonaws:aws-java-sdk-dynamodb</include>
-                  <include>com.amazonaws:aws-java-sdk-core</include>
-                  <include>com.amazonaws:aws-java-sdk-glue</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>org.apache.httpcomponents:httpcore</include>
+                  <include>io.netty:*</include>
+                  <include>software.amazon.awssdk:*</include>
                   <include>io.dropwizard.metrics:metrics-core</include>
                   <include>com.beust:jcommander</include>
                   <include>commons-io:commons-io</include>
@@ -99,8 +100,8 @@
                     <shadedPattern>org.apache.hudi.org.apache.commons.io.</shadedPattern>
                   </relocation>
                   <relocation>
-                    <pattern>com.amazonaws.</pattern>
-                    <shadedPattern>org.apache.hudi.com.amazonaws.</shadedPattern>
+                    <pattern>software.amazon.awssdk.</pattern>
+                    <shadedPattern>org.apache.hudi.software.amazon.awssdk.</shadedPattern>
                   </relocation>
                   <relocation>
                     <pattern>com.codahale.metrics.</pattern>
@@ -110,6 +111,10 @@
                     <pattern>org.openjdk.jol.</pattern>
                     <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
                   </relocation>
+                  <relocation>
+                    <pattern>org.apache.httpcomponents.</pattern>
+                    <shadedPattern>org.apache.hudi.aws.org.apache.httpcomponents.</shadedPattern>
+                  </relocation>
                 </relocations>
                 <createDependencyReducedPom>false</createDependencyReducedPom>
@@ -173,6 +178,17 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${aws.sdk.httpclient.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>${aws.sdk.httpcore.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.parquet</groupId>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 70daeabf1d7c7..5f01ebb921f43 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -153,6 +153,10 @@
                   <include>commons-codec:commons-codec</include>
                   <include>commons-io:commons-io</include>
                   <include>org.openjdk.jol:jol-core</include>
+                  <include>org.apache.hudi:hudi-aws</include>
+                  <include>software.amazon.awssdk:*</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>org.apache.httpcomponents:httpcore</include>
                 </includes>
@@ -219,6 +223,10 @@
                     <pattern>org.openjdk.jol.</pattern>
                     <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
                   </relocation>
+                  <relocation>
+                    <pattern>org.apache.httpcomponents.</pattern>
+                    <shadedPattern>org.apache.hudi.aws.org.apache.httpcomponents.</shadedPattern>
+                  </relocation>
diff --git a/pom.xml b/pom.xml
index 9cb6ca042d66b..36bad0ce14656 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,8 @@
     1.5.6
     0.16
    0.8.0
+    <aws.sdk.httpclient.version>4.5.13</aws.sdk.httpclient.version>
+    <aws.sdk.httpcore.version>4.4.13</aws.sdk.httpcore.version>
     4.4.1
     ${spark2.version}
     2.4.4
@@ -206,10 +208,9 @@
     2.7.1
     3.4.2
     4.7
-    <aws.sdk.version>1.12.22</aws.sdk.version>
-    3.21.7
+    <aws.sdk.version>2.18.40</aws.sdk.version>
+    3.21.5
     3.21.5
-    <dynamodb.lockclient.version>1.1.0</dynamodb.lockclient.version>
     3.5.7
     0.16
     1.120.0