Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support amazon managed service for prometheus with sigv4 authentication #2225

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ project(':cruise-control') {
implementation 'com.google.code.findbugs:jsr305:3.0.2'
// Temporary pin for vulnerability
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
// AWS SDK for signing and region management
implementation "software.amazon.awssdk:auth:${awsSdkVersion}"
implementation "software.amazon.awssdk:regions:${awsSdkVersion}"
implementation "software.amazon.awssdk:sts:${awsSdkVersion}"

// OkHttp for HTTP requests
implementation 'com.squareup.okhttp3:okhttp:4.11.0'

api "io.vertx:vertx-web-openapi:${vertxVersion}"
api "io.vertx:vertx-core:${vertxVersion}"
api "io.vertx:vertx-web:${vertxVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

package com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus;

import static com.linkedin.cruisecontrol.common.utils.Utils.*;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.*;

import com.google.gson.Gson;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusQueryResult;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusResponse;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -13,20 +19,20 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import com.google.gson.Gson;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusQueryResult;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusResponse;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.SEC_TO_MS;
import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;

/**
* This class provides an adapter to make queries to a Prometheus Server to fetch metric values.
Expand All @@ -39,17 +45,23 @@ class PrometheusAdapter {
private static final String START = "start";
private static final String END = "end";
private static final String STEP = "step";
private static final String SERVICE_NAME = "aps";

private final CloseableHttpClient _httpClient;
protected final HttpHost _prometheusEndpoint;
protected final URIBuilder _prometheusEndpoint;
protected final int _samplingIntervalMs;
private final String _region;
private final Boolean _use_sigv4;

/**
 * Creates an adapter that queries a Prometheus-compatible server.
 *
 * @param httpClient HTTP client used to execute the queries; must not be null.
 * @param prometheusEndpoint base endpoint of the Prometheus server (scheme, host, port and
 *                           optional path); must not be null.
 * @param samplingIntervalMs sampling interval in milliseconds, used as the query resolution step.
 * @param region AWS region for SigV4 request signing. May be null; a non-null value enables
 *               SigV4 signing (e.g. for Amazon Managed Service for Prometheus).
 */
PrometheusAdapter(CloseableHttpClient httpClient,
URIBuilder prometheusEndpoint,
int samplingIntervalMs,
String region) {
_httpClient = validateNotNull(httpClient, "httpClient cannot be null.");
_prometheusEndpoint = validateNotNull(prometheusEndpoint, "prometheusEndpoint cannot be null.");
_samplingIntervalMs = samplingIntervalMs;
_region = region;
// SigV4 signing is enabled purely by the presence of a region.
_use_sigv4 = region != null;
}

public int samplingIntervalMs() {
Expand All @@ -59,8 +71,7 @@ public int samplingIntervalMs() {
public List<PrometheusQueryResult> queryMetric(String queryString,
long startTimeMs,
long endTimeMs) throws IOException {
URI queryUri = URI.create(_prometheusEndpoint.toURI() + QUERY_RANGE_API_PATH);
HttpPost httpPost = new HttpPost(queryUri);


List<NameValuePair> data = new ArrayList<>();
data.add(new BasicNameValuePair(QUERY, queryString));
Expand All @@ -73,7 +84,15 @@ They accept values with a decimal point (up to 64 bits). The samples returned ar
// step is expected to be in seconds, and accept values with a decimal point (up to 64 bits).
data.add(new BasicNameValuePair(STEP, String.valueOf((double) _samplingIntervalMs / SEC_TO_MS)));

httpPost.setEntity(new UrlEncodedFormEntity(data));
String queryParams = URLEncodedUtils.format(data, StandardCharsets.UTF_8);
URI queryUri = URI.create(_prometheusEndpoint.toString() + QUERY_RANGE_API_PATH + "?" + queryParams);
HttpPost httpPost = new HttpPost(queryUri);

// Sign the request if SigV4 is enabled
if (_use_sigv4) {
signRequest(httpPost, queryUri);
}

try (CloseableHttpResponse response = _httpClient.execute(httpPost)) {
int responseCode = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity();
Expand Down Expand Up @@ -103,4 +122,28 @@ They accept values with a decimal point (up to 64 bits). The samples returned ar
return prometheusResponse.data().result();
}
}

/**
 * Signs the outgoing Prometheus query with AWS Signature Version 4 and copies the resulting
 * signed headers onto the Apache {@link HttpPost} that will actually be executed.
 *
 * <p>The query parameters travel in {@code queryUri} and the POST body is empty, so the
 * request is signed without a content stream.
 *
 * @param httpPost the request to decorate with the signed headers.
 * @param queryUri the exact URI that will be sent; must match {@code httpPost}'s URI or the
 *                 signature will not validate server-side.
 * @throws IOException declared for signature compatibility with callers.
 */
private void signRequest(HttpPost httpPost, URI queryUri) throws IOException {
    // Resolve credentials from the default provider chain (env vars, profile, instance role...).
    // The provider is closeable; close it so cached resources are released per request.
    try (DefaultCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create()) {
        AwsCredentials credentials = credentialsProvider.resolveCredentials();

        // Mirror the request we are about to send so the signature matches what goes on the wire.
        // NOTE(review): "application/text" is not a registered media type — presumably harmless
        // for an empty body, but confirm whether AMP expects text/plain or
        // application/x-www-form-urlencoded.
        SdkHttpFullRequest.Builder httpRequest = SdkHttpFullRequest.builder()
            .uri(queryUri)
            .method(SdkHttpMethod.POST)
            .putHeader("Content-Type", "application/text");

        AwsV4HttpSigner signer = AwsV4HttpSigner.create();

        SignedRequest signedRequest =
            signer.sign(r -> r.identity(credentials)
                .request(httpRequest.build())
                .putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, SERVICE_NAME)
                .putProperty(AwsV4HttpSigner.REGION_NAME, _region));

        // Copy every signed header value. setHeader replaces any pre-existing header of the same
        // name; addHeader appends the remaining values so multi-valued headers are not silently
        // truncated to values.get(0), which would break the signature.
        signedRequest.request().headers().forEach((key, values) -> {
            httpPost.setHeader(key, values.get(0));
            for (int i = 1; i < values.size(); i++) {
                httpPost.addHeader(key, values.get(i));
            }
        });
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
package com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.common.Cluster;
Expand Down Expand Up @@ -61,6 +62,9 @@ public class PrometheusMetricSampler extends AbstractMetricSampler {
static final String PROMETHEUS_QUERY_SUPPLIER_CONFIG = "prometheus.query.supplier";
private static final Class<?> DEFAULT_PROMETHEUS_QUERY_SUPPLIER = DefaultPrometheusQuerySupplier.class;

// Config key for the AWS region used for SigV4 signing; when set, requests are signed.
static final String PROMETHEUS_SIGV4_REGION_CONFIG = "prometheus.sigv4.region";

private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricSampler.class);

protected int _samplingIntervalMs;
Expand Down Expand Up @@ -106,16 +110,18 @@ private void configurePrometheusAdapter(Map<String, ?> configs) {
}

try {
HttpHost host = HttpHost.create(endpoint);
if (host.getPort() < 0) {
throw new IllegalArgumentException();
}
String endpointWithScheme = endpoint.startsWith("http") ? endpoint : "http://" + endpoint;
URIBuilder uriBuilder = new URIBuilder(endpointWithScheme);
_httpClient = HttpClients.createDefault();
_prometheusAdapter = new PrometheusAdapter(_httpClient, host, _samplingIntervalMs);
} catch (IllegalArgumentException ex) {

// Get region config
String region = (String) configs.get(PROMETHEUS_SIGV4_REGION_CONFIG);

_prometheusAdapter = new PrometheusAdapter(_httpClient, uriBuilder, _samplingIntervalMs, region);
} catch (URISyntaxException ex) {
throw new ConfigException(
String.format("Prometheus endpoint URI is malformed, "
+ "expected schema://host:port, provided %s", endpoint), ex);
+ "expected schema://host:port[/path], provided %s", endpoint), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.localserver.LocalServerTestBase;
import org.apache.http.protocol.HttpContext;
Expand Down Expand Up @@ -44,9 +44,9 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte
}
});

HttpHost httpHost = this.start();
URIBuilder httpHost = new URIBuilder(this.start().toURI());
PrometheusAdapter prometheusAdapter
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS);
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS, null);
final List<PrometheusQueryResult> prometheusQueryResults = prometheusAdapter.queryMetric(
"kafka_server_BrokerTopicMetrics_OneMinuteRate{name=\"BytesOutPerSec\",topic=\"\"}",
START_TIME_MS, END_TIME_MS);
Expand Down Expand Up @@ -136,9 +136,9 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte
}
});

