diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml
index 19ceed7fce7b..263f88affa34 100644
--- a/hudi-aws/pom.xml
+++ b/hudi-aws/pom.xml
@@ -40,11 +40,6 @@
hudi-common
${project.version}
-
- org.apache.hudi
- hudi-client-common
- ${project.version}
-
@@ -79,6 +74,14 @@
dynamodb-lock-client
${dynamodb.lockclient.version}
+
+
+
+ com.amazonaws
+ aws-java-sdk-cloudwatch
+ ${aws.sdk.version}
+
+
com.amazonaws
aws-java-sdk-dynamodb
@@ -96,12 +99,23 @@
${aws.sdk.version}
+
+ io.dropwizard.metrics
+ metrics-core
+
+
org.junit.jupiter
junit-jupiter-api
test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java
new file mode 100644
index 000000000000..e4bc598ce293
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/cloudwatch/CloudWatchReporter.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.cloudwatch;
+
+import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.util.Option;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.MetricDatum;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+import com.amazonaws.services.cloudwatch.model.StandardUnit;
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Counting;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A reporter for publishing metrics to Amazon CloudWatch. It is responsible for collecting, converting DropWizard
+ * metrics to CloudWatch metrics and composing metrics payload.
+ */
+public class CloudWatchReporter extends ScheduledReporter {
+
+ static final String DIMENSION_TABLE_NAME_KEY = "Table";
+ static final String DIMENSION_METRIC_TYPE_KEY = "Metric Type";
+ static final String DIMENSION_GAUGE_TYPE_VALUE = "gauge";
+ static final String DIMENSION_COUNT_TYPE_VALUE = "count";
+
+ private static final Logger LOG = LogManager.getLogger(CloudWatchReporter.class);
+
+ private final AmazonCloudWatchAsync cloudWatchClientAsync;
+ private final Clock clock;
+ private final String prefix;
+ private final String namespace;
+ private final int maxDatumsPerRequest;
+
+ public static Builder forRegistry(MetricRegistry registry) {
+ return new Builder(registry);
+ }
+
+ public static class Builder {
+ private MetricRegistry registry;
+ private Clock clock;
+ private String prefix;
+ private TimeUnit rateUnit;
+ private TimeUnit durationUnit;
+ private MetricFilter filter;
+ private String namespace;
+ private int maxDatumsPerRequest;
+
+ private Builder(MetricRegistry registry) {
+ this.registry = registry;
+ this.clock = Clock.defaultClock();
+ this.rateUnit = TimeUnit.SECONDS;
+ this.durationUnit = TimeUnit.MILLISECONDS;
+ this.filter = MetricFilter.ALL;
+ this.maxDatumsPerRequest = 20;
+ }
+
+ public Builder withClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ public Builder prefixedWith(String prefix) {
+ this.prefix = prefix;
+ return this;
+ }
+
+ public Builder convertRatesTo(TimeUnit rateUnit) {
+ this.rateUnit = rateUnit;
+ return this;
+ }
+
+ public Builder convertDurationsTo(TimeUnit durationUnit) {
+ this.durationUnit = durationUnit;
+ return this;
+ }
+
+ public Builder filter(MetricFilter filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ public Builder namespace(String namespace) {
+ this.namespace = namespace;
+ return this;
+ }
+
+ public Builder maxDatumsPerRequest(int maxDatumsPerRequest) {
+ this.maxDatumsPerRequest = maxDatumsPerRequest;
+ return this;
+ }
+
+ public CloudWatchReporter build(Properties props) {
+ return new CloudWatchReporter(registry,
+ getAmazonCloudWatchClient(props),
+ clock,
+ prefix,
+ namespace,
+ maxDatumsPerRequest,
+ filter,
+ rateUnit,
+ durationUnit);
+ }
+
+ CloudWatchReporter build(AmazonCloudWatchAsync amazonCloudWatchAsync) {
+ return new CloudWatchReporter(registry,
+ amazonCloudWatchAsync,
+ clock,
+ prefix,
+ namespace,
+ maxDatumsPerRequest,
+ filter,
+ rateUnit,
+ durationUnit);
+ }
+ }
+
+ protected CloudWatchReporter(MetricRegistry registry,
+ AmazonCloudWatchAsync cloudWatchClientAsync,
+ Clock clock,
+ String prefix,
+ String namespace,
+ int maxDatumsPerRequest,
+ MetricFilter filter,
+ TimeUnit rateUnit,
+ TimeUnit durationUnit) {
+ super(registry, "hudi-cloudWatch-reporter", filter, rateUnit, durationUnit);
+ this.cloudWatchClientAsync = cloudWatchClientAsync;
+ this.clock = clock;
+ this.prefix = prefix;
+ this.namespace = namespace;
+ this.maxDatumsPerRequest = maxDatumsPerRequest;
+ }
+
+ private static AmazonCloudWatchAsync getAmazonCloudWatchClient(Properties props) {
+ return AmazonCloudWatchAsyncClientBuilder.standard()
+ .withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
+ .build();
+ }
+
+ @Override
+ public void report(SortedMap gauges,
+ SortedMap counters,
+ SortedMap histograms,
+ SortedMap meters,
+ SortedMap timers) {
+ LOG.info("Reporting Metrics to CloudWatch.");
+
+ final long timestampMilliSec = clock.getTime();
+ List metricsData = new ArrayList<>();
+
+ for (Map.Entry entry : gauges.entrySet()) {
+ processGauge(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ }
+
+ for (Map.Entry entry : counters.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ }
+
+ for (Map.Entry entry : histograms.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ //TODO: Publish other Histogram metrics to cloud watch
+ }
+
+ for (Map.Entry entry : meters.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ //TODO: Publish other Meter metrics to cloud watch
+ }
+
+ for (Map.Entry entry : timers.entrySet()) {
+ processCounter(entry.getKey(), entry.getValue(), timestampMilliSec, metricsData);
+ //TODO: Publish other Timer metrics to cloud watch
+ }
+
+ report(metricsData);
+ }
+
+ private void report(List metricsData) {
+ List> cloudWatchFutures = new ArrayList<>(metricsData.size());
+ List> partitions = new ArrayList<>();
+
+ for (int i = 0; i < metricsData.size(); i += maxDatumsPerRequest) {
+ int end = Math.min(metricsData.size(), i + maxDatumsPerRequest);
+ partitions.add(metricsData.subList(i, end));
+ }
+
+ for (List partition : partitions) {
+ PutMetricDataRequest request = new PutMetricDataRequest()
+ .withNamespace(namespace)
+ .withMetricData(partition);
+
+ cloudWatchFutures.add(cloudWatchClientAsync.putMetricDataAsync(request));
+ }
+
+ for (final Future cloudWatchFuture : cloudWatchFutures) {
+ try {
+ cloudWatchFuture.get(30, TimeUnit.SECONDS);
+ } catch (final Exception ex) {
+ LOG.error("Error reporting metrics to CloudWatch. The data in this CloudWatch request "
+ + "may have been discarded, and not made it to CloudWatch.", ex);
+ }
+ }
+ }
+
+ private void processGauge(final String metricName,
+ final Gauge> gauge,
+ final long timestampMilliSec,
+ final List metricData) {
+ Option.ofNullable(gauge.getValue())
+ .toJavaOptional()
+ .filter(value -> value instanceof Number)
+ .map(value -> (Number) value)
+ .ifPresent(value -> stageMetricDatum(metricName,
+ value.doubleValue(),
+ DIMENSION_GAUGE_TYPE_VALUE,
+ StandardUnit.None,
+ timestampMilliSec,
+ metricData));
+ }
+
+ private void processCounter(final String metricName,
+ final Counting counter,
+ final long timestampMilliSec,
+ final List metricData) {
+ stageMetricDatum(metricName,
+ counter.getCount(),
+ DIMENSION_COUNT_TYPE_VALUE,
+ StandardUnit.Count,
+ timestampMilliSec,
+ metricData);
+ }
+
+ private void stageMetricDatum(String metricName,
+ double metricValue,
+ String metricType,
+ StandardUnit standardUnit,
+ long timestampMilliSec,
+ List metricData) {
+ String[] metricNameParts = metricName.split("\\.", 2);
+ String tableName = metricNameParts[0];
+
+
+ metricData.add(new MetricDatum()
+ .withTimestamp(new Date(timestampMilliSec))
+ .withMetricName(prefix(metricNameParts[1]))
+ .withValue(metricValue)
+ .withDimensions(getDimensions(tableName, metricType))
+ .withUnit(standardUnit));
+ }
+
+ private List getDimensions(String tableName, String metricType) {
+ List dimensions = new ArrayList<>();
+ dimensions.add(new Dimension()
+ .withName(DIMENSION_TABLE_NAME_KEY)
+ .withValue(tableName));
+ dimensions.add(new Dimension()
+ .withName(DIMENSION_METRIC_TYPE_KEY)
+ .withValue(metricType));
+ return dimensions;
+ }
+
+ private String prefix(String... components) {
+ return MetricRegistry.name(prefix, components);
+ }
+
+ @Override
+ public void stop() {
+ try {
+ super.stop();
+ } finally {
+ try {
+ cloudWatchClientAsync.shutdown();
+ } catch (Exception ex) {
+ LOG.warn("Exception while shutting down CloudWatch client.", ex);
+ }
+ }
+ }
+}
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java b/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java
index cb52f66a8427..b6ea8232c1f8 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/AWSLockConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.hudi.config;
import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import com.amazonaws.regions.RegionUtils;
@@ -34,14 +35,16 @@ public class AWSLockConfiguration {
public static final ConfigProperty DYNAMODB_LOCK_TABLE_NAME = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table")
.noDefaultValue()
+ .sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, the name of the DynamoDB table acting as lock table");
public static final ConfigProperty DYNAMODB_LOCK_PARTITION_KEY = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key")
.noDefaultValue()
+ .sinceVersion("0.10.0")
.withInferFunction(cfg -> {
- if (cfg.contains(HoodieWriteConfig.TBL_NAME)) {
- return Option.of(cfg.getString(HoodieWriteConfig.TBL_NAME));
+ if (cfg.contains(HoodieTableConfig.NAME)) {
+ return Option.of(cfg.getString(HoodieTableConfig.NAME));
}
return Option.empty();
})
@@ -52,6 +55,7 @@ public class AWSLockConfiguration {
public static final ConfigProperty DYNAMODB_LOCK_REGION = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "region")
.defaultValue("us-east-1")
+ .sinceVersion("0.10.0")
.withInferFunction(cfg -> {
String regionFromEnv = System.getenv("AWS_REGION");
if (regionFromEnv != null) {
@@ -65,20 +69,24 @@ public class AWSLockConfiguration {
public static final ConfigProperty DYNAMODB_LOCK_BILLING_MODE = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "billing_mode")
.defaultValue(BillingMode.PAY_PER_REQUEST.name())
+ .sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, by default it is PAY_PER_REQUEST mode");
public static final ConfigProperty DYNAMODB_LOCK_READ_CAPACITY = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "read_capacity")
.defaultValue("20")
+ .sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, read capacity units when using PROVISIONED billing mode");
public static final ConfigProperty DYNAMODB_LOCK_WRITE_CAPACITY = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "write_capacity")
.defaultValue("10")
+ .sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, write capacity units when using PROVISIONED billing mode");
public static final ConfigProperty DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table_creation_timeout")
.defaultValue(String.valueOf(10 * 60 * 1000))
+ .sinceVersion("0.10.0")
.withDocumentation("For DynamoDB based lock provider, the maximum number of milliseconds to wait for creating DynamoDB table");
}
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java
index 6c62b61528ad..405caafa1a90 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieAWSConfig.java
@@ -40,23 +40,27 @@
* Configurations used by the AWS credentials and AWS DynamoDB based lock.
*/
@Immutable
-@ConfigClassProperty(name = "AWS credential Configs",
- groupName = ConfigGroups.Names.AWS_DYNAMO_DB,
- description = "Configurations used for AWS credentials to get AWS resources.")
+@ConfigClassProperty(name = "Amazon Web Services Configs",
+ groupName = ConfigGroups.Names.AWS,
+ description = "Amazon Web Services configurations to access resources like Amazon DynamoDB (for locks),"
+ + " Amazon CloudWatch (metrics).")
public class HoodieAWSConfig extends HoodieConfig {
public static final ConfigProperty AWS_ACCESS_KEY = ConfigProperty
.key("hoodie.aws.access.key")
.noDefaultValue()
+ .sinceVersion("0.10.0")
.withDocumentation("AWS access key id");
public static final ConfigProperty AWS_SECRET_KEY = ConfigProperty
.key("hoodie.aws.secret.key")
.noDefaultValue()
+ .sinceVersion("0.10.0")
.withDocumentation("AWS secret key");
public static final ConfigProperty AWS_SESSION_TOKEN = ConfigProperty
.key("hoodie.aws.session.token")
.noDefaultValue()
+ .sinceVersion("0.10.0")
.withDocumentation("AWS session token");
private HoodieAWSConfig() {
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/HoodieMetricsCloudWatchConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieMetricsCloudWatchConfig.java
new file mode 100644
index 000000000000..e4e46d5a1f7b
--- /dev/null
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/HoodieMetricsCloudWatchConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.util.Properties;
+
+@ConfigClassProperty(
+ name = "Metrics Configurations for Amazon CloudWatch",
+ groupName = ConfigGroups.Names.METRICS,
+ description =
+ "Enables reporting on Hudi metrics using Amazon CloudWatch. "
+ + " Hudi publishes metrics on every commit, clean, rollback etc.")
+public class HoodieMetricsCloudWatchConfig extends HoodieConfig {
+
+ public static final String CLOUDWATCH_PREFIX = "hoodie.metrics.cloudwatch";
+
+ public static final ConfigProperty REPORT_PERIOD_SECONDS = ConfigProperty
+ .key(CLOUDWATCH_PREFIX + ".report.period.seconds")
+ .defaultValue(60)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Reporting interval in seconds");
+
+ public static final ConfigProperty METRIC_PREFIX = ConfigProperty
+ .key(CLOUDWATCH_PREFIX + ".metric.prefix")
+ .defaultValue("")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Metric prefix of reporter");
+
+ public static final ConfigProperty METRIC_NAMESPACE = ConfigProperty
+ .key(CLOUDWATCH_PREFIX + ".namespace")
+ .defaultValue("Hudi")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Namespace of reporter");
+ /*
+ Amazon CloudWatch allows a maximum of 20 metrics per request. Choosing this as the default maximum.
+ Reference: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html
+ */
+ public static final ConfigProperty MAX_DATUMS_PER_REQUEST =
+ ConfigProperty.key(CLOUDWATCH_PREFIX + ".maxDatumsPerRequest")
+ .defaultValue(20)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Max number of Datums per request");
+
+ public HoodieMetricsCloudWatchConfig() {
+ super();
+ }
+
+ public static HoodieMetricsCloudWatchConfig.Builder newBuilder() {
+ return new HoodieMetricsCloudWatchConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private HoodieMetricsCloudWatchConfig hoodieMetricsCloudWatchConfig = new HoodieMetricsCloudWatchConfig();
+
+ public HoodieMetricsCloudWatchConfig.Builder fromProperties(Properties props) {
+ this.hoodieMetricsCloudWatchConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieMetricsCloudWatchConfig build() {
+ hoodieMetricsCloudWatchConfig.setDefaults(HoodieMetricsCloudWatchConfig.class.getName());
+ return hoodieMetricsCloudWatchConfig;
+ }
+ }
+}
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java
new file mode 100644
index 000000000000..85f551e6fda8
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/cloudwatch/TestCloudWatchReporter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.cloudwatch;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.MetricDatum;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_COUNT_TYPE_VALUE;
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_GAUGE_TYPE_VALUE;
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_METRIC_TYPE_KEY;
+import static org.apache.hudi.aws.cloudwatch.CloudWatchReporter.DIMENSION_TABLE_NAME_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(MockitoExtension.class)
+public class TestCloudWatchReporter {
+
+ private static final String NAMESPACE = "Hudi Test";
+ private static final String PREFIX = "testPrefix";
+ private static final String TABLE_NAME = "testTable";
+ private static final int MAX_DATUMS_PER_REQUEST = 2;
+
+ @Mock
+ MetricRegistry metricRegistry;
+
+ @Mock
+ AmazonCloudWatchAsync cloudWatchAsync;
+
+ @Mock
+ CompletableFuture cloudWatchFuture;
+
+ @Captor
+ ArgumentCaptor putMetricDataRequestCaptor;
+
+ CloudWatchReporter reporter;
+
+ @BeforeEach
+ public void setup() {
+ reporter = CloudWatchReporter.forRegistry(metricRegistry)
+ .namespace(NAMESPACE)
+ .prefixedWith(PREFIX)
+ .maxDatumsPerRequest(MAX_DATUMS_PER_REQUEST)
+ .withClock(Clock.defaultClock())
+ .filter(MetricFilter.ALL)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build(cloudWatchAsync);
+
+ Mockito.when(cloudWatchAsync.putMetricDataAsync(ArgumentMatchers.any())).thenReturn(cloudWatchFuture);
+ }
+
+ @Test
+ public void testReporter() {
+ SortedMap gauges = new TreeMap<>();
+ Gauge gauge1 = () -> 100L;
+ Gauge gauge2 = () -> 100.1;
+ gauges.put(TABLE_NAME + ".gauge1", gauge1);
+ gauges.put(TABLE_NAME + ".gauge2", gauge2);
+
+ SortedMap counters = new TreeMap<>();
+ Counter counter1 = new Counter();
+ counter1.inc(200);
+ counters.put(TABLE_NAME + ".counter1", counter1);
+
+ SortedMap histograms = new TreeMap<>();
+ Histogram histogram1 = new Histogram(new ExponentiallyDecayingReservoir());
+ histogram1.update(300);
+ histograms.put(TABLE_NAME + ".histogram1", histogram1);
+
+ SortedMap meters = new TreeMap<>();
+ Meter meter1 = new Meter();
+ meter1.mark(400);
+ meters.put(TABLE_NAME + ".meter1", meter1);
+
+ SortedMap timers = new TreeMap<>();
+ Timer timer1 = new Timer();
+ timer1.update(100, TimeUnit.SECONDS);
+ timers.put(TABLE_NAME + ".timer1", timer1);
+
+ Mockito.when(metricRegistry.getGauges(MetricFilter.ALL)).thenReturn(gauges);
+ Mockito.when(metricRegistry.getCounters(MetricFilter.ALL)).thenReturn(counters);
+ Mockito.when(metricRegistry.getHistograms(MetricFilter.ALL)).thenReturn(histograms);
+ Mockito.when(metricRegistry.getMeters(MetricFilter.ALL)).thenReturn(meters);
+ Mockito.when(metricRegistry.getTimers(MetricFilter.ALL)).thenReturn(timers);
+
+ reporter.report();
+
+ // Since there are 6 metrics in total, and max datums per request is 2 we would expect 3 calls to CloudWatch
+ // with 2 datums in each
+ Mockito.verify(cloudWatchAsync, Mockito.times(3)).putMetricDataAsync(putMetricDataRequestCaptor.capture());
+ Assertions.assertEquals(NAMESPACE, putMetricDataRequestCaptor.getValue().getNamespace());
+
+ List putMetricDataRequests = putMetricDataRequestCaptor.getAllValues();
+ putMetricDataRequests.forEach(request -> assertEquals(2, request.getMetricData().size()));
+
+ List metricDataBatch1 = putMetricDataRequests.get(0).getMetricData();
+ assertEquals(PREFIX + ".gauge1", metricDataBatch1.get(0).getMetricName());
+ assertEquals(Double.valueOf(gauge1.getValue()), metricDataBatch1.get(0).getValue());
+ assertDimensions(metricDataBatch1.get(0).getDimensions(), DIMENSION_GAUGE_TYPE_VALUE);
+
+ assertEquals(PREFIX + ".gauge2", metricDataBatch1.get(1).getMetricName());
+ assertEquals(gauge2.getValue(), metricDataBatch1.get(1).getValue());
+ assertDimensions(metricDataBatch1.get(1).getDimensions(), DIMENSION_GAUGE_TYPE_VALUE);
+
+ List metricDataBatch2 = putMetricDataRequests.get(1).getMetricData();
+ assertEquals(PREFIX + ".counter1", metricDataBatch2.get(0).getMetricName());
+ assertEquals(counter1.getCount(), metricDataBatch2.get(0).getValue().longValue());
+ assertDimensions(metricDataBatch2.get(0).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ assertEquals(PREFIX + ".histogram1", metricDataBatch2.get(1).getMetricName());
+ assertEquals(histogram1.getCount(), metricDataBatch2.get(1).getValue().longValue());
+ assertDimensions(metricDataBatch2.get(1).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ List metricDataBatch3 = putMetricDataRequests.get(2).getMetricData();
+ assertEquals(PREFIX + ".meter1", metricDataBatch3.get(0).getMetricName());
+ assertEquals(meter1.getCount(), metricDataBatch3.get(0).getValue().longValue());
+ assertDimensions(metricDataBatch3.get(0).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ assertEquals(PREFIX + ".timer1", metricDataBatch3.get(1).getMetricName());
+ assertEquals(timer1.getCount(), metricDataBatch3.get(1).getValue().longValue());
+ assertDimensions(metricDataBatch3.get(1).getDimensions(), DIMENSION_COUNT_TYPE_VALUE);
+
+ reporter.stop();
+ Mockito.verify(cloudWatchAsync).shutdown();
+ }
+
+ private void assertDimensions(List actualDimensions, String metricTypeDimensionVal) {
+ assertEquals(2, actualDimensions.size());
+
+ Dimension expectedTableNameDimension = new Dimension()
+ .withName(DIMENSION_TABLE_NAME_KEY)
+ .withValue(TABLE_NAME);
+ Dimension expectedMetricTypeDimension = new Dimension()
+ .withName(DIMENSION_METRIC_TYPE_KEY)
+ .withValue(metricTypeDimensionVal);
+
+ assertEquals(expectedTableNameDimension, actualDimensions.get(0));
+ assertEquals(expectedMetricTypeDimension, actualDimensions.get(1));
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index c67621fbb9a3..d30ee298b182 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -36,6 +36,11 @@
hudi-common
${project.version}
+
+ org.apache.hudi
+ hudi-aws
+ ${project.version}
+
org.apache.hudi
hudi-timeline-service
@@ -94,7 +99,6 @@
io.prometheus
simpleclient_pushgateway
-
org.apache.hudi
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7988e9307522..21a5dbd42b42 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1562,6 +1562,22 @@ public List getDatadogMetricTags() {
HoodieMetricsDatadogConfig.METRIC_TAG_VALUES, ",").split("\\s*,\\s*")).collect(Collectors.toList());
}
+ public int getCloudWatchReportPeriodSeconds() {
+ return getInt(HoodieMetricsCloudWatchConfig.REPORT_PERIOD_SECONDS);
+ }
+
+ public String getCloudWatchMetricPrefix() {
+ return getString(HoodieMetricsCloudWatchConfig.METRIC_PREFIX);
+ }
+
+ public String getCloudWatchMetricNamespace() {
+ return getString(HoodieMetricsCloudWatchConfig.METRIC_NAMESPACE);
+ }
+
+ public int getCloudWatchMaxDatumsPerRequest() {
+ return getInt(HoodieMetricsCloudWatchConfig.MAX_DATUMS_PER_REQUEST);
+ }
+
public String getMetricReporterClassName() {
return getString(HoodieMetricsConfig.METRICS_REPORTER_CLASS_NAME);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 60369d72fb4f..ca386fb901b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.config.HoodieMetricsCloudWatchConfig;
import org.apache.hudi.metrics.MetricsReporterType;
import javax.annotation.concurrent.Immutable;
@@ -165,6 +166,8 @@ public HoodieMetricsConfig build() {
HoodieMetricsJmxConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.GRAPHITE,
HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
+ hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH,
+ HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
return hoodieMetricsConfig;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 24b34244e3aa..4a82eb92d872 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -241,6 +241,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
case PROMETHEUS_PUSHGATEWAY:
case CONSOLE:
case INMEMORY:
+ case CLOUDWATCH:
break;
default:
throw new HoodieMetadataException("Unsupported Metrics Reporter type " + writeConfig.getMetricsReporterType());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index 820c1a3e8887..dc9e80431b8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter;
import org.apache.hudi.metrics.datadog.DatadogMetricsReporter;
import com.codahale.metrics.MetricRegistry;
@@ -77,6 +78,9 @@ public static MetricsReporter createReporter(HoodieWriteConfig config, MetricReg
case CONSOLE:
reporter = new ConsoleMetricsReporter(registry);
break;
+ case CLOUDWATCH:
+ reporter = new CloudWatchMetricsReporter(config, registry);
+ break;
default:
LOG.error("Reporter type[" + type + "] is not supported.");
break;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
index 36b15a89ac88..3c8600159287 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
@@ -22,5 +22,5 @@
* Types of the reporter supported, hudi also supports user defined reporter.
*/
public enum MetricsReporterType {
- GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS
+ GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java
new file mode 100644
index 000000000000..c7134a10360f
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/cloudwatch/CloudWatchMetricsReporter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.cloudwatch;
+
+import org.apache.hudi.aws.cloudwatch.CloudWatchReporter;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.MetricsReporter;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Hudi Amazon CloudWatch metrics reporter. Responsible for reading Hoodie metrics configurations and hooking up with
+ * {@link org.apache.hudi.metrics.Metrics}. Internally delegates reporting tasks to {@link CloudWatchReporter}.
+ */
+public class CloudWatchMetricsReporter extends MetricsReporter {
+
+ private static final Logger LOG = LogManager.getLogger(CloudWatchMetricsReporter.class);
+
+ private final MetricRegistry registry;
+ private final HoodieWriteConfig config;
+ private final CloudWatchReporter reporter;
+
+ public CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
+ this.config = config;
+ this.registry = registry;
+ this.reporter = createCloudWatchReporter();
+ }
+
+ CloudWatchMetricsReporter(HoodieWriteConfig config, MetricRegistry registry, CloudWatchReporter reporter) {
+ this.config = config;
+ this.registry = registry;
+ this.reporter = reporter;
+ }
+
+ private CloudWatchReporter createCloudWatchReporter() {
+ return CloudWatchReporter.forRegistry(registry)
+ .prefixedWith(config.getCloudWatchMetricPrefix())
+ .namespace(config.getCloudWatchMetricNamespace())
+ .maxDatumsPerRequest(config.getCloudWatchMaxDatumsPerRequest())
+ .build(config.getProps());
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting CloudWatch Metrics Reporter.");
+ reporter.start(config.getCloudWatchReportPeriodSeconds(), TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void report() {
+ reporter.report();
+ }
+
+ @Override
+ public Closeable getReporter() {
+ return reporter;
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stopping CloudWatch Metrics Reporter.");
+ reporter.stop();
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java
new file mode 100644
index 000000000000..7901d8024651
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/cloudwatch/TestCloudWatchMetricsReporter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.cloudwatch;
+
+import org.apache.hudi.aws.cloudwatch.CloudWatchReporter;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import com.codahale.metrics.MetricRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestCloudWatchMetricsReporter {
+
+ @Mock
+ private HoodieWriteConfig config;
+
+ @Mock
+ private MetricRegistry registry;
+
+ @Mock
+ private CloudWatchReporter reporter;
+
+ @Test
+ public void testReporter() {
+ when(config.getCloudWatchReportPeriodSeconds()).thenReturn(30);
+ CloudWatchMetricsReporter metricsReporter = new CloudWatchMetricsReporter(config, registry, reporter);
+
+ metricsReporter.start();
+ verify(reporter, times(1)).start(30, TimeUnit.SECONDS);
+
+ metricsReporter.report();
+ verify(reporter, times(1)).report();
+
+ metricsReporter.stop();
+ verify(reporter, times(1)).stop();
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
index 18cd8042763b..fef00389d8c5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
@@ -32,7 +32,7 @@ public enum Names {
METRICS("Metrics Configs"),
RECORD_PAYLOAD("Record Payload Config"),
KAFKA_CONNECT("Kafka Connect Configs"),
- AWS_DYNAMO_DB("aws-dynamo-db");
+ AWS("Amazon Web Services Configs");
public final String name;