Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Property name Required Description
``pinot.aggregation-pushdown.enabled`` No Push down aggregation queries, default is ``true``.
``pinot.count-distinct-pushdown.enabled`` No Push down count distinct queries to Pinot, default is ``true``.
``pinot.target-segment-page-size`` No Max allowed page size for segment query, default is ``1MB``.
``pinot.proxy.enabled`` No Use Pinot Proxy for controller and broker requests, default is ``false``.
========================================================= ========== ==============================================================================

If ``pinot.controller.authentication.type`` is set to ``PASSWORD`` then both ``pinot.controller.authentication.user`` and
Expand Down Expand Up @@ -101,6 +102,7 @@ Property name Required Description
``pinot.grpc.tls.truststore-path`` No TLS truststore file location for gRPC connection, default is empty.
``pinot.grpc.tls.truststore-password`` No TLS truststore password, default is empty.
``pinot.grpc.tls.ssl-provider`` No SSL provider, default is ``JDK``.
``pinot.grpc.proxy-uri`` No Pinot Rest Proxy gRPC endpoint URI, default is null.
========================================================= ========== ==============================================================================

For more Apache Pinot TLS configurations, please also refer to `Configuring TLS/SSL <https://docs.pinot.apache.org/operators/tutorials/configuring-tls-ssl>`_.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class PinotConfig
private boolean aggregationPushdownEnabled = true;
private boolean countDistinctPushdownEnabled = true;
private boolean grpcEnabled = true;
private boolean proxyEnabled;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);

@NotEmpty(message = "pinot.controller-urls cannot be empty")
Expand Down Expand Up @@ -245,6 +246,18 @@ public boolean isTlsEnabled()
return "https".equalsIgnoreCase(getControllerUrls().get(0).getScheme());
}

public boolean getProxyEnabled()
{
return proxyEnabled;
}

@Config("pinot.proxy.enabled")
public PinotConfig setProxyEnabled(boolean proxyEnabled)
{
this.proxyEnabled = proxyEnabled;
return this;
}

public DataSize getTargetSegmentPageSize()
{
return this.targetSegmentPageSize;
Expand Down Expand Up @@ -273,4 +286,13 @@ public boolean allUrlSchemesEqual()
.distinct()
.count() == 1;
}

