Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions hudi-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>dynamodb-lock-client</artifactId>
<version>${dynamodb.lockclient.version}</version>
<version>1.2.0</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you keep the version variable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in we do not specify a version and let it pull in the latest version?

</dependency>

<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
Expand All @@ -104,14 +103,14 @@

<!-- AWS SDK -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatch</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatch</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<version>${aws.sdk.version}</version>
<exclusions>
<exclusion>
Expand All @@ -121,8 +120,8 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

Expand All @@ -133,18 +132,29 @@

<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-glue -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sqs -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${aws.sdk.httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${aws.sdk.httpcore.version}</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
import org.apache.hudi.common.util.Option;

import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import com.codahale.metrics.Clock;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Counting;
Expand All @@ -41,8 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -63,7 +62,7 @@ public class CloudWatchReporter extends ScheduledReporter {

private static final Logger LOG = LoggerFactory.getLogger(CloudWatchReporter.class);

private final AmazonCloudWatchAsync cloudWatchClientAsync;
private final CloudWatchAsyncClient cloudWatchClientAsync;
private final Clock clock;
private final String prefix;
private final String namespace;
Expand Down Expand Up @@ -139,7 +138,7 @@ public CloudWatchReporter build(Properties props) {
durationUnit);
}

CloudWatchReporter build(AmazonCloudWatchAsync amazonCloudWatchAsync) {
CloudWatchReporter build(CloudWatchAsyncClient amazonCloudWatchAsync) {
return new CloudWatchReporter(registry,
amazonCloudWatchAsync,
clock,
Expand All @@ -153,7 +152,7 @@ CloudWatchReporter build(AmazonCloudWatchAsync amazonCloudWatchAsync) {
}

protected CloudWatchReporter(MetricRegistry registry,
AmazonCloudWatchAsync cloudWatchClientAsync,
CloudWatchAsyncClient cloudWatchClientAsync,
Clock clock,
String prefix,
String namespace,
Expand All @@ -169,9 +168,9 @@ protected CloudWatchReporter(MetricRegistry registry,
this.maxDatumsPerRequest = maxDatumsPerRequest;
}

private static AmazonCloudWatchAsync getAmazonCloudWatchClient(Properties props) {
return AmazonCloudWatchAsyncClientBuilder.standard()
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
private static CloudWatchAsyncClient getAmazonCloudWatchClient(Properties props) {
return CloudWatchAsyncClient.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does .standard() do before? Is it by default in the builder now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/aws/aws-sdk-java-v2/blob/master/docs/LaunchChangelog.md#4-service-changes.

If you look at the table for several of these aws clients the table shows a before column where the .standard was used in 1.x and now in 2.x it is replaced by using the .builder pattern. See this snippet taken from link above

Client builders no longer contain static methods. The static methods on the clients must be used: AmazonDynamoDBClientBuilder.defaultClient is now DynamoDbClient.create and AmazonDynamoDBClientBuilder.standard is now DynamoDbClient.builder.

.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
.build();
}

Expand Down Expand Up @@ -213,7 +212,7 @@ public void report(SortedMap<String, Gauge> gauges,
}

private void report(List<MetricDatum> metricsData) {
List<Future<PutMetricDataResult>> cloudWatchFutures = new ArrayList<>(metricsData.size());
List<Future<PutMetricDataResponse>> cloudWatchFutures = new ArrayList<>(metricsData.size());
List<List<MetricDatum>> partitions = new ArrayList<>();

for (int i = 0; i < metricsData.size(); i += maxDatumsPerRequest) {
Expand All @@ -222,14 +221,15 @@ private void report(List<MetricDatum> metricsData) {
}

for (List<MetricDatum> partition : partitions) {
PutMetricDataRequest request = new PutMetricDataRequest()
.withNamespace(namespace)
.withMetricData(partition);
PutMetricDataRequest request = PutMetricDataRequest.builder()
.namespace(namespace)
.metricData(partition)
.build();

cloudWatchFutures.add(cloudWatchClientAsync.putMetricDataAsync(request));
cloudWatchFutures.add(cloudWatchClientAsync.putMetricData(request));
}

for (final Future<PutMetricDataResult> cloudWatchFuture : cloudWatchFutures) {
for (final Future<PutMetricDataResponse> cloudWatchFuture : cloudWatchFutures) {
try {
cloudWatchFuture.get(30, TimeUnit.SECONDS);
} catch (final Exception ex) {
Expand All @@ -250,7 +250,7 @@ private void processGauge(final String metricName,
.ifPresent(value -> stageMetricDatum(metricName,
value.doubleValue(),
DIMENSION_GAUGE_TYPE_VALUE,
StandardUnit.None,
StandardUnit.NONE,
timestampMilliSec,
metricData));
}
Expand All @@ -262,7 +262,7 @@ private void processCounter(final String metricName,
stageMetricDatum(metricName,
counter.getCount(),
DIMENSION_COUNT_TYPE_VALUE,
StandardUnit.Count,
StandardUnit.COUNT,
timestampMilliSec,
metricData);
}
Expand All @@ -277,22 +277,25 @@ private void stageMetricDatum(String metricName,
String tableName = metricNameParts[0];


metricData.add(new MetricDatum()
.withTimestamp(new Date(timestampMilliSec))
.withMetricName(prefix(metricNameParts[1]))
.withValue(metricValue)
.withDimensions(getDimensions(tableName, metricType))
.withUnit(standardUnit));
metricData.add(MetricDatum.builder()
.timestamp(Instant.ofEpochMilli(timestampMilliSec))
.metricName(prefix(metricNameParts[1]))
.value(metricValue)
.dimensions(getDimensions(tableName, metricType))
.unit(standardUnit)
.build());
}

private List<Dimension> getDimensions(String tableName, String metricType) {
List<Dimension> dimensions = new ArrayList<>();
dimensions.add(new Dimension()
.withName(DIMENSION_TABLE_NAME_KEY)
.withValue(tableName));
dimensions.add(new Dimension()
.withName(DIMENSION_METRIC_TYPE_KEY)
.withValue(metricType));
dimensions.add(Dimension.builder()
.name(DIMENSION_TABLE_NAME_KEY)
.value(tableName)
.build());
dimensions.add(Dimension.builder()
.name(DIMENSION_METRIC_TYPE_KEY)
.value(metricType)
.build());
return dimensions;
}

Expand All @@ -306,7 +309,7 @@ public void stop() {
super.stop();
} finally {
try {
cloudWatchClientAsync.shutdown();
cloudWatchClientAsync.close();
} catch (Exception ex) {
LOG.warn("Exception while shutting down CloudWatch client.", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.hudi.aws.credentials;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -30,16 +30,23 @@
* Factory class for Hoodie AWSCredentialsProvider.
*/
public class HoodieAWSCredentialsProviderFactory {
public static AWSCredentialsProvider getAwsCredentialsProvider(Properties props) {
public static AwsCredentialsProvider getAwsCredentialsProvider(Properties props) {
return getAwsCredentialsProviderChain(props);
}

private static AWSCredentialsProvider getAwsCredentialsProviderChain(Properties props) {
List<AWSCredentialsProvider> providers = new ArrayList<>();
providers.add(new HoodieConfigAWSCredentialsProvider(props));
providers.add(new DefaultAWSCredentialsProviderChain());
AWSCredentialsProviderChain providerChain = new AWSCredentialsProviderChain(providers);
providerChain.setReuseLastProvider(true);
private static AwsCredentialsProvider getAwsCredentialsProviderChain(Properties props) {
List<AwsCredentialsProvider> providers = new ArrayList<>();
HoodieConfigAWSCredentialsProvider hoodieConfigAWSCredentialsProvider = new HoodieConfigAWSCredentialsProvider(props);
if (hoodieConfigAWSCredentialsProvider.resolveCredentials() != null) {
providers.add(hoodieConfigAWSCredentialsProvider);
}
providers.add(DefaultCredentialsProvider.builder()
.reuseLastProviderEnabled(true)
.build());
AwsCredentialsProviderChain providerChain = AwsCredentialsProviderChain.builder()
.credentialsProviders(providers)
.reuseLastProviderEnabled(true)
.build();
return providerChain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package org.apache.hudi.aws.credentials;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieAWSConfig;
import org.apache.hudi.common.util.StringUtils;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,11 +33,11 @@
/**
* Credentials provider which fetches AWS access key from Hoodie config.
*/
public class HoodieConfigAWSCredentialsProvider implements AWSCredentialsProvider {
public class HoodieConfigAWSCredentialsProvider implements AwsCredentialsProvider {

private static final Logger LOG = LoggerFactory.getLogger(HoodieConfigAWSCredentialsProvider.class);

private AWSCredentials awsCredentials;
private AwsCredentials awsCredentials;

public HoodieConfigAWSCredentialsProvider(Properties props) {
String accessKey = props.getProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key());
Expand All @@ -52,20 +52,15 @@ public HoodieConfigAWSCredentialsProvider(Properties props) {
}
}

private static AWSCredentials createCredentials(String accessKey, String secretKey,
private static AwsCredentials createCredentials(String accessKey, String secretKey,
String sessionToken) {
return (sessionToken == null)
? new BasicAWSCredentials(accessKey, secretKey)
: new BasicSessionCredentials(accessKey, secretKey, sessionToken);
? AwsBasicCredentials.create(accessKey, secretKey)
: AwsSessionCredentials.create(accessKey, secretKey, sessionToken);
}

@Override
public AWSCredentials getCredentials() {
public AwsCredentials resolveCredentials() {
return this.awsCredentials;
}

@Override
public void refresh() {

}
}
Loading