Skip to content

Commit

Permalink
Add session properties for prometheus connector
Browse files Browse the repository at this point in the history
Size of different metrics may differ hence adding the session properties
  • Loading branch information
amitkharb committed Aug 22, 2024
1 parent b5ea2c9 commit 61f152a
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 12 deletions.
6 changes: 4 additions & 2 deletions docs/src/main/sphinx/connector/prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ The following configuration properties are available:
- Where to find Prometheus coordinator host.
- `http://localhost:9090`
* - `prometheus.query.chunk.size.duration`
- The duration of each query to Prometheus.
- The duration of each query to Prometheus.
The equivalent catalog session property is `query_chunk_size_duration`.
- `1d`
* - `prometheus.max.query.range.duration`
- Width of overall query to Prometheus, will be divided into
`prometheus.query.chunk.size.duration` queries.
`prometheus.query.chunk.size.duration` queries.
The equivalent catalog session property is `max_query_range_duration`.
- `21d`
* - `prometheus.cache.ttl`
- How long values from this config file are cached.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
import com.google.inject.Inject;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.log.Logger;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;

import java.util.List;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.prometheus.PrometheusTransactionHandle.INSTANCE;
import static java.util.Objects.requireNonNull;

Expand All @@ -36,18 +42,23 @@ public class PrometheusConnector
private final PrometheusMetadata metadata;
private final PrometheusSplitManager splitManager;
private final PrometheusRecordSetProvider recordSetProvider;
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public PrometheusConnector(
LifeCycleManager lifeCycleManager,
PrometheusMetadata metadata,
PrometheusSplitManager splitManager,
PrometheusRecordSetProvider recordSetProvider)
PrometheusRecordSetProvider recordSetProvider,
Set<SessionPropertiesProvider> sessionPropertiesProviders)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
.flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
.collect(toImmutableList());
}

@Override
Expand All @@ -74,6 +85,12 @@ public ConnectorRecordSetProvider getRecordSetProvider()
return recordSetProvider;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}

@Override
public final void shutdown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.trino.plugin.base.session.SessionPropertiesProvider;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;

