Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ If ``pinot.controller.authentication.type`` is set to ``PASSWORD`` then both ``p
If ``pinot.broker.authentication.type`` is set to ``PASSWORD`` then both ``pinot.broker.authentication.user`` and
``pinot.broker.authentication.password`` are required.

If ``pinot.controller-urls`` uses ``https`` scheme then TLS is enabled for all connections including brokers.

gRPC configuration properties
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airlift.units.MinDuration;

import javax.annotation.PostConstruct;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -239,6 +240,11 @@ public PinotConfig setGrpcEnabled(boolean grpcEnabled)
return this;
}

public boolean isTlsEnabled()
{
return "https".equalsIgnoreCase(getControllerUrls().get(0).getScheme());
}

public DataSize getTargetSegmentPageSize()
{
return this.targetSegmentPageSize;
Expand All @@ -258,4 +264,13 @@ public void validate()
!countDistinctPushdownEnabled || aggregationPushdownEnabled,
"Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled");
}

@AssertTrue(message = "All controller URLs must have the same scheme")
public boolean allUrlSchemesEqual()
{
return controllerUrls.stream()
.map(URI::getScheme)
.distinct()
.count() == 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.net.HostAndPort;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.JsonResponseHandler;
import io.airlift.http.client.Request;
import io.airlift.http.client.StaticBodyGenerator;
Expand Down Expand Up @@ -114,11 +116,12 @@ public class PinotClient
private static final String TABLE_SCHEMA_API_TEMPLATE = "tables/%s/schema";
private static final String ROUTING_TABLE_API_TEMPLATE = "debug/routingTable/%s";
private static final String TIME_BOUNDARY_API_TEMPLATE = "debug/timeBoundary/%s";
private static final String QUERY_URL_TEMPLATE = "http://%s/query/sql";
private static final String QUERY_URL_PATH = "query/sql";

private final List<URI> controllerUrls;
private final HttpClient httpClient;
private final PinotHostMapper pinotHostMapper;
private final String scheme;

private final NonEvictableLoadingCache<String, List<String>> brokersForTableCache;
private final NonEvictableLoadingCache<Object, Multimap<String, String>> allTablesCache;
Expand Down Expand Up @@ -152,6 +155,7 @@ public PinotClient(
this.brokerResponseCodec = requireNonNull(brokerResponseCodec, "brokerResponseCodec is null");
requireNonNull(config, "config is null");
this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null");
this.scheme = config.isTlsEnabled() ? "https" : "http";

this.controllerUrls = config.getControllerUrls();
this.httpClient = requireNonNull(httpClient, "httpClient is null");
Expand Down Expand Up @@ -213,8 +217,9 @@ private <T> T sendHttpGetToControllerJson(String path, JsonCodec<T> codec)
{
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
controllerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
URI controllerPathUri = uriBuilderFrom(getControllerUrl()).appendPath(path).scheme(scheme).build();
return doHttpActionWithHeadersJson(
Request.Builder.prepareGet().setUri(uriBuilderFrom(getControllerUrl()).appendPath(path).build()),
Request.Builder.prepareGet().setUri(controllerPathUri),
Optional.empty(),
codec,
additionalHeadersBuilder.build());
Expand All @@ -224,8 +229,13 @@ private <T> T sendHttpGetToBrokerJson(String table, String path, JsonCodec<T> co
{
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
URI brokerPathUri = HttpUriBuilder.uriBuilder()
.hostAndPort(HostAndPort.fromString(getBrokerHost(table)))
.scheme(scheme)
.appendPath(path)
.build();
return doHttpActionWithHeadersJson(
Request.Builder.prepareGet().setUri(URI.create(format("http://%s/%s", getBrokerHost(table), path))),
Request.Builder.prepareGet().setUri(brokerPathUri),
Optional.empty(),
codec,
additionalHeadersBuilder.build());
Expand Down Expand Up @@ -526,10 +536,14 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin
{
String queryRequest = QUERY_REQUEST_JSON_CODEC.toJson(new QueryRequest(query.getQuery()));
return doWithRetries(PinotSessionProperties.getPinotRetryCount(session), retryNumber -> {
String queryHost = getBrokerHost(query.getTable());
LOG.info("Query '%s' on broker host '%s'", queryHost, query.getQuery());
Request.Builder builder = Request.Builder.preparePost()
.setUri(URI.create(format(QUERY_URL_TEMPLATE, queryHost)));
URI queryPathUri = HttpUriBuilder.uriBuilder()
.hostAndPort(HostAndPort.fromString(getBrokerHost(query.getTable())))
.scheme(scheme)
.appendPath(QUERY_URL_PATH)
.build();
LOG.info("Query '%s' on broker host '%s'", query.getQuery(), queryPathUri);
Request.Builder builder = Request.Builder.preparePost().setUri(queryPathUri);

ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
BrokerResponseNative response = doHttpActionWithHeadersJson(builder, Optional.of(queryRequest), brokerResponseCodec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestPinotConfig
{
Expand Down Expand Up @@ -52,7 +54,7 @@ public void testDefaults()
public void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("pinot.controller-urls", "host1:1111,host2:1111")
.put("pinot.controller-urls", "https://host1:1111,https://host2:1111")
.put("pinot.estimated-size-in-bytes-for-non-numeric-column", "30")
.put("pinot.connection-timeout", "8m")
.put("pinot.metadata-expiry", "1m")
Expand All @@ -69,7 +71,7 @@ public void testExplicitPropertyMappings()
.buildOrThrow();

PinotConfig expected = new PinotConfig()
.setControllerUrls("host1:1111,host2:1111")
.setControllerUrls("https://host1:1111,https://host2:1111")
.setEstimatedSizeInBytesForNonNumericColumn(30)
.setConnectionTimeout(new Duration(8, TimeUnit.MINUTES))
.setMetadataCacheExpiry(new Duration(1, TimeUnit.MINUTES))
Expand Down Expand Up @@ -97,4 +99,26 @@ public void testInvalidCountDistinctPushdown()
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled");
}

@Test
public void testControllerUrls()
{
PinotConfig config = new PinotConfig();
config.setControllerUrls("my-controller-1:8443,my-controller-2:8443");
assertTrue(config.allUrlSchemesEqual());
assertFalse(config.isTlsEnabled());
config.setControllerUrls("http://my-controller-1:9000,http://my-controller-2:9000");
assertTrue(config.allUrlSchemesEqual());
assertFalse(config.isTlsEnabled());
config.setControllerUrls("https://my-controller-1:8443,https://my-controller-2:8443");
assertTrue(config.allUrlSchemesEqual());
assertTrue(config.isTlsEnabled());
config.setControllerUrls("my-controller-1:8443,http://my-controller-2:8443");
assertTrue(config.allUrlSchemesEqual());
assertFalse(config.isTlsEnabled());
config.setControllerUrls("http://my-controller-1:8443,https://my-controller-2:8443");
assertFalse(config.allUrlSchemesEqual());
config.setControllerUrls("my-controller-1:8443,https://my-controller-2:8443");
assertFalse(config.allUrlSchemesEqual());
}
}