diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst index 92a39963dd2e..2e3fe015dae0 100644 --- a/docs/src/main/sphinx/connector/pinot.rst +++ b/docs/src/main/sphinx/connector/pinot.rst @@ -82,6 +82,8 @@ If ``pinot.controller.authentication.type`` is set to ``PASSWORD`` then both ``p If ``pinot.broker.authentication.type`` is set to ``PASSWORD`` then both ``pinot.broker.authentication.user`` and ``pinot.broker.authentication.password`` are required. +If ``pinot.controller-urls`` uses ``https`` scheme then TLS is enabled for all connections including brokers. + gRPC configuration properties ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java index 8318a865bc72..84b9096e07ed 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java @@ -23,6 +23,7 @@ import io.airlift.units.MinDuration; import javax.annotation.PostConstruct; +import javax.validation.constraints.AssertTrue; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; @@ -239,6 +240,11 @@ public PinotConfig setGrpcEnabled(boolean grpcEnabled) return this; } + public boolean isTlsEnabled() + { + return "https".equalsIgnoreCase(getControllerUrls().get(0).getScheme()); + } + public DataSize getTargetSegmentPageSize() { return this.targetSegmentPageSize; @@ -258,4 +264,13 @@ public void validate() !countDistinctPushdownEnabled || aggregationPushdownEnabled, "Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled"); } + + @AssertTrue(message = "All controller URLs must have the same scheme") + public boolean allUrlSchemesEqual() + { + return controllerUrls.stream() + .map(URI::getScheme) + .distinct() + .count() == 1; + } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java index 268b805996c2..ae8a1001ac0d 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java @@ -27,7 +27,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Multimap; +import com.google.common.net.HostAndPort; import io.airlift.http.client.HttpClient; +import io.airlift.http.client.HttpUriBuilder; import io.airlift.http.client.JsonResponseHandler; import io.airlift.http.client.Request; import io.airlift.http.client.StaticBodyGenerator; @@ -114,11 +116,12 @@ public class PinotClient private static final String TABLE_SCHEMA_API_TEMPLATE = "tables/%s/schema"; private static final String ROUTING_TABLE_API_TEMPLATE = "debug/routingTable/%s"; private static final String TIME_BOUNDARY_API_TEMPLATE = "debug/timeBoundary/%s"; - private static final String QUERY_URL_TEMPLATE = "http://%s/query/sql"; + private static final String QUERY_URL_PATH = "query/sql"; private final List controllerUrls; private final HttpClient httpClient; private final PinotHostMapper pinotHostMapper; + private final String scheme; private final NonEvictableLoadingCache> brokersForTableCache; private final NonEvictableLoadingCache> allTablesCache; @@ -152,6 +155,7 @@ public PinotClient( this.brokerResponseCodec = requireNonNull(brokerResponseCodec, "brokerResponseCodec is null"); requireNonNull(config, "config is null"); this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null"); + this.scheme = config.isTlsEnabled() ? "https" : "http"; this.controllerUrls = config.getControllerUrls(); this.httpClient = requireNonNull(httpClient, "httpClient is null"); @@ -213,8 +217,9 @@ private T sendHttpGetToControllerJson(String path, JsonCodec codec) { ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); controllerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); + URI controllerPathUri = uriBuilderFrom(getControllerUrl()).appendPath(path).scheme(scheme).build(); return doHttpActionWithHeadersJson( - Request.Builder.prepareGet().setUri(uriBuilderFrom(getControllerUrl()).appendPath(path).build()), + Request.Builder.prepareGet().setUri(controllerPathUri), Optional.empty(), codec, additionalHeadersBuilder.build()); @@ -224,8 +229,13 @@ private T sendHttpGetToBrokerJson(String table, String path, JsonCodec co { ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); + URI brokerPathUri = HttpUriBuilder.uriBuilder() + .hostAndPort(HostAndPort.fromString(getBrokerHost(table))) + .scheme(scheme) + .appendPath(path) + .build(); return doHttpActionWithHeadersJson( - Request.Builder.prepareGet().setUri(URI.create(format("http://%s/%s", getBrokerHost(table), path))), + Request.Builder.prepareGet().setUri(brokerPathUri), Optional.empty(), codec, additionalHeadersBuilder.build()); @@ -526,10 +536,14 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin { String queryRequest = QUERY_REQUEST_JSON_CODEC.toJson(new QueryRequest(query.getQuery())); return doWithRetries(PinotSessionProperties.getPinotRetryCount(session), retryNumber -> { - String queryHost = getBrokerHost(query.getTable()); - LOG.info("Query '%s' on broker host '%s'", queryHost, query.getQuery()); - Request.Builder builder = Request.Builder.preparePost() - .setUri(URI.create(format(QUERY_URL_TEMPLATE, queryHost))); + URI queryPathUri = HttpUriBuilder.uriBuilder() + .hostAndPort(HostAndPort.fromString(getBrokerHost(query.getTable()))) + .scheme(scheme) + .appendPath(QUERY_URL_PATH) + .build(); + LOG.info("Query '%s' on broker host '%s'", query.getQuery(), queryPathUri); + Request.Builder builder = Request.Builder.preparePost().setUri(queryPathUri); + ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); BrokerResponseNative response = doHttpActionWithHeadersJson(builder, Optional.of(queryRequest), brokerResponseCodec, diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java index fcd75036696d..773b80f05d60 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java @@ -24,6 +24,8 @@ import static io.airlift.units.DataSize.Unit.MEGABYTE; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class TestPinotConfig { @@ -52,7 +54,7 @@ public void testDefaults() public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() - .put("pinot.controller-urls", "host1:1111,host2:1111") + .put("pinot.controller-urls", "https://host1:1111,https://host2:1111") .put("pinot.estimated-size-in-bytes-for-non-numeric-column", "30") .put("pinot.connection-timeout", "8m") .put("pinot.metadata-expiry", "1m") @@ -69,7 +71,7 @@ public void testExplicitPropertyMappings() .buildOrThrow(); PinotConfig expected = new PinotConfig() - .setControllerUrls("host1:1111,host2:1111") + .setControllerUrls("https://host1:1111,https://host2:1111") .setEstimatedSizeInBytesForNonNumericColumn(30) .setConnectionTimeout(new Duration(8, TimeUnit.MINUTES)) .setMetadataCacheExpiry(new Duration(1, TimeUnit.MINUTES)) @@ -97,4 +99,26 @@ public void testInvalidCountDistinctPushdown() .isInstanceOf(IllegalStateException.class) .hasMessage("Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled"); } + + @Test + public void testControllerUrls() + { + PinotConfig config = new PinotConfig(); + config.setControllerUrls("my-controller-1:8443,my-controller-2:8443"); + assertTrue(config.allUrlSchemesEqual()); + assertFalse(config.isTlsEnabled()); + config.setControllerUrls("http://my-controller-1:9000,http://my-controller-2:9000"); + assertTrue(config.allUrlSchemesEqual()); + assertFalse(config.isTlsEnabled()); + config.setControllerUrls("https://my-controller-1:8443,https://my-controller-2:8443"); + assertTrue(config.allUrlSchemesEqual()); + assertTrue(config.isTlsEnabled()); + config.setControllerUrls("my-controller-1:8443,http://my-controller-2:8443"); + assertTrue(config.allUrlSchemesEqual()); + assertFalse(config.isTlsEnabled()); + config.setControllerUrls("http://my-controller-1:8443,https://my-controller-2:8443"); + assertFalse(config.allUrlSchemesEqual()); + config.setControllerUrls("my-controller-1:8443,https://my-controller-2:8443"); + assertFalse(config.allUrlSchemesEqual()); + } }