@AssertTrue(message = "Using the rest proxy requires GRPC to be enabled by setting pinot.grpc.enabled=true")
public boolean proxyRestAndGrpcAreRequired()
{
if (proxyEnabled) {
return grpcEnabled;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public class PinotClient
private final HttpClient httpClient;
private final PinotHostMapper pinotHostMapper;
private final String scheme;
private final boolean proxyEnabled;

private final NonEvictableLoadingCache<String, List<String>> brokersForTableCache;
private final NonEvictableLoadingCache<Object, Multimap<String, String>> allTablesCache;
Expand Down Expand Up @@ -154,7 +155,9 @@ public PinotClient(
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)).jsonCodec(Schema.class);
this.brokerResponseCodec = requireNonNull(brokerResponseCodec, "brokerResponseCodec is null");
this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null");
requireNonNull(config, "config is null");
this.scheme = config.isTlsEnabled() ? "https" : "http";
this.proxyEnabled = config.getProxyEnabled();

this.controllerUrls = config.getControllerUrls();
this.httpClient = requireNonNull(httpClient, "httpClient is null");
Expand Down Expand Up @@ -228,18 +231,22 @@ private <T> T sendHttpGetToBrokerJson(String table, String path, JsonCodec<T> co
{
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
URI brokerPathUri = HttpUriBuilder.uriBuilder()
.hostAndPort(HostAndPort.fromString(getBrokerHost(table)))
.scheme(scheme)
.appendPath(path)
.build();
HttpUriBuilder httpUriBuilder = getBrokerHttpUriBuilder(getBrokerHost(table));
URI brokerPathUri = httpUriBuilder.scheme(scheme).appendPath(path).build();
return doHttpActionWithHeadersJson(
Request.Builder.prepareGet().setUri(brokerPathUri),
Optional.empty(),
codec,
additionalHeadersBuilder.build());
}

private HttpUriBuilder getBrokerHttpUriBuilder(String hostAndPort)
{
return proxyEnabled ?
HttpUriBuilder.uriBuilderFrom(getControllerUrl()) :
HttpUriBuilder.uriBuilder().hostAndPort(HostAndPort.fromString(hostAndPort));
}

private URI getControllerUrl()
{
return controllerUrls.get(ThreadLocalRandom.current().nextInt(controllerUrls.size()));
Expand Down Expand Up @@ -531,8 +538,8 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin
{
String queryRequest = QUERY_REQUEST_JSON_CODEC.toJson(new QueryRequest(query.getQuery()));
return doWithRetries(PinotSessionProperties.getPinotRetryCount(session), retryNumber -> {
URI queryPathUri = HttpUriBuilder.uriBuilder()
.hostAndPort(HostAndPort.fromString(getBrokerHost(query.getTable())))
HttpUriBuilder httpUriBuilder = getBrokerHttpUriBuilder(getBrokerHost(query.getTable()));
URI queryPathUri = httpUriBuilder
.scheme(scheme)
.appendPath(QUERY_URL_PATH)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@
import io.trino.plugin.pinot.PinotErrorCode;
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

import javax.annotation.PreDestroy;
import javax.inject.Inject;
Expand All @@ -43,6 +41,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static java.lang.Boolean.FALSE;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_USE_PLAIN_TEXT;
Expand Down Expand Up @@ -189,9 +188,14 @@ public static class TlsGrpcQueryClientFactory
private final GrpcConfig config;

@Inject
public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig)
public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig, PinotGrpcServerQueryClientTlsConfig tlsConfig)
{
ImmutableMap.Builder<String, Object> tlsConfigBuilder = ImmutableMap.builder();
requireNonNull(grpcClientConfig, "grpcClientConfig is null");
requireNonNull(tlsConfig, "tlsConfig is null");
ImmutableMap.Builder<String, Object> tlsConfigBuilder = ImmutableMap.<String, Object>builder()
.put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes()))
.put(CONFIG_USE_PLAIN_TEXT, FALSE.toString());

if (tlsConfig.getKeystorePath().isPresent()) {
tlsConfigBuilder.put(KEYSTORE_TYPE, tlsConfig.getKeystoreType());
tlsConfigBuilder.put(KEYSTORE_PATH, tlsConfig.getKeystorePath().get());
Expand Down Expand Up @@ -220,6 +224,7 @@ public static class PinotGrpcServerQueryClient
private final Map<HostAndPort, GrpcQueryClient> clientCache = new ConcurrentHashMap<>();
private final int grpcPort;
private final GrpcQueryClientFactory grpcQueryClientFactory;
private final Optional<String> proxyUri;
private final Closer closer;

private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcServerQueryClientConfig pinotGrpcServerQueryClientConfig, GrpcQueryClientFactory grpcQueryClientFactory, Closer closer)
Expand All @@ -229,24 +234,28 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer
this.grpcPort = pinotGrpcServerQueryClientConfig.getGrpcPort();
this.grpcQueryClientFactory = requireNonNull(grpcQueryClientFactory, "grpcQueryClientFactory is null");
this.closer = requireNonNull(closer, "closer is null");
this.proxyUri = pinotGrpcServerQueryClientConfig.getProxyUri();
}

public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, String query, String serverHost, List<String> segments)
{
HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort);
// GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default).
GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> {
GrpcQueryClient queryClient = grpcQueryClientFactory.create(hostAndPort);
GrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort);
closer.register(queryClient::close);
return queryClient;
});
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder()
PinotProxyGrpcRequestBuilder grpcRequestBuilder = new PinotProxyGrpcRequestBuilder()
.setSql(query)
.setSegments(segments)
.setEnableStreaming(true)
.setBrokerRequest(brokerRequest);
return new ResponseIterator(client.submit(requestBuilder.build()));
.setEnableStreaming(true);

if (proxyUri.isPresent()) {
grpcRequestBuilder.setHostName(mappedHostAndPort.getHost()).setPort(grpcPort);
}
Server.ServerRequest serverRequest = grpcRequestBuilder.build();
return new ResponseIterator(client.submit(serverRequest));
}

public static class ResponseIterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.airlift.configuration.Config;
import io.airlift.units.DataSize;

import java.util.Optional;

import static org.apache.pinot.common.config.GrpcConfig.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE;

public class PinotGrpcServerQueryClientConfig
Expand All @@ -24,6 +26,7 @@ public class PinotGrpcServerQueryClientConfig
private int grpcPort = 8090;
private DataSize maxInboundMessageSize = DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE);
private boolean usePlainText = true;
private Optional<String> proxyUri = Optional.empty();

public int getMaxRowsPerSplitForSegmentQueries()
{
Expand Down Expand Up @@ -72,4 +75,18 @@ public PinotGrpcServerQueryClientConfig setUsePlainText(boolean usePlainText)
this.usePlainText = usePlainText;
return this;
}

public Optional<String> getProxyUri()
{
return proxyUri;
}

