Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ Property name Required Description
``pinot.aggregation-pushdown.enabled`` No Push down aggregation queries, default is ``true``.
``pinot.count-distinct-pushdown.enabled`` No Push down count distinct queries to Pinot, default is ``true``.
``pinot.target-segment-page-size`` No Max allowed page size for segment query, default is ``1MB``.
``pinot.extra-http-headers`` No Extra headers when sending HTTP based pinot requests to Pinot controller/broker.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have little concern to have such property that we can't test easily. For instance, when users try to set this property, face some issue and ask in Slack or somewhere, it' basically hard to help them in my opinion. Could you share more details about "customization, tracing, or tag information" and describe why dedicate properties don't work?

Copy link
Copy Markdown
Contributor Author

@xiangfu0 xiangfu0 Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One use case here is that we want to add extra headers to let Pinot track the sources and tags of which presto cluster it queries.

Another use case is to allow the Pinot side to use different auth mechanisms, e.g. API key.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

One use case here is that we want to add extra headers to let Pinot side track the sources and tags of which presto cluster it queries.

This doesn't explain why dedicate properties don't work. Do you mean Pinot doesn't have prepared configuration for sources and tags?

Another use case is to allow another auth mechanism in Pinot, e.g. using an API key.

We should add dedicated property when we want to support new auth mechanism.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the catch all solution for all the other cases.

During dev/staging process, it may not worth to patch Trino, config changes will be much simpler.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the purpose is for dev/staging process, the current property name is incorrect and we should hide from documentation in my opinion.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. It's not for dev purpose. I'm just listing all the scenarios this thing can be useful.

This is the catch all solution to allow users to customize what to pass to Pinot side.

From users perspective, those Pinot side customization may not be put into open source code. And there will always be delay from implementing features in Pinot then enable it in Trino.

Headers are comma-separated, and each header key value pair is colon-separated,
e.g. ``k1:v1,k2:v2,k3:v3 with space`` with space, default is an empty map. Note
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few questions:

  • What happens when keys and values contain characters that are not valid characters in the HTTP message header syntax?
  • What happens if the user provides headers that conflict with standard HTTP headers (e.g., Content-Type, Accept, etc)
  • What's the backward compatibility story if the connector starts sending a header in a new version that the user had configured manually?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Technically user can send any character;
  2. Extra headers are set first, then standard HTTP headers. See method: doHttpActionWithHeadersJson();
  3. We call it user-side backward incompatible. Typically new features added may require a flag to turn on, so users should get a client-side warning or just an exception up to the feature implementor.

that if controller/broker authentications is enabled and ``Authorization`` header
is set, an exception will raise.
========================================================= ========== ==============================================================================

If ``pinot.controller.authentication.type`` is set to ``PASSWORD`` then both ``pinot.controller.authentication.user`` and
Expand Down
13 changes: 6 additions & 7 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8</version>
</dependency>

<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
Expand Down Expand Up @@ -461,13 +467,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.PostConstruct;
import javax.validation.constraints.AssertTrue;
Expand All @@ -29,6 +31,7 @@

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -47,6 +50,7 @@
public class PinotConfig
{
private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();
private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings().withKeyValueSeparator(":");
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated

private List<URI> controllerUrls = ImmutableList.of();

Expand All @@ -65,6 +69,7 @@ public class PinotConfig
private boolean countDistinctPushdownEnabled = true;
private boolean grpcEnabled = true;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);
private Map<String, String> extraHttpHeaders = ImmutableMap.of();

@NotEmpty(message = "pinot.controller-urls cannot be empty")
public List<URI> getControllerUrls()
Expand Down Expand Up @@ -257,12 +262,29 @@ public PinotConfig setTargetSegmentPageSize(DataSize targetSegmentPageSize)
return this;
}

@NotNull
public Map<String, String> getExtraHttpHeaders()
{
return extraHttpHeaders;
}

@Config("pinot.extra-http-headers")
public PinotConfig setExtraHttpHeaders(String headers)
{
extraHttpHeaders = ImmutableMap.copyOf(MAP_SPLITTER.split(headers));
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
return this;
}

@PostConstruct
public void validate()
{
checkState(
!countDistinctPushdownEnabled || aggregationPushdownEnabled,
"Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled");
extraHttpHeaders.forEach((k, v) -> {
checkState(!StringUtils.isNotEmpty(v), "Found null value in extraHttpHeaders for key: " + k);
checkState(!StringUtils.isNotEmpty(k), "Found empty key in extraHttpHeaders");
});
}