Expand All @@ -32,8 +34,11 @@ public void configure(Binder binder)
binder.bind(PrometheusSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PrometheusClock.class).in(Scopes.SINGLETON);
binder.bind(PrometheusRecordSetProvider.class).in(Scopes.SINGLETON);
binder.bind(PrometheusSessionProperties.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(PrometheusConnectorConfig.class);

newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(PrometheusSessionProperties.class).in(Scopes.SINGLETON);

jsonCodecBinder(binder).bindMapJsonCodec(String.class, Object.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import java.util.List;

import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;

public final class PrometheusSessionProperties
implements SessionPropertiesProvider
{
private static final String QUERY_CHUNK_SIZE_DURATION = "query_chunk_size_duration";
private static final String MAX_QUERY_RANGE_DURATION = "max_query_range_duration";

private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public PrometheusSessionProperties(PrometheusConnectorConfig connectorConfig)
{
sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(durationProperty(
QUERY_CHUNK_SIZE_DURATION,
"The duration of each query to Prometheus",
connectorConfig.getQueryChunkSizeDuration(),
false))
.add(durationProperty(
MAX_QUERY_RANGE_DURATION,
"Width of overall query to Prometheus, will be divided into query_chunk_size_duration queries",
connectorConfig.getMaxQueryRangeDuration(),
false))
.build();
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}

public static Duration getQueryChunkSize(ConnectorSession session)
{
return session.getProperty(QUERY_CHUNK_SIZE_DURATION, Duration.class);
}

public static Duration getMaxQueryRange(ConnectorSession session)
{
return session.getProperty(MAX_QUERY_RANGE_DURATION, Duration.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@

import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE;
import static io.trino.plugin.prometheus.PrometheusErrorCode.PROMETHEUS_UNKNOWN_ERROR;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getMaxQueryRange;
import static io.trino.plugin.prometheus.PrometheusSessionProperties.getQueryChunkSize;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static java.time.Instant.ofEpochMilli;
import static java.util.Objects.requireNonNull;
Expand All @@ -59,17 +61,13 @@ public class PrometheusSplitManager
private final PrometheusClock prometheusClock;

private final URI prometheusURI;
private final Duration maxQueryRangeDuration;
private final Duration queryChunkSizeDuration;

@Inject
public PrometheusSplitManager(PrometheusClient prometheusClient, PrometheusClock prometheusClock, PrometheusConnectorConfig config)
{
this.prometheusClient = requireNonNull(prometheusClient, "prometheusClient is null");
this.prometheusClock = requireNonNull(prometheusClock, "prometheusClock is null");
this.prometheusURI = config.getPrometheusURI();
this.maxQueryRangeDuration = config.getMaxQueryRangeDuration();
this.queryChunkSizeDuration = config.getQueryChunkSizeDuration();
}

@Override
Expand All @@ -87,6 +85,10 @@ public ConnectorSplitSource getSplits(
if (table == null) {
throw new TableNotFoundException(tableHandle.toSchemaTableName());
}

Duration maxQueryRangeDuration = getMaxQueryRange(session);
Duration queryChunkSizeDuration = getQueryChunkSize(session);

List<ConnectorSplit> splits = generateTimesForSplits(prometheusClock.now(), maxQueryRangeDuration, queryChunkSizeDuration, tableHandle)
.stream()
.map(time -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
package io.trino.plugin.prometheus;

import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorSession;
import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -110,6 +113,7 @@ public void testDescribeTable()
"('value', 'double', '', '')");
}

// TODO rewrite this test based on query.
@Test
public void testCorrectNumberOfSplitsCreated()
{
Expand All @@ -120,9 +124,13 @@ public void testCorrectNumberOfSplitsCreated()
config.setCacheDuration(new Duration(30, SECONDS));
PrometheusTable table = client.getTable("default", "up");
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, new PrometheusClock(), config);
PrometheusSessionProperties sessionProperties = new PrometheusSessionProperties(config);
ConnectorSession session = TestingConnectorSession.builder()
.setPropertyMetadata(sessionProperties.getSessionProperties())
.build();
ConnectorSplitSource splits = splitManager.getSplits(
null,
null,
session,
newTableHandle("default", table.name()),
(DynamicFilter) null,
Constraint.alwaysTrue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airlift.json.JsonCodec;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.Constraint;
Expand All @@ -26,6 +27,7 @@
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.testing.TestingConnectorSession;
import org.apache.http.NameValuePair;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -124,9 +126,13 @@ public void testQueryWithTableNameNeedingURLEncodeInSplits()
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TESTING_TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up now");
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
PrometheusSessionProperties sessionProperties = new PrometheusSessionProperties(config);
ConnectorSession session = TestingConnectorSession.builder()
.setPropertyMetadata(sessionProperties.getSessionProperties())
.build();
ConnectorSplitSource splits = splitManager.getSplits(
null,
null,
session,
newTableHandle("default", table.name()),
(DynamicFilter) null,
Constraint.alwaysTrue());
Expand All @@ -148,9 +154,13 @@ public void testQueryDividedIntoSplitsFirstSplitHasRightTime()
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TESTING_TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up");
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
PrometheusSessionProperties sessionProperties = new PrometheusSessionProperties(config);
ConnectorSession session = TestingConnectorSession.builder()
.setPropertyMetadata(sessionProperties.getSessionProperties())
.build();
ConnectorSplitSource splits = splitManager.getSplits(
null,
null,
session,
newTableHandle("default", table.name()),
(DynamicFilter) null,
Constraint.alwaysTrue());
Expand All @@ -172,9 +182,13 @@ public void testQueryDividedIntoSplitsLastSplitHasRightTime()
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TESTING_TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up");
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
PrometheusSessionProperties sessionProperties = new PrometheusSessionProperties(config);
ConnectorSession session = TestingConnectorSession.builder()
.setPropertyMetadata(sessionProperties.getSessionProperties())
.build();
ConnectorSplitSource splitsMaybe = splitManager.getSplits(
null,
null,
session,
newTableHandle("default", table.name()),
(DynamicFilter) null,
Constraint.alwaysTrue());
Expand All @@ -197,9 +211,13 @@ public void testQueryDividedIntoSplitsShouldHaveCorrectSpacingBetweenTimes()
PrometheusClient client = new PrometheusClient(config, METRIC_CODEC, TESTING_TYPE_MANAGER);
PrometheusTable table = client.getTable("default", "up");
PrometheusSplitManager splitManager = new PrometheusSplitManager(client, fixedClockAt(now), config);
PrometheusSessionProperties sessionProperties = new PrometheusSessionProperties(config);
ConnectorSession session = TestingConnectorSession.builder()
.setPropertyMetadata(sessionProperties.getSessionProperties())
.build();
ConnectorSplitSource splits = splitManager.getSplits(
null,
null,
session,
newTableHandle("default", table.name()),
(DynamicFilter) null,
Constraint.alwaysTrue());
Expand Down

0 comments on commit 61f152a

Please sign in to comment.