diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst index 2e3fe015dae0..f71e0b68f04c 100644 --- a/docs/src/main/sphinx/connector/pinot.rst +++ b/docs/src/main/sphinx/connector/pinot.rst @@ -74,6 +74,11 @@ Property name Required Description ``pinot.aggregation-pushdown.enabled`` No Push down aggregation queries, default is ``true``. ``pinot.count-distinct-pushdown.enabled`` No Push down count distinct queries to Pinot, default is ``true``. ``pinot.target-segment-page-size`` No Max allowed page size for segment query, default is ``1MB``. +``pinot.extra-http-headers`` No Extra headers when sending HTTP based pinot requests to Pinot controller/broker. + Headers are comma-separated, and each header key value pair is colon-separated, + e.g. ``k1:v1,k2:v2,k3:v3 with space`` with space, default is an empty map. Note + that if controller/broker authentications is enabled and ``Authorization`` header + is set, an exception will raise. ========================================================= ========== ============================================================================== If ``pinot.controller.authentication.type`` is set to ``PASSWORD`` then both ``pinot.controller.authentication.user`` and diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index d422ea4fcf69..3180f98ad34c 100755 --- a/plugin/trino-pinot/pom.xml +++ b/plugin/trino-pinot/pom.xml @@ -163,6 +163,12 @@ validation-api + + org.apache.commons + commons-lang3 + 3.8 + + org.apache.helix helix-core @@ -461,13 +467,6 @@ runtime - - org.apache.commons - commons-lang3 - 3.8 - runtime - - org.apache.httpcomponents httpcore diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java index 84b9096e07ed..040111092b0a 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java @@ -15,12 +15,14 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.DefunctConfig; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.airlift.units.MinDuration; +import org.apache.commons.lang3.StringUtils; import javax.annotation.PostConstruct; import javax.validation.constraints.AssertTrue; @@ -29,6 +31,7 @@ import java.net.URI; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; @@ -47,6 +50,7 @@ public class PinotConfig { private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings(); + private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings().withKeyValueSeparator(":"); private List controllerUrls = ImmutableList.of(); @@ -65,6 +69,7 @@ public class PinotConfig private boolean countDistinctPushdownEnabled = true; private boolean grpcEnabled = true; private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE); + private Map extraHttpHeaders = ImmutableMap.of(); @NotEmpty(message = "pinot.controller-urls cannot be empty") public List getControllerUrls() @@ -257,12 +262,29 @@ public PinotConfig setTargetSegmentPageSize(DataSize targetSegmentPageSize) return this; } + @NotNull + public Map getExtraHttpHeaders() + { + return extraHttpHeaders; + } + + @Config("pinot.extra-http-headers") + public PinotConfig setExtraHttpHeaders(String headers) + { + extraHttpHeaders = ImmutableMap.copyOf(MAP_SPLITTER.split(headers)); + return this; + } + @PostConstruct public void validate() { checkState( !countDistinctPushdownEnabled || aggregationPushdownEnabled, "Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled"); + extraHttpHeaders.forEach((k, v) -> { + checkState(!StringUtils.isNotEmpty(v), "Found null value in extraHttpHeaders for key: " + k); + checkState(!StringUtils.isNotEmpty(k), "Found empty key in extraHttpHeaders"); + }); } @AssertTrue(message = "All controller URLs must have the same scheme") diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java index ae8a1001ac0d..41c5d7169d3f 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java @@ -46,6 +46,7 @@ import io.trino.plugin.pinot.PinotException; import io.trino.plugin.pinot.PinotInsufficientServerResponseException; import io.trino.plugin.pinot.PinotSessionProperties; +import io.trino.plugin.pinot.auth.PinotAuthenticationProvider; import io.trino.plugin.pinot.auth.PinotBrokerAuthenticationProvider; import io.trino.plugin.pinot.auth.PinotControllerAuthenticationProvider; import io.trino.plugin.pinot.query.PinotQueryInfo; @@ -92,6 +93,7 @@ import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_AMBIGUOUS_TABLE_NAME; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; +import static io.trino.plugin.pinot.PinotErrorCode.PINOT_HTTP_ERROR; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_BROKER; import static io.trino.plugin.pinot.PinotMetadata.SCHEMA_NAME; import static java.lang.String.format; @@ -122,6 +124,7 @@ public class PinotClient private final HttpClient httpClient; private final PinotHostMapper pinotHostMapper; private final String scheme; + private final Map extraHttpHeaders; private final NonEvictableLoadingCache> brokersForTableCache; private final NonEvictableLoadingCache> allTablesCache; @@ -159,6 +162,7 @@ public PinotClient( this.controllerUrls = config.getControllerUrls(); this.httpClient = requireNonNull(httpClient, "httpClient is null"); + this.extraHttpHeaders = config.getExtraHttpHeaders(); this.brokersForTableCache = buildNonEvictableCache( CacheBuilder.newBuilder() .expireAfterWrite(config.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS), @@ -200,7 +204,7 @@ protected T doHttpActionWithHeadersJson( } catch (UnexpectedResponseException e) { throw new PinotException( - PinotErrorCode.PINOT_HTTP_ERROR, + PINOT_HTTP_ERROR, Optional.empty(), format( "Unexpected response status: %d for request %s to url %s, with headers %s, full response %s", @@ -215,8 +219,7 @@ protected T doHttpActionWithHeadersJson( private T sendHttpGetToControllerJson(String path, JsonCodec codec) { - ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); - controllerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); + ImmutableMultimap.Builder additionalHeadersBuilder = buildAdditionalHeaders(controllerAuthenticationProvider, extraHttpHeaders); URI controllerPathUri = uriBuilderFrom(getControllerUrl()).appendPath(path).scheme(scheme).build(); return doHttpActionWithHeadersJson( Request.Builder.prepareGet().setUri(controllerPathUri), @@ -227,8 +230,7 @@ private T sendHttpGetToControllerJson(String path, JsonCodec codec) private T sendHttpGetToBrokerJson(String table, String path, JsonCodec codec) { - ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); - brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); + ImmutableMultimap.Builder additionalHeadersBuilder = buildAdditionalHeaders(brokerAuthenticationProvider, extraHttpHeaders); URI brokerPathUri = HttpUriBuilder.uriBuilder() .hostAndPort(HostAndPort.fromString(getBrokerHost(table))) .scheme(scheme) @@ -241,6 +243,21 @@ private T sendHttpGetToBrokerJson(String table, String path, JsonCodec co additionalHeadersBuilder.build()); } + public static ImmutableMultimap.Builder buildAdditionalHeaders(PinotAuthenticationProvider authenticationProvider, Map extraHttpHeaders) + { + Optional authenticationToken = authenticationProvider.getAuthenticationToken(); + if (authenticationToken.isPresent() && extraHttpHeaders.containsKey(AUTHORIZATION)) { + throw new PinotException( + PINOT_HTTP_ERROR, + Optional.empty(), + format("Duplicated Authorization header found in extra http header config when Pinot authentication is enabled.")); + } + ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); + authenticationToken.ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); + extraHttpHeaders.forEach((k, v) -> additionalHeadersBuilder.put(k, v)); + return additionalHeadersBuilder; + } + private URI getControllerUrl() { return controllerUrls.get(ThreadLocalRandom.current().nextInt(controllerUrls.size())); @@ -544,8 +561,7 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin LOG.info("Query '%s' on broker host '%s'", query.getQuery(), queryPathUri); Request.Builder builder = Request.Builder.preparePost().setUri(queryPathUri); - ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); - brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); + ImmutableMultimap.Builder additionalHeadersBuilder = buildAdditionalHeaders(brokerAuthenticationProvider, extraHttpHeaders); BrokerResponseNative response = doHttpActionWithHeadersJson(builder, Optional.of(queryRequest), brokerResponseCodec, additionalHeadersBuilder.build()); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotClient.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotClient.java index 43d79534f509..55ddc89c60bc 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotClient.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotClient.java @@ -13,32 +13,42 @@ */ package io.trino.plugin.pinot; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; +import com.google.common.net.HttpHeaders; import com.google.common.net.MediaType; import io.airlift.http.client.HttpClient; import io.airlift.http.client.HttpStatus; import io.airlift.http.client.testing.TestingHttpClient; import io.airlift.http.client.testing.TestingResponse; import io.airlift.units.Duration; +import io.trino.plugin.pinot.auth.PinotAuthenticationProvider; import io.trino.plugin.pinot.auth.PinotBrokerAuthenticationProvider; import io.trino.plugin.pinot.auth.PinotControllerAuthenticationProvider; import io.trino.plugin.pinot.auth.none.PinotEmptyAuthenticationProvider; +import io.trino.plugin.pinot.auth.password.PinotPasswordAuthenticationProvider; import io.trino.plugin.pinot.client.IdentityPinotHostMapper; import io.trino.plugin.pinot.client.PinotClient; -import io.trino.testing.assertions.Assert; import org.testng.annotations.Test; +import java.util.Map; import java.util.concurrent.TimeUnit; import static io.airlift.concurrent.Threads.threadsNamed; import static java.util.concurrent.Executors.newCachedThreadPool; +import static org.assertj.core.api.Assertions.assertThat; public class TestPinotClient { @Test public void testBrokersParsed() { - HttpClient httpClient = new TestingHttpClient(request -> TestingResponse.mockResponse(HttpStatus.OK, MediaType.JSON_UTF_8, "{\n" + + HttpClient httpClient = new TestingHttpClient(request -> { + assertThat(request.getHeaders().get("k1").get(0)).isEqualTo("v1"); + assertThat(request.getHeaders().get("k2").get(0)).isEqualTo("v2"); + assertThat(request.getHeaders().get("k3").get(0)).isEqualTo("some random v3"); + return TestingResponse.mockResponse(HttpStatus.OK, MediaType.JSON_UTF_8, "{\n" + " \"tableName\": \"dummy\",\n" + " \"brokers\": [\n" + " {\n" + @@ -76,10 +86,12 @@ public void testBrokersParsed() " ]\n" + " }\n" + " ]\n" + - "}")); + "}"); + }); PinotConfig pinotConfig = new PinotConfig() .setMetadataCacheExpiry(new Duration(1, TimeUnit.MILLISECONDS)) - .setControllerUrls("localhost:7900"); + .setControllerUrls("localhost:7900") + .setExtraHttpHeaders("k1:v1,k2:v2,k3:some random v3"); PinotClient pinotClient = new PinotClient( pinotConfig, new IdentityPinotHostMapper(), @@ -92,6 +104,26 @@ public void testBrokersParsed() PinotControllerAuthenticationProvider.create(PinotEmptyAuthenticationProvider.instance()), PinotBrokerAuthenticationProvider.create(PinotEmptyAuthenticationProvider.instance())); ImmutableSet brokers = ImmutableSet.copyOf(pinotClient.getAllBrokersForTable("dummy")); - Assert.assertEquals(ImmutableSet.of("dummy-broker-host1-datacenter1:6513", "dummy-broker-host2-datacenter1:6513", "dummy-broker-host3-datacenter1:6513", "dummy-broker-host4-datacenter1:6513"), brokers); + assertThat(ImmutableSet.of("dummy-broker-host1-datacenter1:6513", "dummy-broker-host2-datacenter1:6513", "dummy-broker-host3-datacenter1:6513", "dummy-broker-host4-datacenter1:6513")).isEqualTo(brokers); + } + + @Test + public void testBuildAdditionalHeaders() + { + Map extraHttpHeaders = ImmutableMap.of("k1", "v1", "k2", "value with space"); + PinotAuthenticationProvider authProvider = new PinotPasswordAuthenticationProvider("user", "password"); + ImmutableMultimap headers = PinotClient.buildAdditionalHeaders(authProvider, extraHttpHeaders).build(); + assertThat(headers.get("k1").iterator().next()).isEqualTo("v1"); + assertThat(headers.get("k2").iterator().next()).isEqualTo("value with space"); + assertThat(headers.get(HttpHeaders.AUTHORIZATION).iterator().next()).isEqualTo("Basic dXNlcjpwYXNzd29yZA=="); + } + + @Test(expectedExceptions = PinotException.class) + @SuppressWarnings("CheckReturnValue") + public void testBuildAdditionalHeadersWithAuthorizationException() + { + Map extraHttpHeaders = ImmutableMap.of("k1", "v1", HttpHeaders.AUTHORIZATION, "some random auth"); + PinotAuthenticationProvider authProvider = new PinotPasswordAuthenticationProvider("user", "password"); + PinotClient.buildAdditionalHeaders(authProvider, extraHttpHeaders).build(); } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java index 773b80f05d60..27c3e4b18d36 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java @@ -17,6 +17,7 @@ import io.airlift.configuration.testing.ConfigAssertions; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.trino.testing.assertions.Assert; import org.testng.annotations.Test; import java.util.Map; @@ -47,7 +48,8 @@ public void testDefaults() .setAggregationPushdownEnabled(true) .setCountDistinctPushdownEnabled(true) .setGrpcEnabled(true) - .setTargetSegmentPageSize(DataSize.of(1, MEGABYTE))); + .setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)) + .setExtraHttpHeaders("")); } @Test @@ -68,6 +70,7 @@ public void testExplicitPropertyMappings() .put("pinot.count-distinct-pushdown.enabled", "false") .put("pinot.grpc.enabled", "false") .put("pinot.target-segment-page-size", "2MB") + .put("pinot.extra-http-headers", "k1:v1,k2:v2,k3:some random v3") .buildOrThrow(); PinotConfig expected = new PinotConfig() @@ -84,11 +87,21 @@ public void testExplicitPropertyMappings() .setAggregationPushdownEnabled(false) .setCountDistinctPushdownEnabled(false) .setGrpcEnabled(false) - .setTargetSegmentPageSize(DataSize.of(2, MEGABYTE)); + .setTargetSegmentPageSize(DataSize.of(2, MEGABYTE)) + .setExtraHttpHeaders("k1:v1,k2:v2,k3:some random v3"); ConfigAssertions.assertFullMapping(properties, expected); } + @Test + public void testExtraHttpMetadata() + { + PinotConfig pinotConfig = new PinotConfig().setExtraHttpHeaders("k1:v1,k2:v2,k3:some random v3"); + Assert.assertEquals("v1", pinotConfig.getExtraHttpHeaders().get("k1")); + Assert.assertEquals("v2", pinotConfig.getExtraHttpHeaders().get("k2")); + Assert.assertEquals("some random v3", pinotConfig.getExtraHttpHeaders().get("k3")); + } + @Test public void testInvalidCountDistinctPushdown() {