HttpHost httpHost = this.start();
URIBuilder httpHost = new URIBuilder(this.start().toURI());
PrometheusAdapter prometheusAdapter
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS);
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS, null);

prometheusAdapter.queryMetric(
"kafka_server_BrokerTopicMetrics_OneMinuteRate{name=\"BytesOutPerSec\",topic=\"\"}",
Expand All @@ -156,9 +156,9 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte
}
});

HttpHost httpHost = this.start();
URIBuilder httpHost = new URIBuilder(this.start().toURI());
PrometheusAdapter prometheusAdapter
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS);
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS, null);

prometheusAdapter.queryMetric(
"kafka_server_BrokerTopicMetrics_OneMinuteRate{name=\"BytesOutPerSec\",topic=\"\"}",
Expand All @@ -176,9 +176,9 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte
}
});

HttpHost httpHost = this.start();
URIBuilder httpHost = new URIBuilder(this.start().toURI());
PrometheusAdapter prometheusAdapter
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS);
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS, null);

prometheusAdapter.queryMetric(
"kafka_server_BrokerTopicMetrics_OneMinuteRate{name=\"BytesOutPerSec\",topic=\"\"}",
Expand All @@ -196,9 +196,9 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte
}
});

HttpHost httpHost = this.start();
URIBuilder httpHost = new URIBuilder(this.start().toURI());
PrometheusAdapter prometheusAdapter
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS);
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS, null);

