Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/prometheus.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Property Name Description
``prometheus.query.chunk.size.duration`` The duration of each query to Prometheus
``prometheus.max.query.range.duration`` Width of overall query to Prometheus, will be divided into query-chunk-size-duration queries
``prometheus.cache.ttl`` How long values from this config file are cached
``prometheus.auth.user`` Username for basic authentication
``prometheus.auth.password`` Password for basic authentication
``prometheus.bearer.token.file`` File holding bearer token if needed for access to Prometheus
``prometheus.read-timeout`` How much time a query to Prometheus has before timing out
======================================== ============================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import okhttp3.Credentials;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.OkHttpClient.Builder;
import okhttp3.Request;
Expand All @@ -39,8 +41,10 @@
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.net.HttpHeaders.AUTHORIZATION;
import static io.trino.plugin.prometheus.PrometheusErrorCode.PROMETHEUS_TABLES_METRICS_RETRIEVE_ERROR;
import static io.trino.plugin.prometheus.PrometheusErrorCode.PROMETHEUS_UNKNOWN_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.type.TimestampWithTimeZoneType.createTimestampWithTimeZoneType;
import static io.trino.spi.type.TypeSignature.mapType;
import static io.trino.spi.type.VarcharType.VARCHAR;
Expand All @@ -55,7 +59,6 @@ public class PrometheusClient
static final String METRICS_ENDPOINT = "/api/v1/label/__name__/values";

private final OkHttpClient httpClient;
private final Optional<File> bearerTokenFile;
private final Supplier<Map<String, Object>> tableSupplier;
private final Type varcharMapType;

