From a5769f968ed89605fd0a277408ea62698126d456 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Fri, 1 Jul 2022 15:02:07 -0700 Subject: [PATCH] Add support for pinot proxy --- docs/src/main/sphinx/connector/pinot.rst | 2 + .../io/trino/plugin/pinot/PinotConfig.java | 22 ++++ .../plugin/pinot/client/PinotClient.java | 21 ++-- .../pinot/client/PinotGrpcDataFetcher.java | 31 +++-- .../PinotGrpcServerQueryClientConfig.java | 17 +++ .../query/PinotProxyGrpcRequestBuilder.java | 111 ++++++++++++++++++ .../trino/plugin/pinot/TestPinotConfig.java | 3 + .../TestPinotGrpcServerQueryClientConfig.java | 7 +- 8 files changed, 194 insertions(+), 20 deletions(-) create mode 100644 plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotProxyGrpcRequestBuilder.java diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst index 34f6dce76f9c..9aecd3069169 100644 --- a/docs/src/main/sphinx/connector/pinot.rst +++ b/docs/src/main/sphinx/connector/pinot.rst @@ -74,6 +74,7 @@ Property name Required Description ``pinot.aggregation-pushdown.enabled`` No Push down aggregation queries, default is ``true``. ``pinot.count-distinct-pushdown.enabled`` No Push down count distinct queries to Pinot, default is ``true``. ``pinot.target-segment-page-size`` No Max allowed page size for segment query, default is ``1MB``. +``pinot.proxy.enabled`` No Use Pinot Proxy for controller and broker requests, default is ``false``. ========================================================= ========== ============================================================================== If ``pinot.controller.authentication.type`` is set to ``PASSWORD`` then both ``pinot.controller.authentication.user`` and @@ -101,6 +102,7 @@ Property name Required Description ``pinot.grpc.tls.truststore-path`` No TLS truststore file location for gRPC connection, default is empty. ``pinot.grpc.tls.truststore-password`` No TLS truststore password, default is empty. ``pinot.grpc.tls.ssl-provider`` No SSL provider, default is ``JDK``. +``pinot.grpc.proxy-uri`` No Pinot Rest Proxy gRPC endpoint URI, default is null. ========================================================= ========== ============================================================================== For more Apache Pinot TLS configurations, please also refer to `Configuring TLS/SSL `_. diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java index 84b9096e07ed..e5237f307ed5 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java @@ -64,6 +64,7 @@ public class PinotConfig private boolean aggregationPushdownEnabled = true; private boolean countDistinctPushdownEnabled = true; private boolean grpcEnabled = true; + private boolean proxyEnabled; private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE); @NotEmpty(message = "pinot.controller-urls cannot be empty") @@ -245,6 +246,18 @@ public boolean isTlsEnabled() return "https".equalsIgnoreCase(getControllerUrls().get(0).getScheme()); } + public boolean getProxyEnabled() + { + return proxyEnabled; + } + + @Config("pinot.proxy.enabled") + public PinotConfig setProxyEnabled(boolean proxyEnabled) + { + this.proxyEnabled = proxyEnabled; + return this; + } + public DataSize getTargetSegmentPageSize() { return this.targetSegmentPageSize; @@ -273,4 +286,13 @@ public boolean allUrlSchemesEqual() .distinct() .count() == 1; } + + @AssertTrue(message = "Using the rest proxy requires GRPC to be enabled by setting pinot.grpc.enabled=true") + public boolean proxyRestAndGrpcAreRequired() + { + if (proxyEnabled) { + return grpcEnabled; + } + return true; + } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java index 0be5856844c1..0818e030e154 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java @@ -122,6 +122,7 @@ public class PinotClient private final HttpClient httpClient; private final PinotHostMapper pinotHostMapper; private final String scheme; + private final boolean proxyEnabled; private final NonEvictableLoadingCache> brokersForTableCache; private final NonEvictableLoadingCache> allTablesCache; @@ -154,7 +155,9 @@ public PinotClient( .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)).jsonCodec(Schema.class); this.brokerResponseCodec = requireNonNull(brokerResponseCodec, "brokerResponseCodec is null"); this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null"); + requireNonNull(config, "config is null"); this.scheme = config.isTlsEnabled() ? "https" : "http"; + this.proxyEnabled = config.getProxyEnabled(); this.controllerUrls = config.getControllerUrls(); this.httpClient = requireNonNull(httpClient, "httpClient is null"); @@ -228,11 +231,8 @@ private T sendHttpGetToBrokerJson(String table, String path, JsonCodec co { ImmutableMultimap.Builder additionalHeadersBuilder = ImmutableMultimap.builder(); brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token)); - URI brokerPathUri = HttpUriBuilder.uriBuilder() - .hostAndPort(HostAndPort.fromString(getBrokerHost(table))) - .scheme(scheme) - .appendPath(path) - .build(); + HttpUriBuilder httpUriBuilder = getBrokerHttpUriBuilder(getBrokerHost(table)); + URI brokerPathUri = httpUriBuilder.scheme(scheme).appendPath(path).build(); return doHttpActionWithHeadersJson( Request.Builder.prepareGet().setUri(brokerPathUri), Optional.empty(), @@ -240,6 +240,13 @@ private T sendHttpGetToBrokerJson(String table, String path, JsonCodec co additionalHeadersBuilder.build()); } + private HttpUriBuilder getBrokerHttpUriBuilder(String hostAndPort) + { + return proxyEnabled ? + HttpUriBuilder.uriBuilderFrom(getControllerUrl()) : + HttpUriBuilder.uriBuilder().hostAndPort(HostAndPort.fromString(hostAndPort)); + } + private URI getControllerUrl() { return controllerUrls.get(ThreadLocalRandom.current().nextInt(controllerUrls.size())); @@ -531,8 +538,8 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin { String queryRequest = QUERY_REQUEST_JSON_CODEC.toJson(new QueryRequest(query.getQuery())); return doWithRetries(PinotSessionProperties.getPinotRetryCount(session), retryNumber -> { - URI queryPathUri = HttpUriBuilder.uriBuilder() - .hostAndPort(HostAndPort.fromString(getBrokerHost(query.getTable()))) + HttpUriBuilder httpUriBuilder = getBrokerHttpUriBuilder(getBrokerHost(query.getTable())); + URI queryPathUri = httpUriBuilder .scheme(scheme) .appendPath(QUERY_URL_PATH) .build(); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java index 31d2482101bd..a8c7ed692b72 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java @@ -20,16 +20,14 @@ import io.trino.plugin.pinot.PinotErrorCode; import io.trino.plugin.pinot.PinotException; import io.trino.plugin.pinot.PinotSplit; +import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder; import io.trino.spi.connector.ConnectorSession; import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.proto.Server; -import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.utils.grpc.GrpcQueryClient; -import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; import org.apache.pinot.core.common.datatable.DataTableFactory; import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys; import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType; -import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -43,6 +41,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import static java.lang.Boolean.FALSE; import static java.util.Objects.requireNonNull; import static org.apache.pinot.common.config.GrpcConfig.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE; import static org.apache.pinot.common.config.GrpcConfig.CONFIG_USE_PLAIN_TEXT; @@ -189,9 +188,14 @@ public static class TlsGrpcQueryClientFactory private final GrpcConfig config; @Inject - public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig) + public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig, PinotGrpcServerQueryClientTlsConfig tlsConfig) { - ImmutableMap.Builder tlsConfigBuilder = ImmutableMap.builder(); + requireNonNull(grpcClientConfig, "grpcClientConfig is null"); + requireNonNull(tlsConfig, "tlsConfig is null"); + ImmutableMap.Builder tlsConfigBuilder = ImmutableMap.builder() + .put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes())) + .put(CONFIG_USE_PLAIN_TEXT, FALSE.toString()); + if (tlsConfig.getKeystorePath().isPresent()) { tlsConfigBuilder.put(KEYSTORE_TYPE, tlsConfig.getKeystoreType()); tlsConfigBuilder.put(KEYSTORE_PATH, tlsConfig.getKeystorePath().get()); @@ -220,6 +224,7 @@ public static class PinotGrpcServerQueryClient private final Map clientCache = new ConcurrentHashMap<>(); private final int grpcPort; private final GrpcQueryClientFactory grpcQueryClientFactory; + private final Optional proxyUri; private final Closer closer; private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcServerQueryClientConfig pinotGrpcServerQueryClientConfig, GrpcQueryClientFactory grpcQueryClientFactory, Closer closer) @@ -229,6 +234,7 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer this.grpcPort = pinotGrpcServerQueryClientConfig.getGrpcPort(); this.grpcQueryClientFactory = requireNonNull(grpcQueryClientFactory, "grpcQueryClientFactory is null"); this.closer = requireNonNull(closer, "closer is null"); + this.proxyUri = pinotGrpcServerQueryClientConfig.getProxyUri(); } public Iterator queryPinot(ConnectorSession session, String query, String serverHost, List segments) @@ -236,17 +242,20 @@ public Iterator queryPinot(ConnectorSession session, Str HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort); // GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default). GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> { - GrpcQueryClient queryClient = grpcQueryClientFactory.create(hostAndPort); + GrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort); closer.register(queryClient::close); return queryClient; }); - BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); - GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder() + PinotProxyGrpcRequestBuilder grpcRequestBuilder = new PinotProxyGrpcRequestBuilder() .setSql(query) .setSegments(segments) - .setEnableStreaming(true) - .setBrokerRequest(brokerRequest); - return new ResponseIterator(client.submit(requestBuilder.build())); + .setEnableStreaming(true); + + if (proxyUri.isPresent()) { + grpcRequestBuilder.setHostName(mappedHostAndPort.getHost()).setPort(grpcPort); + } + Server.ServerRequest serverRequest = grpcRequestBuilder.build(); + return new ResponseIterator(client.submit(serverRequest)); } public static class ResponseIterator diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java index b3e1294f1fb5..4c1a98c92b3b 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java @@ -16,6 +16,8 @@ import io.airlift.configuration.Config; import io.airlift.units.DataSize; +import java.util.Optional; + import static org.apache.pinot.common.config.GrpcConfig.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE; public class PinotGrpcServerQueryClientConfig @@ -24,6 +26,7 @@ public class PinotGrpcServerQueryClientConfig private int grpcPort = 8090; private DataSize maxInboundMessageSize = DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE); private boolean usePlainText = true; + private Optional proxyUri = Optional.empty(); public int getMaxRowsPerSplitForSegmentQueries() { @@ -72,4 +75,18 @@ public PinotGrpcServerQueryClientConfig setUsePlainText(boolean usePlainText) this.usePlainText = usePlainText; return this; } + + public Optional getProxyUri() + { + return proxyUri; + } + + @Config("pinot.grpc.proxy-uri") + public PinotGrpcServerQueryClientConfig setProxyUri(String proxyUri) + { + if (proxyUri != null && !proxyUri.isEmpty()) { + this.proxyUri = Optional.of(proxyUri); + } + return this; + } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotProxyGrpcRequestBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotProxyGrpcRequestBuilder.java new file mode 100644 index 000000000000..23f4dcdef641 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotProxyGrpcRequestBuilder.java @@ -0,0 +1,111 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.query; + +import com.google.common.collect.ImmutableList; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.spi.utils.CommonConstants; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class PinotProxyGrpcRequestBuilder +{ + private static final String KEY_OF_PROXY_GRPC_FORWARD_HOST = "FORWARD_HOST"; + private static final String KEY_OF_PROXY_GRPC_FORWARD_PORT = "FORWARD_PORT"; + + private String hostName; + private int port = -1; + private int requestId; + private String brokerId = "unknown"; + private boolean enableTrace; + private boolean enableStreaming; + private String payloadType; + private String sql; + private List segments; + + public PinotProxyGrpcRequestBuilder setHostName(String hostName) + { + this.hostName = hostName; + return this; + } + + public PinotProxyGrpcRequestBuilder setPort(int port) + { + this.port = port; + return this; + } + + public PinotProxyGrpcRequestBuilder setRequestId(int requestId) + { + this.requestId = requestId; + return this; + } + + public PinotProxyGrpcRequestBuilder setBrokerId(String brokerId) + { + this.brokerId = brokerId; + return this; + } + + public PinotProxyGrpcRequestBuilder setEnableTrace(boolean enableTrace) + { + this.enableTrace = enableTrace; + return this; + } + + public PinotProxyGrpcRequestBuilder setEnableStreaming(boolean enableStreaming) + { + this.enableStreaming = enableStreaming; + return this; + } + + public PinotProxyGrpcRequestBuilder setSql(String sql) + { + payloadType = CommonConstants.Query.Request.PayloadType.SQL; + this.sql = sql; + return this; + } + + public PinotProxyGrpcRequestBuilder setSegments(List segments) + { + this.segments = ImmutableList.copyOf(segments); + return this; + } + + public Server.ServerRequest build() + { + if (!payloadType.equals(CommonConstants.Query.Request.PayloadType.SQL)) { + throw new RuntimeException("Only [SQL] Payload type is allowed: " + payloadType); + } + Map metadata = new HashMap<>(); + metadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Integer.toString(requestId)); + metadata.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID, brokerId); + metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(enableTrace)); + metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(enableStreaming)); + metadata.put(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE, payloadType); + if (this.hostName != null) { + metadata.put(KEY_OF_PROXY_GRPC_FORWARD_HOST, this.hostName); + } + if (this.port > 0) { + metadata.put(KEY_OF_PROXY_GRPC_FORWARD_PORT, String.valueOf(this.port)); + } + return Server.ServerRequest.newBuilder() + .putAllMetadata(metadata) + .setSql(sql) + .addAllSegments(segments) + .build(); + } +} diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java index 773b80f05d60..6101ecf4f111 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java @@ -47,6 +47,7 @@ public void testDefaults() .setAggregationPushdownEnabled(true) .setCountDistinctPushdownEnabled(true) .setGrpcEnabled(true) + .setProxyEnabled(false) .setTargetSegmentPageSize(DataSize.of(1, MEGABYTE))); } @@ -67,6 +68,7 @@ public void testExplicitPropertyMappings() .put("pinot.aggregation-pushdown.enabled", "false") .put("pinot.count-distinct-pushdown.enabled", "false") .put("pinot.grpc.enabled", "false") + .put("pinot.proxy.enabled", "true") .put("pinot.target-segment-page-size", "2MB") .buildOrThrow(); @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings() .setAggregationPushdownEnabled(false) .setCountDistinctPushdownEnabled(false) .setGrpcEnabled(false) + .setProxyEnabled(true) .setTargetSegmentPageSize(DataSize.of(2, MEGABYTE)); ConfigAssertions.assertFullMapping(properties, expected); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java index 9b09ba2e29b6..5d109eec9879 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java @@ -33,7 +33,8 @@ public void testDefaults() .setMaxRowsPerSplitForSegmentQueries(Integer.MAX_VALUE - 1) .setGrpcPort(8090) .setUsePlainText(true) - .setMaxInboundMessageSize(DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE))); + .setMaxInboundMessageSize(DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE)) + .setProxyUri(null)); } @Test @@ -44,12 +45,14 @@ public void testExplicitPropertyMappings() .put("pinot.grpc.port", "8091") .put("pinot.grpc.use-plain-text", "false") .put("pinot.grpc.max-inbound-message-size", String.valueOf(DataSize.ofBytes(1))) + .put("pinot.grpc.proxy-uri", "my-pinot-proxy:8094") .buildOrThrow(); PinotGrpcServerQueryClientConfig expected = new PinotGrpcServerQueryClientConfig() .setMaxRowsPerSplitForSegmentQueries(10) .setGrpcPort(8091) .setUsePlainText(false) - .setMaxInboundMessageSize(DataSize.ofBytes(1)); + .setMaxInboundMessageSize(DataSize.ofBytes(1)) + .setProxyUri("my-pinot-proxy:8094"); ConfigAssertions.assertFullMapping(properties, expected); } }