prometheusAdapter.queryMetric(
"kafka_server_BrokerTopicMetrics_OneMinuteRate{name=\"BytesOutPerSec\",topic=\"\"}",
Expand All @@ -216,9 +216,9 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte
}
});

HttpHost httpHost = this.start();
URIBuilder httpHost = new URIBuilder(this.start().toURI());
PrometheusAdapter prometheusAdapter
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS);
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS, null);

prometheusAdapter.queryMetric(
"kafka_server_BrokerTopicMetrics_OneMinuteRate{name=\"BytesOutPerSec\",topic=\"\"}",
Expand All @@ -236,9 +236,9 @@ public void handle(HttpRequest request, HttpResponse response, HttpContext conte
}
});

HttpHost httpHost = this.start();
URIBuilder httpHost = new URIBuilder(this.start().toURI());
PrometheusAdapter prometheusAdapter
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS);
= new PrometheusAdapter(this.httpclient, httpHost, SAMPLING_INTERVAL_MS, null);

prometheusAdapter.queryMetric(
"kafka_server_BrokerTopicMetrics_OneMinuteRate{name=\"BytesOutPerSec\",topic=\"\"}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,26 @@ public void setUp() {
_prometheusQueryMap = prometheusQuerySupplier.get();
}

@Test(expected = ConfigException.class)
@Test
public void testConfigureWithPrometheusEndpointNoPortFails() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org");
addCapacityConfig(config);
_prometheusMetricSampler.configure(config);
}

@Test(expected = ConfigException.class)
public void testConfigureWithPrometheusEndpointNegativePortFails() throws Exception {
@Test
public void testConfigureWithPrometheusEndpointNoSchemaDoesNotFail() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "http://kafka-cluster-1.org:-20");
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "kafka-cluster-1.org:9090");
addCapacityConfig(config);
_prometheusMetricSampler.configure(config);
}

@Test
public void testConfigureWithPrometheusEndpointNoSchemaDoesNotFail() throws Exception {
public void testConfigureWithPrometheusEndpointWithPathDoesNotFail() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "kafka-cluster-1.org:9090");
config.put(PROMETHEUS_SERVER_ENDPOINT_CONFIG, "https://kafka-cluster-1.org/path/to/query");
addCapacityConfig(config);
_prometheusMetricSampler.configure(config);
}
Expand Down
2 changes: 1 addition & 1 deletion docs/wiki/User Guide/Configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ We are still trying to improve cruise control. And following are some configurat
### PrometheusMetricSampler configurations
| Name | Type | Required? | Default Value | Description |
|------------------------------------------------------|---------|-----------|---------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prometheus.server.endpoint | String | Y | | The HTTP endpoint of the Prometheus server which is to be used as a source for sampling metrics. |
| prometheus.server.endpoint                           | String  | Y         |                                                                                             | The HTTP endpoint of the Prometheus server which is to be used as a source for sampling metrics. The endpoint format is `[scheme://]host[:port][/path/to/query]`; the scheme defaults to `http` if it is omitted. |
| prometheus.query.resolution.step.ms | Integer | N | 60,000 | The resolution of the Prometheus query made to the server. If this is set to 30 seconds for a 2 minutes query interval, the query returns with 4 values, which are then aggregated into the metric sample. |
| prometheus.query.supplier | Class | N | com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.DefaultPrometheusQuerySupplier | The class that supplies the Prometheus queries corresponding to Kafka raw metrics. If there are no customizations done when configuring Prometheus node exporter, the default class should work fine. |
| prometheus.broker.metrics.scraping.frequency.seconds | Integer | N | 60 | The scraping frequency with which Prometheus scrapes metrics from Kafka brokers. This value is used by DefaultPrometheusQuerySupplier to construct the iRate query that is used to get broker cpu metrics. |
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ zookeeperVersion=3.9.3
nettyVersion=4.1.114.Final
jettyVersion=9.4.56.v20240826
vertxVersion=4.5.8
awsSdkVersion=2.29.23