@Config("pinot.grpc.proxy-uri")
public PinotGrpcServerQueryClientConfig setProxyUri(String proxyUri)
{
if (proxyUri != null && !proxyUri.isEmpty()) {
this.proxyUri = Optional.of(proxyUri);
}
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.query;

import com.google.common.collect.ImmutableList;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.spi.utils.CommonConstants;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PinotProxyGrpcRequestBuilder
{
private static final String KEY_OF_PROXY_GRPC_FORWARD_HOST = "FORWARD_HOST";
private static final String KEY_OF_PROXY_GRPC_FORWARD_PORT = "FORWARD_PORT";

private String hostName;
private int port = -1;
private int requestId;
private String brokerId = "unknown";
private boolean enableTrace;
private boolean enableStreaming;
private String payloadType;
private String sql;
private List<String> segments;

public PinotProxyGrpcRequestBuilder setHostName(String hostName)
{
this.hostName = hostName;
return this;
}

public PinotProxyGrpcRequestBuilder setPort(int port)
{
this.port = port;
return this;
}

public PinotProxyGrpcRequestBuilder setRequestId(int requestId)
{
this.requestId = requestId;
return this;
}

public PinotProxyGrpcRequestBuilder setBrokerId(String brokerId)
{
this.brokerId = brokerId;
return this;
}

public PinotProxyGrpcRequestBuilder setEnableTrace(boolean enableTrace)
{
this.enableTrace = enableTrace;
return this;
}

public PinotProxyGrpcRequestBuilder setEnableStreaming(boolean enableStreaming)
{
this.enableStreaming = enableStreaming;
return this;
}

public PinotProxyGrpcRequestBuilder setSql(String sql)
{
payloadType = CommonConstants.Query.Request.PayloadType.SQL;
this.sql = sql;
return this;
}

public PinotProxyGrpcRequestBuilder setSegments(List<String> segments)
{
this.segments = ImmutableList.copyOf(segments);
return this;
}

public Server.ServerRequest build()
{
if (!payloadType.equals(CommonConstants.Query.Request.PayloadType.SQL)) {
throw new RuntimeException("Only [SQL] Payload type is allowed: " + payloadType);
}
Map<String, String> metadata = new HashMap<>();
metadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Integer.toString(requestId));
metadata.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID, brokerId);
metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(enableTrace));
metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(enableStreaming));
metadata.put(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE, payloadType);
if (this.hostName != null) {
metadata.put(KEY_OF_PROXY_GRPC_FORWARD_HOST, this.hostName);
}
if (this.port > 0) {
metadata.put(KEY_OF_PROXY_GRPC_FORWARD_PORT, String.valueOf(this.port));
}
return Server.ServerRequest.newBuilder()
.putAllMetadata(metadata)
.setSql(sql)
.addAllSegments(segments)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true)
.setGrpcEnabled(true)
.setProxyEnabled(false)
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)));
}

Expand All @@ -67,6 +68,7 @@ public void testExplicitPropertyMappings()
.put("pinot.aggregation-pushdown.enabled", "false")
.put("pinot.count-distinct-pushdown.enabled", "false")
.put("pinot.grpc.enabled", "false")
.put("pinot.proxy.enabled", "true")
.put("pinot.target-segment-page-size", "2MB")
.buildOrThrow();

Expand All @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false)
.setGrpcEnabled(false)
.setProxyEnabled(true)
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public void testDefaults()
.setMaxRowsPerSplitForSegmentQueries(Integer.MAX_VALUE - 1)
.setGrpcPort(8090)
.setUsePlainText(true)
.setMaxInboundMessageSize(DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE)));
.setMaxInboundMessageSize(DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE))
.setProxyUri(null));
}

@Test
Expand All @@ -44,12 +45,14 @@ public void testExplicitPropertyMappings()
.put("pinot.grpc.port", "8091")
.put("pinot.grpc.use-plain-text", "false")
.put("pinot.grpc.max-inbound-message-size", String.valueOf(DataSize.ofBytes(1)))
.put("pinot.grpc.proxy-uri", "my-pinot-proxy:8094")
.buildOrThrow();
PinotGrpcServerQueryClientConfig expected = new PinotGrpcServerQueryClientConfig()
.setMaxRowsPerSplitForSegmentQueries(10)
.setGrpcPort(8091)
.setUsePlainText(false)
.setMaxInboundMessageSize(DataSize.ofBytes(1));
.setMaxInboundMessageSize(DataSize.ofBytes(1))
.setProxyUri("my-pinot-proxy:8094");
ConfigAssertions.assertFullMapping(properties, expected);
}
}