@AssertTrue(message = "All controller URLs must have the same scheme")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotInsufficientServerResponseException;
import io.trino.plugin.pinot.PinotSessionProperties;
import io.trino.plugin.pinot.auth.PinotAuthenticationProvider;
import io.trino.plugin.pinot.auth.PinotBrokerAuthenticationProvider;
import io.trino.plugin.pinot.auth.PinotControllerAuthenticationProvider;
import io.trino.plugin.pinot.query.PinotQueryInfo;
Expand Down Expand Up @@ -92,6 +93,7 @@
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_AMBIGUOUS_TABLE_NAME;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_HTTP_ERROR;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_BROKER;
import static io.trino.plugin.pinot.PinotMetadata.SCHEMA_NAME;
import static java.lang.String.format;
Expand Down Expand Up @@ -122,6 +124,7 @@ public class PinotClient
private final HttpClient httpClient;
private final PinotHostMapper pinotHostMapper;
private final String scheme;
private final Map<String, String> extraHttpHeaders;

private final NonEvictableLoadingCache<String, List<String>> brokersForTableCache;
private final NonEvictableLoadingCache<Object, Multimap<String, String>> allTablesCache;
Expand Down Expand Up @@ -159,6 +162,7 @@ public PinotClient(

this.controllerUrls = config.getControllerUrls();
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.extraHttpHeaders = config.getExtraHttpHeaders();
this.brokersForTableCache = buildNonEvictableCache(
CacheBuilder.newBuilder()
.expireAfterWrite(config.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS),
Expand Down Expand Up @@ -200,7 +204,7 @@ protected <T> T doHttpActionWithHeadersJson(
}
catch (UnexpectedResponseException e) {
throw new PinotException(
PinotErrorCode.PINOT_HTTP_ERROR,
PINOT_HTTP_ERROR,
Optional.empty(),
format(
"Unexpected response status: %d for request %s to url %s, with headers %s, full response %s",
Expand All @@ -215,8 +219,7 @@ protected <T> T doHttpActionWithHeadersJson(

private <T> T sendHttpGetToControllerJson(String path, JsonCodec<T> codec)
{
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
controllerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = buildAdditionalHeaders(controllerAuthenticationProvider, extraHttpHeaders);
URI controllerPathUri = uriBuilderFrom(getControllerUrl()).appendPath(path).scheme(scheme).build();
return doHttpActionWithHeadersJson(
Request.Builder.prepareGet().setUri(controllerPathUri),
Expand All @@ -227,8 +230,7 @@ private <T> T sendHttpGetToControllerJson(String path, JsonCodec<T> codec)

private <T> T sendHttpGetToBrokerJson(String table, String path, JsonCodec<T> codec)
{
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = buildAdditionalHeaders(brokerAuthenticationProvider, extraHttpHeaders);
URI brokerPathUri = HttpUriBuilder.uriBuilder()
.hostAndPort(HostAndPort.fromString(getBrokerHost(table)))
.scheme(scheme)
Expand All @@ -241,6 +243,21 @@ private <T> T sendHttpGetToBrokerJson(String table, String path, JsonCodec<T> co
additionalHeadersBuilder.build());
}

public static ImmutableMultimap.Builder<String, String> buildAdditionalHeaders(PinotAuthenticationProvider authenticationProvider, Map<String, String> extraHttpHeaders)
{
Optional<String> authenticationToken = authenticationProvider.getAuthenticationToken();
if (authenticationToken.isPresent() && extraHttpHeaders.containsKey(AUTHORIZATION)) {
throw new PinotException(
PINOT_HTTP_ERROR,
Optional.empty(),
format("Duplicated Authorization header found in extra http header config when Pinot authentication is enabled."));
}
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
authenticationToken.ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
extraHttpHeaders.forEach((k, v) -> additionalHeadersBuilder.put(k, v));
return additionalHeadersBuilder;
}

private URI getControllerUrl()
{
return controllerUrls.get(ThreadLocalRandom.current().nextInt(controllerUrls.size()));
Expand Down Expand Up @@ -544,8 +561,7 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin
LOG.info("Query '%s' on broker host '%s'", query.getQuery(), queryPathUri);
Request.Builder builder = Request.Builder.preparePost().setUri(queryPathUri);

ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = buildAdditionalHeaders(brokerAuthenticationProvider, extraHttpHeaders);
BrokerResponseNative response = doHttpActionWithHeadersJson(builder, Optional.of(queryRequest), brokerResponseCodec,
additionalHeadersBuilder.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,42 @@
*/
package io.trino.plugin.pinot;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HttpHeaders;
import com.google.common.net.MediaType;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.units.Duration;
import io.trino.plugin.pinot.auth.PinotAuthenticationProvider;
import io.trino.plugin.pinot.auth.PinotBrokerAuthenticationProvider;
import io.trino.plugin.pinot.auth.PinotControllerAuthenticationProvider;
import io.trino.plugin.pinot.auth.none.PinotEmptyAuthenticationProvider;
import io.trino.plugin.pinot.auth.password.PinotPasswordAuthenticationProvider;
import io.trino.plugin.pinot.client.IdentityPinotHostMapper;
import io.trino.plugin.pinot.client.PinotClient;
import io.trino.testing.assertions.Assert;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static io.airlift.concurrent.Threads.threadsNamed;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;

public class TestPinotClient
{
@Test
public void testBrokersParsed()
{
HttpClient httpClient = new TestingHttpClient(request -> TestingResponse.mockResponse(HttpStatus.OK, MediaType.JSON_UTF_8, "{\n" +
HttpClient httpClient = new TestingHttpClient(request -> {
assertThat(request.getHeaders().get("k1").get(0)).isEqualTo("v1");
assertThat(request.getHeaders().get("k2").get(0)).isEqualTo("v2");
assertThat(request.getHeaders().get("k3").get(0)).isEqualTo("some random v3");
return TestingResponse.mockResponse(HttpStatus.OK, MediaType.JSON_UTF_8, "{\n" +
" \"tableName\": \"dummy\",\n" +
" \"brokers\": [\n" +
" {\n" +
Expand Down Expand Up @@ -76,10 +86,12 @@ public void testBrokersParsed()
" ]\n" +
" }\n" +
" ]\n" +
"}"));
"}");
});
PinotConfig pinotConfig = new PinotConfig()
.setMetadataCacheExpiry(new Duration(1, TimeUnit.MILLISECONDS))
.setControllerUrls("localhost:7900");
.setControllerUrls("localhost:7900")
.setExtraHttpHeaders("k1:v1,k2:v2,k3:some random v3");
PinotClient pinotClient = new PinotClient(
pinotConfig,
new IdentityPinotHostMapper(),
Expand All @@ -92,6 +104,26 @@ public void testBrokersParsed()
PinotControllerAuthenticationProvider.create(PinotEmptyAuthenticationProvider.instance()),
PinotBrokerAuthenticationProvider.create(PinotEmptyAuthenticationProvider.instance()));
ImmutableSet<String> brokers = ImmutableSet.copyOf(pinotClient.getAllBrokersForTable("dummy"));
Assert.assertEquals(ImmutableSet.of("dummy-broker-host1-datacenter1:6513", "dummy-broker-host2-datacenter1:6513", "dummy-broker-host3-datacenter1:6513", "dummy-broker-host4-datacenter1:6513"), brokers);
assertThat(ImmutableSet.of("dummy-broker-host1-datacenter1:6513", "dummy-broker-host2-datacenter1:6513", "dummy-broker-host3-datacenter1:6513", "dummy-broker-host4-datacenter1:6513")).isEqualTo(brokers);
}

@Test
public void testBuildAdditionalHeaders()
{
Map<String, String> extraHttpHeaders = ImmutableMap.of("k1", "v1", "k2", "value with space");
PinotAuthenticationProvider authProvider = new PinotPasswordAuthenticationProvider("user", "password");
ImmutableMultimap<String, String> headers = PinotClient.buildAdditionalHeaders(authProvider, extraHttpHeaders).build();
assertThat(headers.get("k1").iterator().next()).isEqualTo("v1");
assertThat(headers.get("k2").iterator().next()).isEqualTo("value with space");
assertThat(headers.get(HttpHeaders.AUTHORIZATION).iterator().next()).isEqualTo("Basic dXNlcjpwYXNzd29yZA==");
}

@Test(expectedExceptions = PinotException.class)
@SuppressWarnings("CheckReturnValue")
public void testBuildAdditionalHeadersWithAuthorizationException()
{
Map<String, String> extraHttpHeaders = ImmutableMap.of("k1", "v1", HttpHeaders.AUTHORIZATION, "some random auth");
PinotAuthenticationProvider authProvider = new PinotPasswordAuthenticationProvider("user", "password");
PinotClient.buildAdditionalHeaders(authProvider, extraHttpHeaders).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.configuration.testing.ConfigAssertions;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.testing.assertions.Assert;
import org.testng.annotations.Test;

import java.util.Map;
Expand Down Expand Up @@ -47,7 +48,8 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true)
.setGrpcEnabled(true)
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)));
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE))
.setExtraHttpHeaders(""));
}

@Test
Expand All @@ -68,6 +70,7 @@ public void testExplicitPropertyMappings()
.put("pinot.count-distinct-pushdown.enabled", "false")
.put("pinot.grpc.enabled", "false")
.put("pinot.target-segment-page-size", "2MB")
.put("pinot.extra-http-headers", "k1:v1,k2:v2,k3:some random v3")
.buildOrThrow();

PinotConfig expected = new PinotConfig()
Expand All @@ -84,11 +87,21 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false)
.setGrpcEnabled(false)
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE));
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE))
.setExtraHttpHeaders("k1:v1,k2:v2,k3:some random v3");

ConfigAssertions.assertFullMapping(properties, expected);
}

@Test
public void testExtraHttpMetadata()
{
PinotConfig pinotConfig = new PinotConfig().setExtraHttpHeaders("k1:v1,k2:v2,k3:some random v3");
Assert.assertEquals("v1", pinotConfig.getExtraHttpHeaders().get("k1"));
Assert.assertEquals("v2", pinotConfig.getExtraHttpHeaders().get("k2"));
Assert.assertEquals("some random v3", pinotConfig.getExtraHttpHeaders().get("k3"));
}

@Test
public void testInvalidCountDistinctPushdown()
{
Expand Down