Expand All @@ -66,9 +69,11 @@ public PrometheusClient(PrometheusConnectorConfig config, JsonCodec<Map<String,
requireNonNull(metricCodec, "metricCodec is null");
requireNonNull(typeManager, "typeManager is null");

httpClient = new Builder().readTimeout(Duration.ofMillis(config.getReadTimeout().toMillis())).build();
Builder clientBuilder = new Builder().readTimeout(Duration.ofMillis(config.getReadTimeout().toMillis()));
setupBasicAuth(clientBuilder, config.getUser(), config.getPassword());
setupTokenAuth(clientBuilder, getBearerAuthInfoFromFile(config.getBearerTokenFile()));
this.httpClient = clientBuilder.build();

bearerTokenFile = config.getBearerTokenFile();
URI prometheusMetricsUri = getPrometheusMetricsURI(config.getPrometheusURI());
tableSupplier = Suppliers.memoizeWithExpiration(
() -> fetchMetrics(metricCodec, prometheusMetricsUri),
Expand Down Expand Up @@ -137,8 +142,6 @@ private Map<String, Object> fetchMetrics(JsonCodec<Map<String, Object>> metricsC
public byte[] fetchUri(URI uri)
{
Request.Builder requestBuilder = new Request.Builder().url(uri.toString());
getBearerAuthInfoFromFile().ifPresent(bearerToken -> requestBuilder.header("Authorization", "Bearer " + bearerToken));

Response response;
try {
response = httpClient.newCall(requestBuilder.build()).execute();
Expand All @@ -150,10 +153,10 @@ public byte[] fetchUri(URI uri)
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Error reading metrics", e);
}

throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Bad response " + response.code() + response.message());
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Bad response " + response.code() + " " + response.message());
}

private Optional<String> getBearerAuthInfoFromFile()
private Optional<String> getBearerAuthInfoFromFile(Optional<File> bearerTokenFile)
{
return bearerTokenFile.map(tokenFileName -> {
try {
Expand All @@ -164,4 +167,38 @@ private Optional<String> getBearerAuthInfoFromFile()
}
});
}

private static void setupBasicAuth(OkHttpClient.Builder clientBuilder, Optional<String> user, Optional<String> password)
{
if (user.isPresent() && password.isPresent()) {
clientBuilder.addInterceptor(basicAuth(user.get(), password.get()));
}
}

private static void setupTokenAuth(OkHttpClient.Builder clientBuilder, Optional<String> accessToken)
{
accessToken.ifPresent(token -> clientBuilder.addInterceptor(tokenAuth(token)));
}

private static Interceptor basicAuth(String user, String password)
{
requireNonNull(user, "user is null");
requireNonNull(password, "password is null");
if (user.contains(":")) {
throw new TrinoException(GENERIC_USER_ERROR, "Illegal character ':' found in username");
}

String credential = Credentials.basic(user, password);
return chain -> chain.proceed(chain.request().newBuilder()
.header(AUTHORIZATION, credential)
.build());
}

private static Interceptor tokenAuth(String accessToken)
{
requireNonNull(accessToken, "accessToken is null");
return chain -> chain.proceed(chain.request().newBuilder()
.addHeader(AUTHORIZATION, "Bearer " + accessToken)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.spi.Message;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

Expand All @@ -37,6 +38,8 @@ public class PrometheusConnectorConfig
private Duration cacheDuration = new Duration(30, TimeUnit.SECONDS);
private Duration readTimeout = new Duration(10, TimeUnit.SECONDS);
private File bearerTokenFile;
private String user;
private String password;

@NotNull
public URI getPrometheusURI()
Expand Down Expand Up @@ -107,6 +110,33 @@ public PrometheusConnectorConfig setBearerTokenFile(File bearerTokenFile)
return this;
}

@NotNull
public Optional<String> getUser()
{
return Optional.ofNullable(user);
}

@Config("prometheus.auth.user")
public PrometheusConnectorConfig setUser(String user)
{
this.user = user;
return this;
}

@NotNull
public Optional<String> getPassword()
{
return Optional.ofNullable(password);
}

@Config("prometheus.auth.password")
@ConfigSecuritySensitive
public PrometheusConnectorConfig setPassword(String password)
Comment thread
ebyhr marked this conversation as resolved.
Outdated
{
this.password = password;
return this;
}

@MinDuration("1s")
public Duration getReadTimeout()
{
Expand All @@ -129,5 +159,11 @@ public void checkConfig()
if (maxQueryRangeDuration < queryChunkSizeDuration) {
throw new ConfigurationException(ImmutableList.of(new Message("prometheus.max.query.range.duration must be greater than prometheus.query.chunk.size.duration")));
}
if (getBearerTokenFile().isPresent() && (getUser().isPresent() || getPassword().isPresent())) {
Comment thread
ebyhr marked this conversation as resolved.
Outdated
throw new IllegalStateException("Either on of bearer token file or basic authentication should be used");
}
if (getUser().isPresent() ^ getPassword().isPresent()) {
throw new IllegalStateException("Both username and password must be set when using basic authentication");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.Session;
import io.trino.testing.DistributedQueryRunner;

import java.util.HashMap;
import java.util.Map;

import static io.airlift.testing.Closeables.closeAllSuppress;
Expand All @@ -32,17 +33,17 @@ public final class PrometheusQueryRunner
{
private PrometheusQueryRunner() {}

public static DistributedQueryRunner createPrometheusQueryRunner(PrometheusServer server, Map<String, String> extraProperties)
public static DistributedQueryRunner createPrometheusQueryRunner(PrometheusServer server, Map<String, String> extraProperties, Map<String, String> connectorProperties)
throws Exception
{
DistributedQueryRunner queryRunner = null;
try {
queryRunner = DistributedQueryRunner.builder(createSession()).setExtraProperties(extraProperties).build();

queryRunner.installPlugin(new PrometheusPlugin());
Map<String, String> properties = ImmutableMap.of(
"prometheus.uri", server.getUri().toString());
queryRunner.createCatalog("prometheus", "prometheus", properties);
connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties));
connectorProperties.putIfAbsent("prometheus.uri", server.getUri().toString());
queryRunner.createCatalog("prometheus", "prometheus", connectorProperties);
return queryRunner;
}
catch (Throwable e) {
Expand Down Expand Up @@ -73,7 +74,7 @@ public static PrometheusClient createPrometheusClient(PrometheusServer server)
public static void main(String[] args)
throws Exception
{
DistributedQueryRunner queryRunner = createPrometheusQueryRunner(new PrometheusServer(), ImmutableMap.of("http-server.http.port", "8080"));
DistributedQueryRunner queryRunner = createPrometheusQueryRunner(new PrometheusServer(), ImmutableMap.of("http-server.http.port", "8080"), ImmutableMap.of());
Thread.sleep(10);
Logger log = Logger.get(PrometheusQueryRunner.class);
log.info("======== SERVER STARTED ========");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,41 @@
import java.net.URI;
import java.time.Duration;

import static org.testcontainers.utility.MountableFile.forClasspathResource;
import static org.testng.Assert.fail;

public class PrometheusServer
implements Closeable
{
private static final int PROMETHEUS_PORT = 9090;
private static final String PROMETHEUS_DOCKER_IMAGE = "prom/prometheus:v2.15.1";
private static final String DEFAULT_VERSION = "v2.15.1";
public static final String LATEST_VERSION = "v2.35.0";
private static final Integer MAX_TRIES = 120;
private static final Integer TIME_BETWEEN_TRIES_MILLIS = 1000;

public static final String USER = "admin";
public static final String PASSWORD = "password";

private final GenericContainer<?> dockerContainer;

public PrometheusServer()
{
this.dockerContainer = new GenericContainer<>(PROMETHEUS_DOCKER_IMAGE)
this(DEFAULT_VERSION, false);
}

public PrometheusServer(String version, boolean enableBasicAuth)
{
this.dockerContainer = new GenericContainer<>("prom/prometheus:" + version)
.withExposedPorts(PROMETHEUS_PORT)
.waitingFor(Wait.forHttp("/"))
.withStartupTimeout(Duration.ofSeconds(120));
// Basic authentication was introduced in v2.24.0
if (enableBasicAuth) {
this.dockerContainer
.withCommand("--config.file=/etc/prometheus/prometheus.yml", "--web.config.file=/etc/prometheus/web.yml")
.withCopyFileToContainer(forClasspathResource("web.yml"), "/etc/prometheus/web.yml")
.waitingFor(Wait.forHttp("/").withBasicCredentials(USER, PASSWORD));
}
this.dockerContainer.start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.testng.annotations.Test;

import static io.trino.plugin.prometheus.MetadataUtil.METRIC_CODEC;
import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusQueryRunner;
import static io.trino.plugin.prometheus.PrometheusServer.LATEST_VERSION;
import static io.trino.plugin.prometheus.PrometheusServer.PASSWORD;
import static io.trino.plugin.prometheus.PrometheusServer.USER;
import static io.trino.testing.assertions.Assert.assertEventually;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestPrometheusBasicAuth
extends AbstractTestQueryFramework
{
private PrometheusServer server;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
server = closeAfterClass(new PrometheusServer(LATEST_VERSION, true));
return createPrometheusQueryRunner(server, ImmutableMap.of(), ImmutableMap.of("prometheus.auth.user", USER, "prometheus.auth.password", PASSWORD));
}

@Test
public void testSelect()
Comment thread
ebyhr marked this conversation as resolved.
Outdated
{
assertEventually(
new Duration(1, MINUTES),
new Duration(1, SECONDS),
() -> assertQuery("SHOW TABLES IN prometheus.default LIKE 'up'", "VALUES 'up'"));

assertQuery("SELECT labels['job'] FROM prometheus.default.up LIMIT 1", "VALUES 'prometheus'");
}

@Test
public void testInvalidCredential()
{
PrometheusConnectorConfig config = new PrometheusConnectorConfig();
config.setPrometheusURI(server.getUri());
config.setUser("invalid-user");
config.setPassword("invalid-password");
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TESTING_TYPE_MANAGER);
assertThatThrownBy(() -> client.getTableNames("default"))
.hasMessage("Bad response 401 Unauthorized");
}

@Test
public void testMissingCredential()
{
PrometheusConnectorConfig config = new PrometheusConnectorConfig();
config.setPrometheusURI(server.getUri());
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TESTING_TYPE_MANAGER);
assertThatThrownBy(() -> client.getTableNames("default"))
.hasMessage("Bad response 401 Unauthorized");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public void testDefaults()
.setMaxQueryRangeDuration(new Duration(21, DAYS))
.setCacheDuration(new Duration(30, SECONDS))
.setBearerTokenFile(null)
.setUser(null)
.setPassword(null)
.setReadTimeout(new Duration(10, SECONDS)));
}

Expand All @@ -54,6 +56,8 @@ public void testExplicitPropertyMappings()
.put("prometheus.max.query.range.duration", "1095d")
.put("prometheus.cache.ttl", "60s")
.put("prometheus.bearer.token.file", "/tmp/bearer_token.txt")
.put("prometheus.auth.user", "admin")
.put("prometheus.auth.password", "password")
.put("prometheus.read-timeout", "30s")
.buildOrThrow();

Expand All @@ -64,6 +68,8 @@ public void testExplicitPropertyMappings()
expected.setMaxQueryRangeDuration(new Duration(1095, DAYS));
expected.setCacheDuration(new Duration(60, SECONDS));
expected.setBearerTokenFile(new File("/tmp/bearer_token.txt"));
expected.setUser("admin");
expected.setPassword("password");
expected.setReadTimeout(new Duration(30, SECONDS));

assertFullMapping(properties, expected);
Expand All @@ -82,4 +88,24 @@ public void testFailOnDurationLessThanQueryChunkConfig()
.isInstanceOf(ConfigurationException.class)
.hasMessageContaining("prometheus.max.query.range.duration must be greater than prometheus.query.chunk.size.duration");
}

@Test
public void testInvalidAuth()
{
assertThatThrownBy(new PrometheusConnectorConfig().setBearerTokenFile(new File("/tmp/bearer_token.txt")).setUser("test")::checkConfig)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Either on of bearer token file or basic authentication should be used");

assertThatThrownBy(new PrometheusConnectorConfig().setBearerTokenFile(new File("/tmp/bearer_token.txt")).setPassword("test")::checkConfig)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Either on of bearer token file or basic authentication should be used");

assertThatThrownBy(new PrometheusConnectorConfig().setUser("test")::checkConfig)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Both username and password must be set when using basic authentication");

assertThatThrownBy(new PrometheusConnectorConfig().setPassword("test")::checkConfig)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Both username and password must be set when using basic authentication");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
this.server = new PrometheusServer();
return createPrometheusQueryRunner(server, ImmutableMap.of());
return createPrometheusQueryRunner(server, ImmutableMap.of(), ImmutableMap.of());
}

@Test
Expand Down
Loading